This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 307c1b0 adjustments to Kafka integration tests to allow running
against Azure Event Hubs streams (#10463)
307c1b0 is described below
commit 307c1b072006115dc0780913bb3fd7d1831849b4
Author: Clint Wylie <[email protected]>
AuthorDate: Mon Oct 5 08:54:29 2020 -0700
adjustments to Kafka integration tests to allow running against Azure Event
Hubs streams (#10463)
* adjustments to kafka integration tests to allow running against azure
event hubs in kafka mode
* oops
* make better
* more better
---
.../apache/druid/testing/DockerConfigProvider.java | 43 +++++++++++++++++++++-
.../druid/testing/utils/KafkaAdminClient.java | 2 +-
.../druid/testing/utils/KafkaEventWriter.java | 2 +-
.../org/apache/druid/testing/utils/KafkaUtil.java | 15 ++++++++
.../indexer/AbstractKafkaIndexingServiceTest.java | 2 +-
.../ITKafkaIndexingServiceDataFormatTest.java | 15 +++++++-
6 files changed, 73 insertions(+), 6 deletions(-)
diff --git
a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
index 11c540f..67266b0 100644
---
a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
+++
b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
@@ -21,8 +21,13 @@ package org.apache.druid.testing;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import javax.validation.constraints.NotNull;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -54,6 +59,10 @@ public class DockerConfigProvider implements
IntegrationTestingConfigProvider
@JsonProperty
private String streamEndpoint;
+ @JsonProperty
+ @JsonDeserialize(using = ArbitraryPropertiesJsonDeserializer.class)
+ private Map<String, String> properties = new HashMap<>();
+
@Override
public IntegrationTestingConfig get()
{
@@ -190,7 +199,7 @@ public class DockerConfigProvider implements
IntegrationTestingConfigProvider
@Override
public String getProperty(String prop)
{
- throw new UnsupportedOperationException("DockerConfigProvider does not
support property " + prop);
+ return properties.get(prop);
}
@Override
@@ -208,7 +217,7 @@ public class DockerConfigProvider implements
IntegrationTestingConfigProvider
@Override
public Map<String, String> getProperties()
{
- return new HashMap<>();
+ return properties;
}
@Override
@@ -260,4 +269,34 @@ public class DockerConfigProvider implements
IntegrationTestingConfigProvider
}
};
}
+
+ // there is probably a better way to do this...
+ static class ArbitraryPropertiesJsonDeserializer extends
JsonDeserializer<Map<String, String>>
+ {
+ @Override
+ public Map<String, String> deserialize(JsonParser jsonParser,
DeserializationContext deserializationContext)
+ throws IOException
+ {
+ // given some config input, such as
+ // druid.test.config.properites.a.b.c=d
+ // calling jsonParser.readValueAs(Map.class) here results in a map that
has both nested objects and also
+ // flattened string pairs, so the map looks something like this (in JSON
form):
+ // {
+ // "a" : { "b": { "c" : "d" }}},
+ // "a.b.c":"d"
+ // }
+ // The string pairs are the values we want to populate this map with, so
filtering out the top level keys which
+ // do not have string values leaves us with
+ // { "a.b.c":"d"}
+ // from the given example, which is what we want
+ Map<String, Object> parsed = jsonParser.readValueAs(Map.class);
+ Map<String, String> flat = new HashMap<>();
+ for (Map.Entry<String, Object> entry : parsed.entrySet()) {
+ if (!(entry.getValue() instanceof Map)) {
+ flat.put(entry.getKey(), (String) entry.getValue());
+ }
+ }
+ return flat;
+ }
+ }
}
diff --git
a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java
b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java
index b8311a6..84993f2 100644
---
a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java
+++
b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java
@@ -43,8 +43,8 @@ public class KafkaAdminClient implements StreamAdminClient
public KafkaAdminClient(IntegrationTestingConfig config)
{
Properties properties = new Properties();
- KafkaUtil.addPropertiesFromTestConfig(config, properties);
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
config.getKafkaHost());
+ KafkaUtil.addPropertiesFromTestConfig(config, properties);
adminClient = AdminClient.create(properties);
}
diff --git
a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
index 1d5973c..79ae219 100644
---
a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
+++
b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
@@ -42,7 +42,6 @@ public class KafkaEventWriter implements StreamEventWriter
public KafkaEventWriter(IntegrationTestingConfig config, boolean txnEnabled)
{
Properties properties = new Properties();
- KafkaUtil.addPropertiesFromTestConfig(config, properties);
properties.setProperty("bootstrap.servers", config.getKafkaHost());
properties.setProperty("acks", "all");
properties.setProperty("retries", "3");
@@ -53,6 +52,7 @@ public class KafkaEventWriter implements StreamEventWriter
properties.setProperty("enable.idempotence", "true");
properties.setProperty("transactional.id", IdUtils.getRandomId());
}
+ KafkaUtil.addPropertiesFromTestConfig(config, properties);
this.producer = new KafkaProducer<>(
properties,
new StringSerializer(),
diff --git
a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaUtil.java
b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaUtil.java
index 36534c2..0f7e9fa 100644
---
a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaUtil.java
+++
b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaUtil.java
@@ -21,12 +21,16 @@ package org.apache.druid.testing.utils;
import org.apache.druid.testing.IntegrationTestingConfig;
+import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class KafkaUtil
{
private static final String TEST_PROPERTY_PREFIX = "kafka.test.property.";
+ private static final String TEST_CONFIG_PROPERTY_PREFIX =
"kafka.test.config.";
+
+ public static final String TEST_CONFIG_TRANSACTION_ENABLED =
"transactionEnabled";
public static void addPropertiesFromTestConfig(IntegrationTestingConfig
config, Properties properties)
{
@@ -36,4 +40,15 @@ public class KafkaUtil
}
}
}
+
+ public static Map<String, String>
getAdditionalKafkaTestConfigFromProperties(IntegrationTestingConfig config)
+ {
+ Map<String, String> theMap = new HashMap<>();
+ for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
+ if (entry.getKey().startsWith(TEST_CONFIG_PROPERTY_PREFIX)) {
+
theMap.put(entry.getKey().substring(TEST_CONFIG_PROPERTY_PREFIX.length()),
entry.getValue());
+ }
+ }
+ return theMap;
+ }
}
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
index cc2a22f..204b6ef 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
@@ -58,9 +58,9 @@ public abstract class AbstractKafkaIndexingServiceTest
extends AbstractStreamInd
{
final Map<String, Object> consumerConfigs =
KafkaConsumerConfigs.getConsumerProperties();
final Properties consumerProperties = new Properties();
- KafkaUtil.addPropertiesFromTestConfig(config, consumerProperties);
consumerProperties.putAll(consumerConfigs);
consumerProperties.setProperty("bootstrap.servers",
config.getKafkaInternalHost());
+ KafkaUtil.addPropertiesFromTestConfig(config, consumerProperties);
return spec -> {
try {
spec = StringUtils.replace(
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java
index 9143d9b..2286d23 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java
@@ -22,7 +22,9 @@ package org.apache.druid.tests.parallelized;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.KafkaUtil;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
import org.apache.druid.tests.indexer.AbstractStreamIndexingTest;
@@ -78,6 +80,9 @@ public class ITKafkaIndexingServiceDataFormatTest extends
AbstractKafkaIndexingS
@Inject
private @Json ObjectMapper jsonMapper;
+ @Inject
+ private IntegrationTestingConfig config;
+
@BeforeClass
public void beforeClass() throws Exception
{
@@ -88,7 +93,15 @@ public class ITKafkaIndexingServiceDataFormatTest extends
AbstractKafkaIndexingS
public void testIndexData(boolean transactionEnabled, String serializerPath,
String parserType, String specPath)
throws Exception
{
- doTestIndexDataStableState(transactionEnabled, serializerPath, parserType,
specPath);
+ Map<String, String> testConfig =
KafkaUtil.getAdditionalKafkaTestConfigFromProperties(config);
+ boolean txnEnable = Boolean.parseBoolean(
+ testConfig.getOrDefault(KafkaUtil.TEST_CONFIG_TRANSACTION_ENABLED,
"false")
+ );
+ if (txnEnable != transactionEnabled) {
+ // do nothing
+ return;
+ }
+ doTestIndexDataStableState(txnEnable, serializerPath, parserType,
specPath);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]