This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit edc940edbc42e515bb407694992f39c554c74145
Author: leah <[email protected]>
AuthorDate: Fri Nov 13 12:44:09 2020 -0600

    KAFKA-10705: Make state stores not readable by others (#9583)
    
    Change permissions on the folders for the state store so they're no 
readable or writable by "others", but still accessible by owner and group 
members.
    
    Reviewers: Bruno Cadonna <[email protected]>,  Walker Carlson 
<[email protected]>, Anna Sophie Blee-Goldman <[email protected]>
---
 .../processor/internals/StateDirectory.java        | 14 +++++++++++
 .../processor/internals/StateDirectoryTest.java    | 27 ++++++++++++++++++++++
 2 files changed, 41 insertions(+)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 861a971..2f04888 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -31,10 +31,15 @@ import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
 import java.nio.file.NoSuchFileException;
+import java.nio.file.Paths;
 import java.nio.file.Path;
+import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Set;
 import java.util.regex.Pattern;
 
 import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
@@ -102,6 +107,15 @@ public class StateDirectory {
             log.warn("Using /tmp directory in the state.dir property can cause 
failures with writing the checkpoint file" +
                 " due to the fact that this directory can be cleared by the 
OS");
         }
+        final Path basePath = Paths.get(baseDir.getPath());
+        final Path statePath = Paths.get(stateDir.getPath());
+        final Set<PosixFilePermission> perms = 
PosixFilePermissions.fromString("rwxr-x---");
+        try {
+            Files.setPosixFilePermissions(basePath, perms);
+            Files.setPosixFilePermissions(statePath, perms);
+        } catch (final IOException e) {
+            log.error("Error changing permissions for the state or base 
directory {} ", stateDir.getPath(), e);
+        }
     }
 
     /**
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 5c7e678..93e81c0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -34,10 +34,14 @@ import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.nio.channels.OverlappingFileLockException;
 import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.PosixFilePermission;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
@@ -62,6 +66,8 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 
 public class StateDirectoryTest {
 
@@ -107,6 +113,27 @@ public class StateDirectoryTest {
     }
 
     @Test
+    public void shouldHaveSecurePermissions() {
+        final Set<PosixFilePermission> expectedPermissions = EnumSet.of(
+            PosixFilePermission.OWNER_EXECUTE,
+            PosixFilePermission.GROUP_READ,
+            PosixFilePermission.OWNER_WRITE,
+            PosixFilePermission.GROUP_EXECUTE,
+            PosixFilePermission.OWNER_READ);
+
+        final Path statePath = Paths.get(stateDir.getPath());
+        final Path basePath = Paths.get(appDir.getPath());
+        try {
+            final Set<PosixFilePermission> baseFilePermissions = 
Files.getPosixFilePermissions(statePath);
+            final Set<PosixFilePermission> appFilePermissions = 
Files.getPosixFilePermissions(basePath);
+            assertThat(expectedPermissions.equals(baseFilePermissions), 
is(true));
+            assertThat(expectedPermissions.equals(appFilePermissions), 
is(true));
+        } catch (final IOException e) {
+            fail("Should create correct files and set correct permissions");
+        }
+    }
+
+    @Test
     public void shouldCreateTaskStateDirectory() {
         final TaskId taskId = new TaskId(0, 0);
         final File taskDirectory = directory.directoryForTask(taskId);

Reply via email to