This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 650d93b  KAFKA-12211: don't change perm for base/state dir when no 
persistent store (#9904)
650d93b is described below

commit 650d93b2d8a90afda0c367b430ec1c8c49d09743
Author: Luke Chen <[email protected]>
AuthorDate: Thu Jan 21 03:37:56 2021 +0800

    KAFKA-12211: don't change perm for base/state dir when no persistent store 
(#9904)
    
     If a user doesn't have Persistent Stores, we won't create base dir and 
state dir and should not try to set permissions on them.
    
    Reviewers: Bruno Cadonna <[email protected]>, Guozhang Wang 
<[email protected]>, Anna Sophie Blee-Goldman <[email protected]>
---
 .../processor/internals/StateDirectory.java        | 45 ++++++++++++----------
 .../processor/internals/StateDirectoryTest.java    | 15 +++++---
 2 files changed, 35 insertions(+), 25 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 2f04888..b98c3cb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -94,27 +94,32 @@ public class StateDirectory {
         this.appId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
         final String stateDirName = 
config.getString(StreamsConfig.STATE_DIR_CONFIG);
         final File baseDir = new File(stateDirName);
-        if (this.hasPersistentStores && !baseDir.exists() && 
!baseDir.mkdirs()) {
-            throw new ProcessorStateException(
-                String.format("base state directory [%s] doesn't exist and 
couldn't be created", stateDirName));
-        }
         stateDir = new File(baseDir, appId);
-        if (this.hasPersistentStores && !stateDir.exists() && 
!stateDir.mkdir()) {
-            throw new ProcessorStateException(
-                String.format("state directory [%s] doesn't exist and couldn't 
be created", stateDir.getPath()));
-        }
-        if (hasPersistentStores && stateDirName.startsWith("/tmp")) {
-            log.warn("Using /tmp directory in the state.dir property can cause 
failures with writing the checkpoint file" +
-                " due to the fact that this directory can be cleared by the 
OS");
-        }
-        final Path basePath = Paths.get(baseDir.getPath());
-        final Path statePath = Paths.get(stateDir.getPath());
-        final Set<PosixFilePermission> perms = 
PosixFilePermissions.fromString("rwxr-x---");
-        try {
-            Files.setPosixFilePermissions(basePath, perms);
-            Files.setPosixFilePermissions(statePath, perms);
-        } catch (final IOException e) {
-            log.error("Error changing permissions for the state or base 
directory {} ", stateDir.getPath(), e);
+
+        if (this.hasPersistentStores) {
+            if (!baseDir.exists() && !baseDir.mkdirs()) {
+                throw new ProcessorStateException(
+                    String.format("base state directory [%s] doesn't exist and 
couldn't be created", stateDirName));
+            }
+            if (!stateDir.exists() && !stateDir.mkdir()) {
+                throw new ProcessorStateException(
+                    String.format("state directory [%s] doesn't exist and 
couldn't be created", stateDir.getPath()));
+            }
+            if (stateDirName.startsWith("/tmp")) {
+                log.warn("Using /tmp directory in the state.dir property can 
cause failures with writing the checkpoint file" +
+                    " due to the fact that this directory can be cleared by 
the OS");
+            }
+
+            // change the dir permission to "rwxr-x---" to avoid world readable
+            final Path basePath = Paths.get(baseDir.getPath());
+            final Path statePath = Paths.get(stateDir.getPath());
+            final Set<PosixFilePermission> perms = 
PosixFilePermissions.fromString("rwxr-x---");
+            try {
+                Files.setPosixFilePermissions(basePath, perms);
+                Files.setPosixFilePermissions(statePath, perms);
+            } catch (final IOException e) {
+                log.error("Error changing permissions for the state or base 
directory {} ", stateDir.getPath(), e);
+            }
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 93e81c0..5048962 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -56,6 +56,7 @@ import static 
org.apache.kafka.streams.processor.internals.StateDirectory.LOCK_F
 import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.endsWith;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
@@ -126,8 +127,8 @@ public class StateDirectoryTest {
         try {
             final Set<PosixFilePermission> baseFilePermissions = 
Files.getPosixFilePermissions(statePath);
             final Set<PosixFilePermission> appFilePermissions = 
Files.getPosixFilePermissions(basePath);
-            assertThat(expectedPermissions.equals(baseFilePermissions), 
is(true));
-            assertThat(expectedPermissions.equals(appFilePermissions), 
is(true));
+            assertThat(expectedPermissions, equalTo(baseFilePermissions));
+            assertThat(expectedPermissions, equalTo(appFilePermissions));
         } catch (final IOException e) {
             fail("Should create correct files and set correct permissions");
         }
@@ -545,9 +546,13 @@ public class StateDirectoryTest {
 
     @Test
     public void shouldNotCreateBaseDirectory() throws IOException {
-        initializeStateDirectory(false);
-        assertFalse(stateDir.exists());
-        assertFalse(appDir.exists());
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(StateDirectory.class)) {
+            initializeStateDirectory(false);
+            assertThat(stateDir.exists(), is(false));
+            assertThat(appDir.exists(), is(false));
+            assertThat(appender.getMessages(),
+                not(hasItem(containsString("Error changing permissions for the 
state or base directory"))));
+        }
     }
 
     @Test

Reply via email to