This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 901ebbb7445 Allow for kafka emitter producer secrets to be masked in 
logs  (#15485)
901ebbb7445 is described below

commit 901ebbb744501252726e5d6c86541f1c6d46c131
Author: Tom <[email protected]>
AuthorDate: Fri Dec 15 09:21:21 2023 -0800

    Allow for kafka emitter producer secrets to be masked in logs  (#15485)
    
    * Allow for kafka emitter producer secrets to be masked in logs instead of 
being visible
    
    This change allows Kafka producer config values that should be 
secrets to be kept out of the logs.
    This enhances security for users of the Kafka emitter who choose to 
enable it.
    This is opt-in and will not affect prior configs for this emitter
    
    * fix checkstyle issue
    
    * change property name
---
 .../apache/druid/emitter/kafka/KafkaEmitter.java   |  1 +
 .../druid/emitter/kafka/KafkaEmitterConfig.java    | 23 +++++-
 .../emitter/kafka/KafkaEmitterConfigTest.java      | 95 +++++++++++++++-------
 .../druid/emitter/kafka/KafkaEmitterTest.java      |  2 +-
 .../druid/utils/DynamicConfigProviderUtils.java    |  8 +-
 5 files changed, 89 insertions(+), 40 deletions(-)

diff --git 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
index c776e3f2f8b..7485cbaab6d 100644
--- 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
+++ 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
@@ -115,6 +115,7 @@ public class KafkaEmitter implements Emitter
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
       props.put(ProducerConfig.RETRIES_CONFIG, DEFAULT_RETRIES);
       props.putAll(config.getKafkaProducerConfig());
+      props.putAll(config.getKafkaProducerSecrets().getConfig());
 
       return new KafkaProducer<>(props);
     }
diff --git 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
index 019edd095ea..d6d823c0a88 100644
--- 
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
+++ 
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
@@ -26,6 +26,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.metadata.DynamicConfigProvider;
+import org.apache.druid.metadata.MapStringDynamicConfigProvider;
 import org.apache.kafka.clients.producer.ProducerConfig;
 
 import javax.annotation.Nullable;
@@ -73,6 +75,8 @@ public class KafkaEmitterConfig
   private final String clusterName;
   @JsonProperty("producer.config")
   private final Map<String, String> kafkaProducerConfig;
+  @JsonProperty("producer.hiddenProperties")
+  private final DynamicConfigProvider<String> kafkaProducerSecrets;
 
   @JsonCreator
   public KafkaEmitterConfig(
@@ -83,7 +87,8 @@ public class KafkaEmitterConfig
       @Nullable @JsonProperty("request.topic") String requestTopic,
       @Nullable @JsonProperty("segmentMetadata.topic") String 
segmentMetadataTopic,
       @JsonProperty("clusterName") String clusterName,
-      @JsonProperty("producer.config") @Nullable Map<String, String> 
kafkaProducerConfig
+      @JsonProperty("producer.config") @Nullable Map<String, String> 
kafkaProducerConfig,
+      @JsonProperty("producer.hiddenProperties") @Nullable 
DynamicConfigProvider<String> kafkaProducerSecrets
   )
   {
     this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, 
"druid.emitter.kafka.bootstrap.servers can not be null");
@@ -94,6 +99,7 @@ public class KafkaEmitterConfig
     this.segmentMetadataTopic = 
this.eventTypes.contains(EventType.SEGMENT_METADATA) ? 
Preconditions.checkNotNull(segmentMetadataTopic, 
"druid.emitter.kafka.segmentMetadata.topic can not be null") : null;
     this.clusterName = clusterName;
     this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() 
: kafkaProducerConfig;
+    this.kafkaProducerSecrets = kafkaProducerSecrets == null ? new 
MapStringDynamicConfigProvider(ImmutableMap.of()) : kafkaProducerSecrets;
   }
 
   private Set<EventType> maybeUpdateEventTypes(Set<EventType> eventTypes, 
String requestTopic)
@@ -159,6 +165,12 @@ public class KafkaEmitterConfig
     return kafkaProducerConfig;
   }
 
+  @JsonProperty
+  public DynamicConfigProvider<String> getKafkaProducerSecrets()
+  {
+    return kafkaProducerSecrets;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -198,7 +210,10 @@ public class KafkaEmitterConfig
     if (getClusterName() != null ? 
!getClusterName().equals(that.getClusterName()) : that.getClusterName() != 
null) {
       return false;
     }
-    return getKafkaProducerConfig().equals(that.getKafkaProducerConfig());
+    if (!getKafkaProducerConfig().equals(that.getKafkaProducerConfig())) {
+      return false;
+    }
+    return 
getKafkaProducerSecrets().getConfig().equals(that.getKafkaProducerSecrets().getConfig());
   }
 
   @Override
@@ -212,6 +227,7 @@ public class KafkaEmitterConfig
     result = 31 * result + (getSegmentMetadataTopic() != null ? 
getSegmentMetadataTopic().hashCode() : 0);
     result = 31 * result + (getClusterName() != null ? 
getClusterName().hashCode() : 0);
     result = 31 * result + getKafkaProducerConfig().hashCode();
+    result = 31 * result + getKafkaProducerSecrets().getConfig().hashCode();
     return result;
   }
 
@@ -226,7 +242,8 @@ public class KafkaEmitterConfig
            ", request.topic='" + requestTopic + '\'' +
            ", segmentMetadata.topic='" + segmentMetadataTopic + '\'' +
            ", clusterName='" + clusterName + '\'' +
-           ", Producer.config=" + kafkaProducerConfig +
+           ", producer.config=" + kafkaProducerConfig + '\'' +
+           ", producer.hiddenProperties=" + kafkaProducerSecrets +
            '}';
   }
 }
diff --git 
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
 
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
index c4d5811bcb5..603c8e6701b 100644
--- 
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
+++ 
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
@@ -25,48 +25,67 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.metadata.DynamicConfigProvider;
+import org.apache.druid.metadata.MapStringDynamicConfigProvider;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
 
 public class KafkaEmitterConfigTest
 {
-  private ObjectMapper mapper = new DefaultObjectMapper();
+  private static final DynamicConfigProvider<String> DEFAULT_PRODUCER_SECRETS 
= new MapStringDynamicConfigProvider(
+      ImmutableMap.of("testSecretKey", "testSecretValue"));
+  private static final ObjectMapper MAPPER = new DefaultObjectMapper();
 
   @Before
   public void setUp()
   {
-    mapper.setInjectableValues(new 
InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()));
+    MAPPER.setInjectableValues(new 
InjectableValues.Std().addValue(ObjectMapper.class, new DefaultObjectMapper()));
   }
 
   @Test
   public void testSerDeserKafkaEmitterConfig() throws IOException
   {
-    KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", 
null, "metricTest",
-        "alertTest", "requestTest", "metadataTest",
-        "clusterNameTest", ImmutableMap.<String, String>builder()
-        .put("testKey", "testValue").build()
+    KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
+        "hostname",
+        null,
+        "metricTest",
+        "alertTest",
+        "requestTest",
+        "metadataTest",
+        "clusterNameTest",
+        ImmutableMap.<String, String>builder()
+                    .put("testKey", "testValue").build(),
+        DEFAULT_PRODUCER_SECRETS
     );
-    String kafkaEmitterConfigString = 
mapper.writeValueAsString(kafkaEmitterConfig);
-    KafkaEmitterConfig kafkaEmitterConfigExpected = 
mapper.readerFor(KafkaEmitterConfig.class)
-        .readValue(kafkaEmitterConfigString);
+    String kafkaEmitterConfigString = 
MAPPER.writeValueAsString(kafkaEmitterConfig);
+    KafkaEmitterConfig kafkaEmitterConfigExpected = 
MAPPER.readerFor(KafkaEmitterConfig.class)
+                                                          
.readValue(kafkaEmitterConfigString);
     Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
   }
 
   @Test
   public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws 
IOException
   {
-    KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", 
null, "metricTest",
-        "alertTest", null, "metadataTest",
-        "clusterNameTest", ImmutableMap.<String, String>builder()
-        .put("testKey", "testValue").build()
+    KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
+        "hostname",
+        null,
+        "metricTest",
+        "alertTest",
+        null,
+        "metadataTest",
+        "clusterNameTest",
+        ImmutableMap.<String, String>builder()
+                    .put("testKey", "testValue").build(),
+        DEFAULT_PRODUCER_SECRETS
     );
-    String kafkaEmitterConfigString = 
mapper.writeValueAsString(kafkaEmitterConfig);
-    KafkaEmitterConfig kafkaEmitterConfigExpected = 
mapper.readerFor(KafkaEmitterConfig.class)
-        .readValue(kafkaEmitterConfigString);
+    String kafkaEmitterConfigString = 
MAPPER.writeValueAsString(kafkaEmitterConfig);
+    KafkaEmitterConfig kafkaEmitterConfigExpected = 
MAPPER.readerFor(KafkaEmitterConfig.class)
+                                                          
.readValue(kafkaEmitterConfigString);
     Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
   }
 
@@ -75,27 +94,34 @@ public class KafkaEmitterConfigTest
   {
     Set<KafkaEmitterConfig.EventType> eventTypeSet = new 
HashSet<KafkaEmitterConfig.EventType>();
     eventTypeSet.add(KafkaEmitterConfig.EventType.SEGMENT_METADATA);
-    KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", 
eventTypeSet, null,
-        null, null, "metadataTest",
-        "clusterNameTest", ImmutableMap.<String, String>builder()
-        .put("testKey", "testValue").build()
+    KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig(
+        "hostname",
+        eventTypeSet,
+        null,
+        null,
+        null,
+        "metadataTest",
+        "clusterNameTest",
+        ImmutableMap.<String, String>builder()
+                    .put("testKey", "testValue").build(),
+        DEFAULT_PRODUCER_SECRETS
     );
-    String kafkaEmitterConfigString = 
mapper.writeValueAsString(kafkaEmitterConfig);
-    KafkaEmitterConfig kafkaEmitterConfigExpected = 
mapper.readerFor(KafkaEmitterConfig.class)
-        .readValue(kafkaEmitterConfigString);
+    String kafkaEmitterConfigString = 
MAPPER.writeValueAsString(kafkaEmitterConfig);
+    KafkaEmitterConfig kafkaEmitterConfigExpected = 
MAPPER.readerFor(KafkaEmitterConfig.class)
+                                                          
.readValue(kafkaEmitterConfigString);
     Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
   }
 
   @Test
-  public void testSerDeNotRequiredKafkaProducerConfig()
+  public void testSerDeNotRequiredKafkaProducerConfigOrKafkaSecretProducer()
   {
     KafkaEmitterConfig kafkaEmitterConfig = new 
KafkaEmitterConfig("localhost:9092", null, "metricTest",
-        "alertTest", null, "metadataTest",
-        "clusterNameTest", null
+                                                                   
"alertTest", null, "metadataTest",
+                                                                   
"clusterNameTest", null, null
     );
     try {
       @SuppressWarnings("unused")
-      KafkaEmitter emitter = new KafkaEmitter(kafkaEmitterConfig, mapper);
+      KafkaEmitter emitter = new KafkaEmitter(kafkaEmitterConfig, MAPPER);
     }
     catch (NullPointerException e) {
       Assert.fail();
@@ -105,9 +131,18 @@ public class KafkaEmitterConfigTest
   @Test
   public void testDeserializeEventTypesWithDifferentCase() throws 
JsonProcessingException
   {
-    Assert.assertEquals(KafkaEmitterConfig.EventType.SEGMENT_METADATA, 
mapper.readValue("\"segment_metadata\"", KafkaEmitterConfig.EventType.class));
-    Assert.assertEquals(KafkaEmitterConfig.EventType.ALERTS, 
mapper.readValue("\"alerts\"", KafkaEmitterConfig.EventType.class));
-    Assert.assertThrows(ValueInstantiationException.class, () -> 
mapper.readValue("\"segmentMetadata\"", KafkaEmitterConfig.EventType.class));
+    Assert.assertEquals(
+        KafkaEmitterConfig.EventType.SEGMENT_METADATA,
+        MAPPER.readValue("\"segment_metadata\"", 
KafkaEmitterConfig.EventType.class)
+    );
+    Assert.assertEquals(
+        KafkaEmitterConfig.EventType.ALERTS,
+        MAPPER.readValue("\"alerts\"", KafkaEmitterConfig.EventType.class)
+    );
+    Assert.assertThrows(
+        ValueInstantiationException.class,
+        () -> MAPPER.readValue("\"segmentMetadata\"", 
KafkaEmitterConfig.EventType.class)
+    );
   }
 
   @Test
diff --git 
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
 
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
index f39e8e82606..1c30bee12fa 100644
--- 
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
+++ 
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
@@ -106,7 +106,7 @@ public class KafkaEmitterTest
     ObjectMapper mapper = new ObjectMapper();
     mapper.registerModule(new JodaModule());
     final KafkaEmitter kafkaEmitter = new KafkaEmitter(
-        new KafkaEmitterConfig("", eventsType, "metrics", "alerts", 
requestTopic, "metadata", "test-cluster", null),
+        new KafkaEmitterConfig("", eventsType, "metrics", "alerts", 
requestTopic, "metadata", "test-cluster", null, null),
         mapper
     )
     {
diff --git 
a/processing/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java
 
b/processing/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java
index 4c45262aba0..0b47116f0e7 100644
--- 
a/processing/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java
+++ 
b/processing/src/main/java/org/apache/druid/utils/DynamicConfigProviderUtils.java
@@ -38,9 +38,7 @@ public class DynamicConfigProviderUtils
         }
       }
       Map<String, String> dynamicConfig = 
extraConfigFromProvider(config.get(dynamicConfigProviderKey), mapper);
-      for (Map.Entry<String, String> entry : dynamicConfig.entrySet()) {
-        newConfig.put(entry.getKey(), entry.getValue());
-      }
+      newConfig.putAll(dynamicConfig);
     }
     return newConfig;
   }
@@ -55,9 +53,7 @@ public class DynamicConfigProviderUtils
         }
       }
       Map<String, String> dynamicConfig = 
extraConfigFromProvider(config.get(dynamicConfigProviderKey), mapper);
-      for (Map.Entry<String, String> entry : dynamicConfig.entrySet()) {
-        newConfig.put(entry.getKey(), entry.getValue());
-      }
+      newConfig.putAll(dynamicConfig);
     }
     return newConfig;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to