This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new d5244b8  [STORM-3765] fix NPE when drpc.authorizer.acl has no values (#3392)
d5244b8 is described below

commit d5244b8cb06a0f840ad1bf5d3faf74948702fdb9
Author: Meng (Ethan) Li <[email protected]>
AuthorDate: Mon May 3 13:17:07 2021 -0500

    [STORM-3765] fix NPE when drpc.authorizer.acl has no values (#3392)
---
 .../auth/authorizer/DRPCSimpleACLAuthorizer.java   | 35 +++++++++-------
 .../authorizer/DRPCSimpleACLAuthorizerTest.java    | 49 ++++++++++++++++++++++
 2 files changed, 68 insertions(+), 16 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java b/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
index 00d50c4..e45f388 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
@@ -52,25 +52,28 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
             Map<String, AclFunctionEntry> acl = new HashMap<>();
             Map<String, Object> conf = Utils.findAndReadConfigFile(aclFileName);
             if (conf.containsKey(Config.DRPC_AUTHORIZER_ACL)) {
-                Map<String, Map<String, ?>> confAcl =
-                    (Map<String, Map<String, ?>>)
-                        conf.get(Config.DRPC_AUTHORIZER_ACL);
-
-                for (Map.Entry<String, Map<String, ?>> entry : confAcl.entrySet()) {
-                    Map<String, ?> val = entry.getValue();
-                    Collection<String> clientUsers = val.containsKey(CLIENT_USERS_KEY)
-                            ? (Collection<String>) val.get(CLIENT_USERS_KEY)
-                            : null;
-                    String invocationUser = val.containsKey(INVOCATION_USER_KEY)
-                            ? (String) val.get(INVOCATION_USER_KEY)
-                            : null;
-                    acl.put(entry.getKey(),
-                            new AclFunctionEntry(clientUsers, invocationUser));
+                Map<String, Map<String, ?>> confAcl = (Map<String, Map<String, ?>>) conf.get(Config.DRPC_AUTHORIZER_ACL);
+
+                if (confAcl != null) {
+                    for (Map.Entry<String, Map<String, ?>> entry : confAcl.entrySet()) {
+                        Map<String, ?> val = entry.getValue();
+                        Collection<String> clientUsers = val.containsKey(CLIENT_USERS_KEY)
+                                ? (Collection<String>) val.get(CLIENT_USERS_KEY)
+                                : null;
+                        String invocationUser = val.containsKey(INVOCATION_USER_KEY)
+                                ? (String) val.get(INVOCATION_USER_KEY)
+                                : null;
+                        acl.put(entry.getKey(),
+                                new AclFunctionEntry(clientUsers, invocationUser));
+                    }
                 }
-            } else if (!permitWhenMissingFunctionEntry) {
-                LOG.warn("Requiring explicit ACL entries, but none given. Therefore, all operations will be denied.");
             }
+
             this.acl = acl;
+            if (this.acl.isEmpty() && !permitWhenMissingFunctionEntry) {
+                LOG.warn("Requiring explicit ACL entries, but none given. Therefore, all operations will be denied.");
+            }
+
             lastUpdate = System.currentTimeMillis();
         }
         return acl;
diff --git a/storm-client/test/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizerTest.java b/storm-client/test/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizerTest.java
index a0f9bee..2bf15fd 100644
--- a/storm-client/test/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizerTest.java
+++ b/storm-client/test/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizerTest.java
@@ -12,6 +12,10 @@
 
 package org.apache.storm.security.auth.authorizer;
 
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.storm.Config;
@@ -25,6 +29,10 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 public class DRPCSimpleACLAuthorizerTest {
 
     private static final String function = "jump";
@@ -158,4 +166,45 @@ public class DRPCSimpleACLAuthorizerTest {
         config.put(DRPCSimpleACLAuthorizer.FUNCTION_KEY, function);
         return authorizer.permit(context, operation, config);
     }
+
+    /**
+     * {@link DRPCSimpleACLAuthorizer} should still work even if {@link Config#DRPC_AUTHORIZER_ACL} has no values.
+     * @throws IOException if there is any issue with creating or writing the temp file.
+     */
+    @Test
+    public void test_read_acl_no_values() throws IOException {
+        DRPCSimpleACLAuthorizer authorizer = new DRPCSimpleACLAuthorizer();
+
+        File tempFile = File.createTempFile("drpcacl", ".yaml");
+        tempFile.deleteOnExit();
+        BufferedWriter writer = new BufferedWriter(new FileWriter(tempFile));
+        writer.write("drpc.authorizer.acl:");
+        writer.close();
+
+        authorizer.prepare(ImmutableMap
+                .of(Config.DRPC_AUTHORIZER_ACL_STRICT, true, Config.DRPC_AUTHORIZER_ACL_FILENAME, tempFile.toString(),
+                        Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, KerberosPrincipalToLocal.class.getName()));
+
+        Map<String, DRPCSimpleACLAuthorizer.AclFunctionEntry> acl = authorizer.readAclFromConfig();
+        assertEquals(0, acl.size());
+    }
+
+    /**
+     * The file of {@link Config#DRPC_AUTHORIZER_ACL_FILENAME} can not be empty.
+     * @throws IOException if there is any issue with creating the temp file.
+     */
+    @Test
+    public void test_read_acl_empty_file() throws IOException {
+        DRPCSimpleACLAuthorizer authorizer = new DRPCSimpleACLAuthorizer();
+
+        File tempFile = File.createTempFile("drpcacl", ".yaml");
+        tempFile.deleteOnExit();
+
+        authorizer.prepare(ImmutableMap
+                .of(Config.DRPC_AUTHORIZER_ACL_STRICT, true, Config.DRPC_AUTHORIZER_ACL_FILENAME, tempFile.toString(),
+                        Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, KerberosPrincipalToLocal.class.getName()));
+
+        Exception exception = assertThrows(RuntimeException.class, authorizer::readAclFromConfig);
+        assertTrue(exception.getMessage().contains("doesn't have any valid storm configs"));
+    }
 }

Reply via email to