himanshug closed pull request #6285: Securing passwords used for SSL connections to Kafka
URL: https://github.com/apache/incubator-druid/pull/6285
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md
index ebc240a2d16..1bd88fd2406 100644
--- a/docs/content/development/extensions-core/kafka-ingestion.md
+++ b/docs/content/development/extensions-core/kafka-ingestion.md
@@ -166,7 +166,7 @@ For Roaring bitmaps:
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes|
-|`consumerProperties`|Map<String, String>|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`.|yes|
+|`consumerProperties`|Map<String, Object>|A map of properties to be passed to the Kafka consumer. This must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. For SSL connections, the `keystore`, `truststore` and `key` passwords can be provided as a [Password Provider](../../operations/password-provider.html) or String password.|yes|
|`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against node failure.|no (default == 1)|
|`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See 'Capacity Planning' below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)|
|`taskDuration`|ISO8601 Period|The length of time before tasks stop reading and begin publishing their segment.|no (default == PT1H)|
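
For illustration, a `consumerProperties` block combining the documented password forms might look like the sketch below. The broker addresses, truststore path, and environment variable name are placeholders; `security.protocol` and `ssl.truststore.location` are the standard Kafka consumer settings, and the structured password values follow the linked Password Provider documentation and the serde test added later in this diff:

  "consumerProperties": {
    "bootstrap.servers": "broker1.example.com:9093,broker2.example.com:9093",
    "security.protocol": "SSL",
    "ssl.truststore.location": "/path/to/truststore.jks",
    "ssl.truststore.password": {"type": "environment", "variable": "KAFKA_TRUSTSTORE_PASSWORD"},
    "ssl.keystore.password": {"type": "default", "password": "mykeystorepassword"},
    "ssl.key.password": "mykeypassword"
  }

Here `ssl.key.password` stays a plain String, which continues to be accepted, while the other two passwords are resolved through a Password Provider when the consumer properties are materialized.
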
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIOConfig.java
index 3c60449e410..6a9af7fcea9 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIOConfig.java
@@ -39,7 +39,7 @@
private final String baseSequenceName;
private final KafkaPartitions startPartitions;
private final KafkaPartitions endPartitions;
- private final Map<String, String> consumerProperties;
+ private final Map<String, Object> consumerProperties;
private final boolean useTransaction;
private final Optional<DateTime> minimumMessageTime;
private final Optional<DateTime> maximumMessageTime;
@@ -51,7 +51,7 @@ public KafkaIOConfig(
@JsonProperty("baseSequenceName") String baseSequenceName,
@JsonProperty("startPartitions") KafkaPartitions startPartitions,
@JsonProperty("endPartitions") KafkaPartitions endPartitions,
- @JsonProperty("consumerProperties") Map<String, String> consumerProperties,
+ @JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
@JsonProperty("useTransaction") Boolean useTransaction,
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@@ -114,7 +114,7 @@ public KafkaPartitions getEndPartitions()
}
@JsonProperty
- public Map<String, String> getConsumerProperties()
+ public Map<String, Object> getConsumerProperties()
{
return consumerProperties;
}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 0e362e2e7e9..bb73651e6e8 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
@@ -41,10 +42,12 @@
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
@@ -92,6 +95,7 @@
private final KafkaIOConfig ioConfig;
private final Optional<ChatHandlerProvider> chatHandlerProvider;
private final KafkaIndexTaskRunner runner;
+ private final ObjectMapper configMapper;
// This value can be tuned in some tests
private long pollRetryMs = 30000;
@@ -106,7 +110,8 @@ public KafkaIndexTask(
@JsonProperty("context") Map<String, Object> context,
@JacksonInject ChatHandlerProvider chatHandlerProvider,
@JacksonInject AuthorizerMapper authorizerMapper,
- @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory
+ @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
+ @JacksonInject ObjectMapper configMapper
)
{
super(
@@ -122,6 +127,7 @@ public KafkaIndexTask(
this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
+ this.configMapper = configMapper;
final CircularBuffer<Throwable> savedParseExceptions;
if (tuningConfig.getMaxSavedParseExceptions() > 0) {
savedParseExceptions = new CircularBuffer<>(tuningConfig.getMaxSavedParseExceptions());
@@ -198,7 +204,6 @@ public KafkaIOConfig getIOConfig()
}
-
@Override
public TaskStatus run(final TaskToolbox toolbox)
{
@@ -285,9 +290,7 @@ StreamAppenderatorDriver newDriver(
final Properties props = new Properties();
- for (Map.Entry<String, String> entry : ioConfig.getConsumerProperties().entrySet()) {
- props.setProperty(entry.getKey(), entry.getValue());
- }
+ addConsumerPropertiesFromConfig(props, configMapper, ioConfig.getConsumerProperties());
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.offset.reset", "none");
@@ -301,6 +304,25 @@ StreamAppenderatorDriver newDriver(
}
}
+ public static void addConsumerPropertiesFromConfig(Properties properties, ObjectMapper configMapper, Map<String, Object> consumerProperties)
+ {
+ // Extract passwords before SSL connection to Kafka
+ for (Map.Entry<String, Object> entry : consumerProperties.entrySet()) {
+ String propertyKey = entry.getKey();
+ if (propertyKey.equals(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY)
+ || propertyKey.equals(KafkaSupervisorIOConfig.KEY_STORE_PASSWORD_KEY)
+ || propertyKey.equals(KafkaSupervisorIOConfig.KEY_PASSWORD_KEY)) {
+ PasswordProvider configPasswordProvider = configMapper.convertValue(
+ entry.getValue(),
+ PasswordProvider.class
+ );
+ properties.setProperty(propertyKey, configPasswordProvider.getPassword());
+ } else {
+ properties.setProperty(propertyKey, String.valueOf(entry.getValue()));
+ }
+ }
+ }
+
static void assignPartitions(
final KafkaConsumer consumer,
final String topic,
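
As a rough usage sketch of the helper above (all values below are hypothetical, and the ObjectMapper setup is assumed to resolve the "default" PasswordProvider type the same way the serde test further down in this diff does), a structured password entry is converted to a PasswordProvider and resolved, a plain String password goes through the same conversion, and every other entry is copied verbatim:

  import com.fasterxml.jackson.databind.ObjectMapper;
  import com.google.common.collect.ImmutableMap;
  import org.apache.druid.indexing.kafka.KafkaIndexTask;
  import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
  import org.apache.druid.jackson.DefaultObjectMapper;
  import java.util.HashMap;
  import java.util.Map;
  import java.util.Properties;

  public class ConsumerPropertiesSketch
  {
    public static void main(String[] args)
    {
      // Assumption: a plain DefaultObjectMapper can deserialize the "default"
      // PasswordProvider type; the serde test below also registers the Kafka module.
      ObjectMapper mapper = new DefaultObjectMapper();

      Map<String, Object> consumerProperties = new HashMap<>();
      consumerProperties.put(KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY, "localhost:9092");
      // A structured value is converted to a PasswordProvider and then resolved ...
      consumerProperties.put(
          KafkaSupervisorIOConfig.KEY_STORE_PASSWORD_KEY,
          ImmutableMap.of("type", "default", "password", "mykeystorepassword")
      );
      // ... and a plain String password is run through the same conversion.
      consumerProperties.put(KafkaSupervisorIOConfig.KEY_PASSWORD_KEY, "mykeypassword");

      Properties props = new Properties();
      KafkaIndexTask.addConsumerPropertiesFromConfig(props, mapper, consumerProperties);

      // props now holds plain, resolved password strings alongside bootstrap.servers.
      System.out.println(props.getProperty(KafkaSupervisorIOConfig.KEY_STORE_PASSWORD_KEY));
    }
  }
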
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 48086582fc1..b7845cae220 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -1018,7 +1018,7 @@ protected void tryInit()
props.setProperty("metadata.max.age.ms", "10000");
props.setProperty("group.id", StringUtils.format("kafka-supervisor-%s",
RealtimeIndexTask.makeRandomId()));
- props.putAll(ioConfig.getConsumerProperties());
+ KafkaIndexTask.addConsumerPropertiesFromConfig(props, sortingMapper,
ioConfig.getConsumerProperties());
props.setProperty("enable.auto.commit", "false");
@@ -1918,7 +1918,7 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc
}
TaskGroup group = taskGroups.get(groupId);
- Map<String, String> consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties());
+ Map<String, Object> consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties());
DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull();
DateTime maximumMessageTime = taskGroups.get(groupId).maximumMessageTime.orNull();
@@ -1960,7 +1960,8 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc
context,
null,
null,
- rowIngestionMetersFactory
+ rowIngestionMetersFactory,
+ sortingMapper
);
Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index b02458fd9d4..44c2bb2d6f7 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -32,12 +32,15 @@
public class KafkaSupervisorIOConfig
{
public static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
+ public static final String TRUST_STORE_PASSWORD_KEY = "ssl.truststore.password";
+ public static final String KEY_STORE_PASSWORD_KEY = "ssl.keystore.password";
+ public static final String KEY_PASSWORD_KEY = "ssl.key.password";
private final String topic;
private final Integer replicas;
private final Integer taskCount;
private final Duration taskDuration;
- private final Map<String, String> consumerProperties;
+ private final Map<String, Object> consumerProperties;
private final Duration startDelay;
private final Duration period;
private final boolean useEarliestOffset;
@@ -52,7 +55,7 @@ public KafkaSupervisorIOConfig(
@JsonProperty("replicas") Integer replicas,
@JsonProperty("taskCount") Integer taskCount,
@JsonProperty("taskDuration") Period taskDuration,
- @JsonProperty("consumerProperties") Map<String, String> consumerProperties,
+ @JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
@JsonProperty("startDelay") Period startDelay,
@JsonProperty("period") Period period,
@JsonProperty("useEarliestOffset") Boolean useEarliestOffset,
@@ -110,7 +113,7 @@ public Duration getTaskDuration()
}
@JsonProperty
- public Map<String, String> getConsumerProperties()
+ public Map<String, Object> getConsumerProperties()
{
return consumerProperties;
}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index cd44d68b700..6dd210ae4c5 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -486,7 +486,7 @@ public void testIncrementalHandOff() throws Exception
kafkaProducer.send(record).get();
}
}
- Map<String, String> consumerProps = kafkaServer.consumerProperties();
+ Map<String, Object> consumerProps = kafkaServer.consumerProperties();
consumerProps.put("max.poll.records", "1");
final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L));
@@ -581,7 +581,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception
kafkaProducer.send(records.get(i)).get();
}
- Map<String, String> consumerProps = kafkaServer.consumerProperties();
+ Map<String, Object> consumerProps = kafkaServer.consumerProperties();
consumerProps.put("max.poll.records", "1");
final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L));
@@ -698,7 +698,7 @@ public void testTimeBasedIncrementalHandOff() throws Exception
kafkaProducer.send(record).get();
}
}
- Map<String, String> consumerProps = kafkaServer.consumerProperties();
+ Map<String, Object> consumerProps = kafkaServer.consumerProperties();
consumerProps.put("max.poll.records", "1");
final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L));
@@ -2027,7 +2027,8 @@ private KafkaIndexTask createTask(
context,
null,
null,
- rowIngestionMetersFactory
+ rowIngestionMetersFactory,
+ objectMapper
);
task.setPollRetryMs(POLL_RETRY_MS);
return task;
@@ -2073,7 +2074,8 @@ private KafkaIndexTask createTask(
context,
null,
null,
- rowIngestionMetersFactory
+ rowIngestionMetersFactory,
+ objectMapper
);
task.setPollRetryMs(POLL_RETRY_MS);
return task;
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 5a7df1c9509..a7dc2041b8a 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.kafka.KafkaIndexTask;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.hamcrest.CoreMatchers;
@@ -32,6 +33,8 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import java.util.Properties;
+
public class KafkaSupervisorIOConfigTest
{
private final ObjectMapper mapper;
@@ -119,6 +122,31 @@ public void testSerdeWithNonDefaults() throws Exception
Assert.assertTrue("skipOffsetGaps", config.isSkipOffsetGaps());
}
+ @Test
+ public void testSerdeForConsumerPropertiesWithPasswords() throws Exception
+ {
+ String jsonStr = "{\n"
+ + " \"type\": \"kafka\",\n"
+ + " \"topic\": \"my-topic\",\n"
+ + " \"consumerProperties\":
{\"bootstrap.servers\":\"localhost:9092\",\n"
+ + " \"ssl.truststore.password\":{\"type\": \"default\",
\"password\": \"mytruststorepassword\"},\n"
+ + " \"ssl.keystore.password\":{\"type\": \"default\",
\"password\": \"mykeystorepassword\"},\n"
+ + " \"ssl.key.password\":\"mykeypassword\"}\n"
+ + "}";
+
+ KafkaSupervisorIOConfig config = mapper.readValue(
+ jsonStr, KafkaSupervisorIOConfig.class
+ );
+ Properties props = new Properties();
+ KafkaIndexTask.addConsumerPropertiesFromConfig(props, mapper, config.getConsumerProperties());
+
+ Assert.assertEquals("my-topic", config.getTopic());
+ Assert.assertEquals("localhost:9092",
props.getProperty("bootstrap.servers"));
+ Assert.assertEquals("mytruststorepassword",
props.getProperty("ssl.truststore.password"));
+ Assert.assertEquals("mykeystorepassword",
props.getProperty("ssl.keystore.password"));
+ Assert.assertEquals("mykeypassword",
props.getProperty("ssl.key.password"));
+ }
+
@Test
public void testTopicRequired() throws Exception
{
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index d5b048a239c..c4e24f185cc 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -2583,7 +2583,7 @@ private KafkaSupervisor getSupervisor(
String kafkaHost
)
{
- Map<String, String> consumerProperties = new HashMap<>();
+ Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put("myCustomKey", "myCustomValue");
consumerProperties.put("bootstrap.servers", kafkaHost);
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
@@ -2711,7 +2711,8 @@ private KafkaIndexTask createKafkaIndexTask(
Collections.emptyMap(),
null,
null,
- rowIngestionMetersFactory
+ rowIngestionMetersFactory,
+ objectMapper
);
}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
index 561276e6ed3..c1a06716a3c 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
@@ -110,9 +110,9 @@ public int getPort()
return props;
}
- public Map<String, String> consumerProperties()
+ public Map<String, Object> consumerProperties()
{
- final Map<String, String> props = Maps.newHashMap();
+ final Map<String, Object> props = Maps.newHashMap();
props.put("bootstrap.servers", StringUtils.format("localhost:%d",
getPort()));
props.put("key.deserializer", ByteArrayDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]