This is an automated email from the ASF dual-hosted git repository.

nixon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 21606bd  ATLAS-4263: Fix KafkaUtils to always enclose property values in double-quotes (#137)
21606bd is described below

commit 21606bdc20e3890ddf9d26555fc71e4175310950
Author: Vlad Glinsky <61428392+vladhlin...@users.noreply.github.com>
AuthorDate: Tue May 4 12:55:07 2021 +0300

    ATLAS-4263: Fix KafkaUtils to always enclose property values in double-quotes (#137)
---
 .../java/org/apache/atlas/utils/KafkaUtils.java    | 19 +----
 .../org/apache/atlas/utils/KafkaUtilsTest.java     | 89 ++++++++++++++++++----
 2 files changed, 80 insertions(+), 28 deletions(-)

diff --git a/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java 
b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
index eea3311..1674422 100644
--- a/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
+++ b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
@@ -318,20 +318,9 @@ public class KafkaUtils implements AutoCloseable {
             return optionVal;
         }
 
-        String ret = optionVal;
-
-        // For property values which have special chars like "@" or "/", we 
need to enclose it in
-        // double quotes, so that Kafka can parse it
-        // If the property is already enclosed in double quotes, then do 
nothing.
-        if (optionVal.indexOf(0) != '"' && 
optionVal.indexOf(optionVal.length() - 1) != '"') {
-            // If the string as special characters like except _,-
-            final String SPECIAL_CHAR_LIST = "/!@#%^&*";
-
-            if (StringUtils.containsAny(optionVal, SPECIAL_CHAR_LIST)) {
-                ret = String.format("\"%s\"", optionVal);
-            }
-        }
-
-        return ret;
+        // Enclose property values in double quotes, so that Kafka can parse 
it.
+        // Escape all double quotes that may occur in the property value.
+        String doubleQuoteEscaped = optionVal.replace("\"", "\\\"");
+        return String.format("\"%s\"", doubleQuoteEscaped);
     }
 }
\ No newline at end of file
diff --git a/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java 
b/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java
index 14739cd..562e28a 100644
--- a/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java
+++ b/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java
@@ -20,11 +20,15 @@ package org.apache.atlas.utils;
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.security.JaasContext;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
-
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.testng.Assert.assertNull;
@@ -55,9 +59,10 @@ public class KafkaUtilsTest {
 
         assertTrue(newPropertyValue.contains(loginModuleName), 
"loginModuleName not present in new property");
         
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag
 not present in new property");
-        assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), 
"useKeyTab not present in new property or value doesn't match");
-        assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), 
"storeKey not present in new property or value doesn't match");
-        assertTrue(newPropertyValue.contains("serviceName=" + 
optionServiceName), "serviceName not present in new property or value doesn't 
match");
+        assertTrue(newPropertyValue.contains("useKeyTab=\"" + optionUseKeyTab 
+ "\""), "useKeyTab not present in new property or value doesn't match");
+        assertTrue(newPropertyValue.contains("storeKey=\""+ optionStoreKey + 
"\""), "storeKey not present in new property or value doesn't match");
+        assertTrue(newPropertyValue.contains("serviceName=\"" + 
optionServiceName + "\""), "serviceName not present in new property or value 
doesn't match");
+        assertJaaSConfigLoadable(newPropertyValue);
 
     }
 
@@ -82,9 +87,10 @@ public class KafkaUtilsTest {
 
         assertTrue(newPropertyValue.contains(loginModuleName), 
"loginModuleName not present in new property");
         
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag
 not present in new property");
-        assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), 
"useKeyTab not present in new property or value doesn't match");
-        assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), 
"storeKey not present in new property or value doesn't match");
-        assertTrue(newPropertyValue.contains("serviceName=" + 
optionServiceName), "serviceName not present in new property or value doesn't 
match");
+        assertTrue(newPropertyValue.contains("useKeyTab=\"" + optionUseKeyTab 
+ "\""), "useKeyTab not present in new property or value doesn't match");
+        assertTrue(newPropertyValue.contains("storeKey=\""+ optionStoreKey + 
"\""), "storeKey not present in new property or value doesn't match");
+        assertTrue(newPropertyValue.contains("serviceName=\"" + 
optionServiceName + "\""), "serviceName not present in new property or value 
doesn't match");
+        assertJaaSConfigLoadable(newPropertyValue);
 
     }
 
@@ -134,6 +140,7 @@ public class KafkaUtilsTest {
             
assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag
 not present in new property");
             assertTrue(newPropertyValue.contains("keyTabPath=\"" + 
optionKeyTabPath + "\""));
             assertTrue(newPropertyValue.contains("principal=\""+ 
updatedPrincipalValue + "\""));
+            assertJaaSConfigLoadable(newPropertyValue);
 
         } catch (IOException e) {
             fail("Failed while getting updated principal value with exception 
: " + e.getMessage());
@@ -170,9 +177,10 @@ public class KafkaUtilsTest {
             String newPropertyValue = 
properties.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
             assertTrue(newPropertyValue.contains(loginModuleName), 
"loginModuleName not present in new property");
             assertTrue(newPropertyValue.contains(loginModuleControlFlag), 
"loginModuleControlFlag not present in new property");
-            assertTrue(newPropertyValue.contains("useKeyTab=" + 
optionUseKeyTab), "useKeyTab not present in new property or value doesn't 
match");
-            assertTrue(newPropertyValue.contains("storeKey=" + 
optionStoreKey), "storeKey not present in new property or value doesn't match");
-            assertTrue(newPropertyValue.contains("serviceName=" + 
optionServiceName), "serviceName not present in new property or value doesn't 
match");
+            assertTrue(newPropertyValue.contains("useKeyTab=\"" + 
optionUseKeyTab + "\""), "useKeyTab not present in new property or value 
doesn't match");
+            assertTrue(newPropertyValue.contains("storeKey=\"" + 
optionStoreKey + "\""), "storeKey not present in new property or value doesn't 
match");
+            assertTrue(newPropertyValue.contains("serviceName=\"" + 
optionServiceName + "\""), "serviceName not present in new property or value 
doesn't match");
+            assertJaaSConfigLoadable(newPropertyValue);
         }
     }
 
@@ -204,9 +212,64 @@ public class KafkaUtilsTest {
             String newPropertyValue = 
properties.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
             assertTrue(newPropertyValue.contains(loginModuleName), 
"loginModuleName not present in new property");
             assertTrue(newPropertyValue.contains(loginModuleControlFlag), 
"loginModuleControlFlag not present in new property");
-            assertTrue(newPropertyValue.contains("useKeyTab=" + 
optionUseKeyTab), "useKeyTab not present in new property or value doesn't 
match");
-            assertTrue(newPropertyValue.contains("storeKey=" + 
optionStoreKey), "storeKey not present in new property or value doesn't match");
-            assertTrue(newPropertyValue.contains("serviceName=" + 
optionServiceName), "serviceName not present in new property or value doesn't 
match");
+            assertTrue(newPropertyValue.contains("useKeyTab=\"" + 
optionUseKeyTab + "\""), "useKeyTab not present in new property or value 
doesn't match");
+            assertTrue(newPropertyValue.contains("storeKey=\"" + 
optionStoreKey + "\""), "storeKey not present in new property or value doesn't 
match");
+            assertTrue(newPropertyValue.contains("serviceName=\"" + 
optionServiceName + "\""), "serviceName not present in new property or value 
doesn't match");
+            assertJaaSConfigLoadable(newPropertyValue);
+        }
+    }
+
+    @Test
+    public void testSetKafkaJAASPropertiesForTokenAuthConfig() {
+        Properties properties = new Properties();
+        Configuration configuration = new PropertiesConfiguration();
+
+        final String loginModuleName = 
"org.apache.kafka.common.security.scram.ScramLoginModule";
+        final String loginModuleControlFlag = "required";
+        final String optionUseKeyTab = "false";
+        final String optionStoreKey = "false";
+        final String optionServiceName = "kafka";
+        final String optionTokenAuth = "true";
+        final String optionUsername = "30CQ4q1hQMy0dB6X0eXfxQ";
+        final String optionPassword = 
"KdaUQ4FlKWlDxwQrAeFGUVbb6sR0P+zoqOZDZjtIRP1wseXbSbhiTjz3QI9Ur9o4LTYZSv8TE1QqUC4FSwnoTA==";
+
+        configuration.setProperty("atlas.kafka.bootstrap.servers", 
"localhost:9100");
+        
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+        
configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", 
loginModuleControlFlag);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", 
optionUseKeyTab);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", 
optionStoreKey);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName", 
optionServiceName);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.tokenauth", 
optionTokenAuth);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.username", 
optionUsername);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.password", 
optionPassword);
+
+        try (MockedStatic mockedKafkaUtilsClass = 
Mockito.mockStatic(KafkaUtils.class)) {
+            mockedKafkaUtilsClass.when(() -> 
KafkaUtils.surroundWithQuotes(Mockito.anyString())).thenCallRealMethod();
+            mockedKafkaUtilsClass.when(() -> 
KafkaUtils.setKafkaJAASProperties(configuration, 
properties)).thenCallRealMethod();
+
+            KafkaUtils.setKafkaJAASProperties(configuration, properties);
+
+            String newPropertyValue = 
properties.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+            assertTrue(newPropertyValue.contains(loginModuleName), 
"loginModuleName not present in new property");
+            assertTrue(newPropertyValue.contains(loginModuleControlFlag), 
"loginModuleControlFlag not present in new property");
+            assertTrue(newPropertyValue.contains("useKeyTab=\"" + 
optionUseKeyTab + "\""), "useKeyTab not present in new property or value 
doesn't match");
+            assertTrue(newPropertyValue.contains("storeKey=\"" + 
optionStoreKey + "\""), "storeKey not present in new property or value doesn't 
match");
+            assertTrue(newPropertyValue.contains("serviceName=\"" + 
optionServiceName + "\""), "serviceName not present in new property or value 
doesn't match");
+            assertTrue(newPropertyValue.contains("tokenauth=\"" + 
optionTokenAuth + "\""), "tokenauth not present in new property or value 
doesn't match");
+            assertTrue(newPropertyValue.contains("username=\"" + 
optionUsername + "\""), "username not present in new property or value doesn't 
match");
+            assertTrue(newPropertyValue.contains("password=\"" + 
optionPassword + "\""), "password not present in new property or value doesn't 
match");
+            assertJaaSConfigLoadable(newPropertyValue);
+        }
+    }
+
+    private void assertJaaSConfigLoadable(String jaasConfig) {
+        // Ensure that JaaS config can be loaded
+        Map<String, Password> jaasConfigs = new HashMap<>();
+        jaasConfigs.put(SaslConfigs.SASL_JAAS_CONFIG, new 
Password(jaasConfig));
+        try {
+            JaasContext.loadClientContext(jaasConfigs);
+        } catch (IllegalArgumentException e) {
+            fail(String.format("JaaS config '%s' can not be loaded", 
jaasConfig), e);
         }
     }
 

Reply via email to