This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ddce06f9cc Bug fix: Handle missing shade config overwrites for Kafka (#13437)
ddce06f9cc is described below
commit ddce06f9cc557463e79f8a451224b44f3e63ff5d
Author: Kartik Khare <[email protected]>
AuthorDate: Thu Jun 20 00:07:03 2024 +0530
Bug fix: Handle missing shade config overwrites for Kafka (#13437)
Co-authored-by: Kartik Khare <[email protected]>
---
.../KafkaConfigBackwardCompatibleUtils.java | 56 ++++++++++++++++++++++
1 file changed, 56 insertions(+)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConfigBackwardCompatibleUtils.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConfigBackwardCompatibleUtils.java
index 14bc6dcb06..7f7cfb463e 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConfigBackwardCompatibleUtils.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConfigBackwardCompatibleUtils.java
@@ -18,17 +18,23 @@
*/
package org.apache.pinot.plugin.stream.kafka20;
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.stream.StreamConfig;
public class KafkaConfigBackwardCompatibleUtils {
+
private KafkaConfigBackwardCompatibleUtils() {
}
public static final String KAFKA_COMMON_PACKAGE_PREFIX =
"org.apache.kafka.common";
public static final String PINOT_SHADED_PACKAGE_PREFIX =
"org.apache.pinot.shaded.";
+ public static final String SASL_JAAS_CONFIG = "sasl.jaas.config";
/**
* Handle the stream config to replace the Kafka common package with the
shaded version if needed.
@@ -37,6 +43,7 @@ public class KafkaConfigBackwardCompatibleUtils {
Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
//FIXME: This needs to be done because maven shade plugin also overwrites
the constants in the classes
String prefixToReplace =
KAFKA_COMMON_PACKAGE_PREFIX.replace(PINOT_SHADED_PACKAGE_PREFIX, "");
+
for (Map.Entry<String, String> entry : streamConfigMap.entrySet()) {
String[] valueParts = StringUtils.split(entry.getValue(), ' ');
boolean updated = false;
@@ -57,9 +64,58 @@ public class KafkaConfigBackwardCompatibleUtils {
}
}
}
+
if (updated) {
entry.setValue(String.join(" ", valueParts));
}
}
+
+ // Read the file specified by the -Djava.security.auth.login.config VM
argument
+ String loginConfigFile =
System.getProperty("java.security.auth.login.config");
+ StringBuilder jaasConfigContent = new StringBuilder();
+ if (loginConfigFile != null) {
+ try (BufferedReader br = new BufferedReader(new InputStreamReader(new
FileInputStream(loginConfigFile)))) {
+ String line;
+ while ((line = br.readLine()) != null) {
+ jaasConfigContent.append(line).append("\n");
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read JAAS config file: " +
loginConfigFile, e);
+ }
+ }
+
+ // Process JAAS config content
+ if (jaasConfigContent.length() > 0) {
+ String jaasContent = jaasConfigContent.toString();
+ boolean updated = false;
+ String[] lines = jaasContent.split("\n");
+ for (int i = 0; i < lines.length; i++) {
+ if (lines[i].contains(prefixToReplace)) {
+ String className = lines[i].trim().split(" ")[0];
+ if (className.startsWith(prefixToReplace)) {
+ try {
+ Class.forName(className);
+ } catch (ClassNotFoundException e1) {
+ // If not, replace the class with the shaded version
+ try {
+ String shadedClassName = PINOT_SHADED_PACKAGE_PREFIX +
className.substring(prefixToReplace.length());
+ Class.forName(shadedClassName);
+ lines[i] = lines[i].replace(className, shadedClassName);
+ updated = true;
+ } catch (ClassNotFoundException e2) {
+ // Do nothing, shaded class is not found as well, keep the
original class
+ }
+ }
+ }
+ }
+ }
+ if (updated) {
+ String updatedJaasConfig = String.join("\n", lines);
+ String jassConfigProperty =
+ updatedJaasConfig.substring(updatedJaasConfig.indexOf("{") +
1).substring(0, updatedJaasConfig.indexOf("}"))
+ .replace("\n", " ");
+ streamConfigMap.putIfAbsent(SASL_JAAS_CONFIG, jassConfigProperty);
+ }
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]