This is an automated email from the ASF dual-hosted git repository.
himanshug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new ab7b479 Securing passwords used for SSL connections to Kafka (#6285)
ab7b479 is described below
commit ab7b4798cc2747bb68550b67f69660965a276dc8
Author: Atul Mohan <[email protected]>
AuthorDate: Thu Oct 11 12:03:01 2018 -0500
Securing passwords used for SSL connections to Kafka (#6285)
* Secure credentials in consumer properties
* Merge master
* Refactor property population into separate method
* Fix property setter
* Fix tests
---
.../development/extensions-core/kafka-ingestion.md | 2 +-
.../apache/druid/indexing/kafka/KafkaIOConfig.java | 6 ++--
.../druid/indexing/kafka/KafkaIndexTask.java | 32 ++++++++++++++++++----
.../indexing/kafka/supervisor/KafkaSupervisor.java | 7 +++--
.../kafka/supervisor/KafkaSupervisorIOConfig.java | 9 ++++--
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 12 ++++----
.../supervisor/KafkaSupervisorIOConfigTest.java | 28 +++++++++++++++++++
.../kafka/supervisor/KafkaSupervisorTest.java | 5 ++--
.../druid/indexing/kafka/test/TestBroker.java | 4 +--
9 files changed, 81 insertions(+), 24 deletions(-)
diff --git a/docs/content/development/extensions-core/kafka-ingestion.md
b/docs/content/development/extensions-core/kafka-ingestion.md
index 568fc94..12bd5f6 100644
--- a/docs/content/development/extensions-core/kafka-ingestion.md
+++ b/docs/content/development/extensions-core/kafka-ingestion.md
@@ -166,7 +166,7 @@ For Roaring bitmaps:
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|`topic`|String|The Kafka topic to read from. This must be a specific topic as
topic patterns are not supported.|yes|
-|`consumerProperties`|Map<String, String>|A map of properties to be passed to
the Kafka consumer. This must contain a property `bootstrap.servers` with a
list of Kafka brokers in the form:
`<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`.|yes|
+|`consumerProperties`|Map<String, Object>|A map of properties to be passed to
the Kafka consumer. This must contain a property `bootstrap.servers` with a
list of Kafka brokers in the form:
`<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. For SSL connections, the
`keystore`, `truststore` and `key` passwords can be provided as a [Password
Provider](../../operations/password-provider.html) or String password.|yes|
|`replicas`|Integer|The number of replica sets, where 1 means a single set of
tasks (no replication). Replica tasks will always be assigned to different
workers to provide resiliency against node failure.|no (default == 1)|
|`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*.
This means that the maximum number of reading tasks will be `taskCount *
replicas` and the total number of tasks (*reading* + *publishing*) will be
higher than this. See 'Capacity Planning' below for more details. The number of
reading tasks will be less than `taskCount` if `taskCount >
{numKafkaPartitions}`.|no (default == 1)|
|`taskDuration`|ISO8601 Period|The length of time before tasks stop reading
and begin publishing their segment.|no (default == PT1H)|
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIOConfig.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIOConfig.java
index 3c60449..6a9af7f 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIOConfig.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIOConfig.java
@@ -39,7 +39,7 @@ public class KafkaIOConfig implements IOConfig
private final String baseSequenceName;
private final KafkaPartitions startPartitions;
private final KafkaPartitions endPartitions;
- private final Map<String, String> consumerProperties;
+ private final Map<String, Object> consumerProperties;
private final boolean useTransaction;
private final Optional<DateTime> minimumMessageTime;
private final Optional<DateTime> maximumMessageTime;
@@ -51,7 +51,7 @@ public class KafkaIOConfig implements IOConfig
@JsonProperty("baseSequenceName") String baseSequenceName,
@JsonProperty("startPartitions") KafkaPartitions startPartitions,
@JsonProperty("endPartitions") KafkaPartitions endPartitions,
- @JsonProperty("consumerProperties") Map<String, String>
consumerProperties,
+ @JsonProperty("consumerProperties") Map<String, Object>
consumerProperties,
@JsonProperty("useTransaction") Boolean useTransaction,
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@@ -114,7 +114,7 @@ public class KafkaIOConfig implements IOConfig
}
@JsonProperty
- public Map<String, String> getConsumerProperties()
+ public Map<String, Object> getConsumerProperties()
{
return consumerProperties;
}
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 0e362e2..bb73651 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
@@ -41,10 +42,12 @@ import
org.apache.druid.indexing.common.task.RealtimeIndexTask;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
@@ -92,6 +95,7 @@ public class KafkaIndexTask extends AbstractTask implements
ChatHandler
private final KafkaIOConfig ioConfig;
private final Optional<ChatHandlerProvider> chatHandlerProvider;
private final KafkaIndexTaskRunner runner;
+ private final ObjectMapper configMapper;
// This value can be tuned in some tests
private long pollRetryMs = 30000;
@@ -106,7 +110,8 @@ public class KafkaIndexTask extends AbstractTask implements
ChatHandler
@JsonProperty("context") Map<String, Object> context,
@JacksonInject ChatHandlerProvider chatHandlerProvider,
@JacksonInject AuthorizerMapper authorizerMapper,
- @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory
+ @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
+ @JacksonInject ObjectMapper configMapper
)
{
super(
@@ -122,6 +127,7 @@ public class KafkaIndexTask extends AbstractTask implements
ChatHandler
this.tuningConfig = Preconditions.checkNotNull(tuningConfig,
"tuningConfig");
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
+ this.configMapper = configMapper;
final CircularBuffer<Throwable> savedParseExceptions;
if (tuningConfig.getMaxSavedParseExceptions() > 0) {
savedParseExceptions = new
CircularBuffer<>(tuningConfig.getMaxSavedParseExceptions());
@@ -198,7 +204,6 @@ public class KafkaIndexTask extends AbstractTask implements
ChatHandler
}
-
@Override
public TaskStatus run(final TaskToolbox toolbox)
{
@@ -285,9 +290,7 @@ public class KafkaIndexTask extends AbstractTask implements
ChatHandler
final Properties props = new Properties();
- for (Map.Entry<String, String> entry :
ioConfig.getConsumerProperties().entrySet()) {
- props.setProperty(entry.getKey(), entry.getValue());
- }
+ addConsumerPropertiesFromConfig(props, configMapper,
ioConfig.getConsumerProperties());
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.offset.reset", "none");
@@ -301,6 +304,25 @@ public class KafkaIndexTask extends AbstractTask
implements ChatHandler
}
}
+ public static void addConsumerPropertiesFromConfig(Properties properties,
ObjectMapper configMapper, Map<String, Object> consumerProperties)
+ {
+ // Extract passwords before SSL connection to Kafka
+ for (Map.Entry<String, Object> entry : consumerProperties.entrySet()) {
+ String propertyKey = entry.getKey();
+ if (propertyKey.equals(KafkaSupervisorIOConfig.TRUST_STORE_PASSWORD_KEY)
+ || propertyKey.equals(KafkaSupervisorIOConfig.KEY_STORE_PASSWORD_KEY)
+ || propertyKey.equals(KafkaSupervisorIOConfig.KEY_PASSWORD_KEY)) {
+ PasswordProvider configPasswordProvider = configMapper.convertValue(
+ entry.getValue(),
+ PasswordProvider.class
+ );
+ properties.setProperty(propertyKey,
configPasswordProvider.getPassword());
+ } else {
+ properties.setProperty(propertyKey, String.valueOf(entry.getValue()));
+ }
+ }
+ }
+
static void assignPartitions(
final KafkaConsumer consumer,
final String topic,
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 4808658..b7845ca 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -1018,7 +1018,7 @@ public class KafkaSupervisor implements Supervisor
props.setProperty("metadata.max.age.ms", "10000");
props.setProperty("group.id", StringUtils.format("kafka-supervisor-%s",
RealtimeIndexTask.makeRandomId()));
- props.putAll(ioConfig.getConsumerProperties());
+ KafkaIndexTask.addConsumerPropertiesFromConfig(props, sortingMapper,
ioConfig.getConsumerProperties());
props.setProperty("enable.auto.commit", "false");
@@ -1918,7 +1918,7 @@ public class KafkaSupervisor implements Supervisor
}
TaskGroup group = taskGroups.get(groupId);
- Map<String, String> consumerProperties =
Maps.newHashMap(ioConfig.getConsumerProperties());
+ Map<String, Object> consumerProperties =
Maps.newHashMap(ioConfig.getConsumerProperties());
DateTime minimumMessageTime =
taskGroups.get(groupId).minimumMessageTime.orNull();
DateTime maximumMessageTime =
taskGroups.get(groupId).maximumMessageTime.orNull();
@@ -1960,7 +1960,8 @@ public class KafkaSupervisor implements Supervisor
context,
null,
null,
- rowIngestionMetersFactory
+ rowIngestionMetersFactory,
+ sortingMapper
);
Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index b02458f..44c2bb2 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -32,12 +32,15 @@ import java.util.Map;
public class KafkaSupervisorIOConfig
{
public static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
+ public static final String TRUST_STORE_PASSWORD_KEY =
"ssl.truststore.password";
+ public static final String KEY_STORE_PASSWORD_KEY = "ssl.keystore.password";
+ public static final String KEY_PASSWORD_KEY = "ssl.key.password";
private final String topic;
private final Integer replicas;
private final Integer taskCount;
private final Duration taskDuration;
- private final Map<String, String> consumerProperties;
+ private final Map<String, Object> consumerProperties;
private final Duration startDelay;
private final Duration period;
private final boolean useEarliestOffset;
@@ -52,7 +55,7 @@ public class KafkaSupervisorIOConfig
@JsonProperty("replicas") Integer replicas,
@JsonProperty("taskCount") Integer taskCount,
@JsonProperty("taskDuration") Period taskDuration,
- @JsonProperty("consumerProperties") Map<String, String>
consumerProperties,
+ @JsonProperty("consumerProperties") Map<String, Object>
consumerProperties,
@JsonProperty("startDelay") Period startDelay,
@JsonProperty("period") Period period,
@JsonProperty("useEarliestOffset") Boolean useEarliestOffset,
@@ -110,7 +113,7 @@ public class KafkaSupervisorIOConfig
}
@JsonProperty
- public Map<String, String> getConsumerProperties()
+ public Map<String, Object> getConsumerProperties()
{
return consumerProperties;
}
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index cd44d68..6dd210a 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -486,7 +486,7 @@ public class KafkaIndexTaskTest
kafkaProducer.send(record).get();
}
}
- Map<String, String> consumerProps = kafkaServer.consumerProperties();
+ Map<String, Object> consumerProps = kafkaServer.consumerProperties();
consumerProps.put("max.poll.records", "1");
final KafkaPartitions startPartitions = new KafkaPartitions(topic,
ImmutableMap.of(0, 0L, 1, 0L));
@@ -581,7 +581,7 @@ public class KafkaIndexTaskTest
kafkaProducer.send(records.get(i)).get();
}
- Map<String, String> consumerProps = kafkaServer.consumerProperties();
+ Map<String, Object> consumerProps = kafkaServer.consumerProperties();
consumerProps.put("max.poll.records", "1");
final KafkaPartitions startPartitions = new KafkaPartitions(topic,
ImmutableMap.of(0, 0L, 1, 0L));
@@ -698,7 +698,7 @@ public class KafkaIndexTaskTest
kafkaProducer.send(record).get();
}
}
- Map<String, String> consumerProps = kafkaServer.consumerProperties();
+ Map<String, Object> consumerProps = kafkaServer.consumerProperties();
consumerProps.put("max.poll.records", "1");
final KafkaPartitions startPartitions = new KafkaPartitions(topic,
ImmutableMap.of(0, 0L, 1, 0L));
@@ -2027,7 +2027,8 @@ public class KafkaIndexTaskTest
context,
null,
null,
- rowIngestionMetersFactory
+ rowIngestionMetersFactory,
+ objectMapper
);
task.setPollRetryMs(POLL_RETRY_MS);
return task;
@@ -2073,7 +2074,8 @@ public class KafkaIndexTaskTest
context,
null,
null,
- rowIngestionMetersFactory
+ rowIngestionMetersFactory,
+ objectMapper
);
task.setPollRetryMs(POLL_RETRY_MS);
return task;
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 5a7df1c..a7dc204 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.kafka.KafkaIndexTask;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.hamcrest.CoreMatchers;
@@ -32,6 +33,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import java.util.Properties;
+
public class KafkaSupervisorIOConfigTest
{
private final ObjectMapper mapper;
@@ -120,6 +123,31 @@ public class KafkaSupervisorIOConfigTest
}
@Test
+ public void testSerdeForConsumerPropertiesWithPasswords() throws Exception
+ {
+ String jsonStr = "{\n"
+ + " \"type\": \"kafka\",\n"
+ + " \"topic\": \"my-topic\",\n"
+ + " \"consumerProperties\":
{\"bootstrap.servers\":\"localhost:9092\",\n"
+ + " \"ssl.truststore.password\":{\"type\": \"default\",
\"password\": \"mytruststorepassword\"},\n"
+ + " \"ssl.keystore.password\":{\"type\": \"default\",
\"password\": \"mykeystorepassword\"},\n"
+ + " \"ssl.key.password\":\"mykeypassword\"}\n"
+ + "}";
+
+ KafkaSupervisorIOConfig config = mapper.readValue(
+ jsonStr, KafkaSupervisorIOConfig.class
+ );
+ Properties props = new Properties();
+ KafkaIndexTask.addConsumerPropertiesFromConfig(props, mapper,
config.getConsumerProperties());
+
+ Assert.assertEquals("my-topic", config.getTopic());
+ Assert.assertEquals("localhost:9092",
props.getProperty("bootstrap.servers"));
+ Assert.assertEquals("mytruststorepassword",
props.getProperty("ssl.truststore.password"));
+ Assert.assertEquals("mykeystorepassword",
props.getProperty("ssl.keystore.password"));
+ Assert.assertEquals("mykeypassword",
props.getProperty("ssl.key.password"));
+ }
+
+ @Test
public void testTopicRequired() throws Exception
{
String jsonStr = "{\n"
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index d5b048a..c4e24f1 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -2583,7 +2583,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
String kafkaHost
)
{
- Map<String, String> consumerProperties = new HashMap<>();
+ Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put("myCustomKey", "myCustomValue");
consumerProperties.put("bootstrap.servers", kafkaHost);
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new
KafkaSupervisorIOConfig(
@@ -2711,7 +2711,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
Collections.emptyMap(),
null,
null,
- rowIngestionMetersFactory
+ rowIngestionMetersFactory,
+ objectMapper
);
}
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
index 561276e..c1a0671 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java
@@ -110,9 +110,9 @@ public class TestBroker implements Closeable
return props;
}
- public Map<String, String> consumerProperties()
+ public Map<String, Object> consumerProperties()
{
- final Map<String, String> props = Maps.newHashMap();
+ final Map<String, Object> props = Maps.newHashMap();
props.put("bootstrap.servers", StringUtils.format("localhost:%d",
getPort()));
props.put("key.deserializer", ByteArrayDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]