This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ddce06f9cc Bug fix: Handle missing shade config overwrites for Kafka (#13437)
ddce06f9cc is described below

commit ddce06f9cc557463e79f8a451224b44f3e63ff5d
Author: Kartik Khare <[email protected]>
AuthorDate: Thu Jun 20 00:07:03 2024 +0530

    Bug fix: Handle missing shade config overwrites for Kafka (#13437)
    
    Co-authored-by: Kartik Khare <[email protected]>
---
 .../KafkaConfigBackwardCompatibleUtils.java        | 56 ++++++++++++++++++++++
 1 file changed, 56 insertions(+)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConfigBackwardCompatibleUtils.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConfigBackwardCompatibleUtils.java
index 14bc6dcb06..7f7cfb463e 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConfigBackwardCompatibleUtils.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConfigBackwardCompatibleUtils.java
@@ -18,17 +18,23 @@
  */
 package org.apache.pinot.plugin.stream.kafka20;
 
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.stream.StreamConfig;
 
 
 public class KafkaConfigBackwardCompatibleUtils {
+
   private KafkaConfigBackwardCompatibleUtils() {
   }
 
   public static final String KAFKA_COMMON_PACKAGE_PREFIX = 
"org.apache.kafka.common";
   public static final String PINOT_SHADED_PACKAGE_PREFIX = 
"org.apache.pinot.shaded.";
+  public static final String SASL_JAAS_CONFIG = "sasl.jaas.config";
 
   /**
    * Handle the stream config to replace the Kafka common package with the 
shaded version if needed.
@@ -37,6 +43,7 @@ public class KafkaConfigBackwardCompatibleUtils {
     Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
     //FIXME: This needs to be done because maven shade plugin also overwrites 
the constants in the classes
     String prefixToReplace = 
KAFKA_COMMON_PACKAGE_PREFIX.replace(PINOT_SHADED_PACKAGE_PREFIX, "");
+
     for (Map.Entry<String, String> entry : streamConfigMap.entrySet()) {
       String[] valueParts = StringUtils.split(entry.getValue(), ' ');
       boolean updated = false;
@@ -57,9 +64,58 @@ public class KafkaConfigBackwardCompatibleUtils {
           }
         }
       }
+
       if (updated) {
         entry.setValue(String.join(" ", valueParts));
       }
     }
+
+    // Read the file specified by the -Djava.security.auth.login.config VM 
argument
+    String loginConfigFile = 
System.getProperty("java.security.auth.login.config");
+    StringBuilder jaasConfigContent = new StringBuilder();
+    if (loginConfigFile != null) {
+      try (BufferedReader br = new BufferedReader(new InputStreamReader(new 
FileInputStream(loginConfigFile)))) {
+        String line;
+        while ((line = br.readLine()) != null) {
+          jaasConfigContent.append(line).append("\n");
+        }
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to read JAAS config file: " + 
loginConfigFile, e);
+      }
+    }
+
+    // Process JAAS config content
+    if (jaasConfigContent.length() > 0) {
+      String jaasContent = jaasConfigContent.toString();
+      boolean updated = false;
+      String[] lines = jaasContent.split("\n");
+      for (int i = 0; i < lines.length; i++) {
+        if (lines[i].contains(prefixToReplace)) {
+          String className = lines[i].trim().split(" ")[0];
+          if (className.startsWith(prefixToReplace)) {
+            try {
+              Class.forName(className);
+            } catch (ClassNotFoundException e1) {
+              // If not, replace the class with the shaded version
+              try {
+                String shadedClassName = PINOT_SHADED_PACKAGE_PREFIX + 
className.substring(prefixToReplace.length());
+                Class.forName(shadedClassName);
+                lines[i] = lines[i].replace(className, shadedClassName);
+                updated = true;
+              } catch (ClassNotFoundException e2) {
+                // Do nothing, shaded class is not found as well, keep the 
original class
+              }
+            }
+          }
+        }
+      }
+      if (updated) {
+        String updatedJaasConfig = String.join("\n", lines);
+        String jassConfigProperty =
+            updatedJaasConfig.substring(updatedJaasConfig.indexOf("{") + 
1).substring(0, updatedJaasConfig.indexOf("}"))
+                .replace("\n", " ");
+        streamConfigMap.putIfAbsent(SASL_JAAS_CONFIG, jassConfigProperty);
+      }
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to