This is an automated email from the ASF dual-hosted git repository. sorabh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 6ff854b79f4139237d20bd9a4fbd256f8ce4c0f2 Author: Abhishek Ravi <[email protected]> AuthorDate: Mon Sep 10 23:07:12 2018 -0700 DRILL-6625: Intermittent failures in Kafka unit tests Unit test changes to fix intermittent kafka producer and consumer errors. - Increase the value of REQUEST_TIMEOUT_MS_CONFIG to accomadate slower systems. - Increase the value of producer RETRIES_CONFIG to 3 (from 0). - Prevent producer to send duplicate messages due to retries by enabling Idempotent producer. - Increase consumer poll timeout (from 200 ms). - The design of `TestKafkaSuit` is very similar to design of `MongoTestSuit` and hence would require changes similar to the ones made in [storage-mongo/pom.xml](https://github.com/apache/drill/pull/923/commits/f5dfa56f33a46b92e2f9de153d82a16a77642ddf#diff-e110e2cbfd77d27e85d5121529c612bfR83). - Current behavior is surefire runs test classes twice - once as a part of `TestKafkaSuit` and the other by directly running classes. To prevent the latter from happening, changes were made in `pom.xml` for `storage-mongo` plugin. closes #1463 --- contrib/storage-kafka/pom.xml | 26 ++++++++++++++++++++++ .../exec/store/kafka/KafkaFilterPushdownTest.java | 5 +++++ .../exec/store/kafka/KafkaMessageGenerator.java | 5 +++-- .../drill/exec/store/kafka/KafkaTestBase.java | 2 +- .../drill/exec/store/kafka/TestKafkaSuit.java | 1 + 5 files changed, 36 insertions(+), 3 deletions(-) diff --git a/contrib/storage-kafka/pom.xml b/contrib/storage-kafka/pom.xml index 5b5917d..0260c1c 100644 --- a/contrib/storage-kafka/pom.xml +++ b/contrib/storage-kafka/pom.xml @@ -32,6 +32,7 @@ <properties> <kafka.version>0.11.0.1</kafka.version> + <kafka.TestSuite>**/TestKafkaSuit.class</kafka.TestSuite> </properties> <dependencies> @@ -97,4 +98,29 @@ <scope>test</scope> </dependency> </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <includes> + <include>${kafka.TestSuite}</include> + </includes> + <excludes> + <exclude>**/KafkaFilterPushdownTest.java</exclude> + <exclude>**/KafkaQueriesTest.java</exclude> + <exclude>**/MessageIteratorTest.java</exclude> + </excludes> + <systemProperties> + <property> + <name>logback.log.dir</name> + <value>${project.build.directory}/surefire-reports</value> + </property> + </systemProperties> + </configuration> + </plugin> + </plugins> + </build> </project> diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java index 7be0ec3..d874733 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java @@ -27,6 +27,7 @@ import org.junit.experimental.categories.Category; import static org.apache.drill.exec.store.kafka.TestKafkaSuit.NUM_JSON_MSG; import static org.apache.drill.exec.store.kafka.TestKafkaSuit.embeddedKafkaCluster; +import static org.junit.Assert.assertTrue; @Category({KafkaStorageTest.class, SlowTest.class}) public class KafkaFilterPushdownTest extends KafkaTestBase { @@ -42,6 +43,10 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class); generator.populateJsonMsgWithTimestamps(TestQueryConstants.JSON_PUSHDOWN_TOPIC, NUM_JSON_MSG); + String query = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_PUSHDOWN_TOPIC); + //Ensure messages are present + assertTrue("Kafka server does not have expected number of messages", + testSql(query) == NUM_PARTITIONS * NUM_JSON_MSG); } /** diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java index f4a254e..d094531 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java @@ -55,14 +55,15 @@ public class KafkaMessageGenerator { public KafkaMessageGenerator (final String broker, Class<?> valueSerializer) { producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker); producerProperties.put(ProducerConfig.ACKS_CONFIG, "all"); - producerProperties.put(ProducerConfig.RETRIES_CONFIG, 0); + producerProperties.put(ProducerConfig.RETRIES_CONFIG, 3); producerProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); producerProperties.put(ProducerConfig.LINGER_MS_CONFIG, 0); producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); - producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000); + producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); producerProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "drill-test-kafka-client"); producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); + producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); //So that retries do not cause duplicates } public void populateAvroMsgIntoKafka(String topic, int numMsg) throws IOException { diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java index 9f06606..b1742d7 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java @@ -57,7 +57,7 @@ public class KafkaTestBase extends PlanTestBase { pluginRegistry.createOrUpdate(KafkaStoragePluginConfig.NAME, storagePluginConfig, true); testNoResult(String.format("alter session set `%s` = '%s'", ExecConstants.KAFKA_RECORD_READER, "org.apache.drill.exec.store.kafka.decoders.JsonMessageReader")); - testNoResult(String.format("alter session set `%s` = %d", ExecConstants.KAFKA_POLL_TIMEOUT, 200)); + testNoResult(String.format("alter session set `%s` = %d", ExecConstants.KAFKA_POLL_TIMEOUT, 5000)); } public List<QueryDataBatch> runKafkaSQLWithResults(String sql) throws Exception { diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java index ecf998e..784eb4e 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java @@ -112,6 +112,7 @@ public class TestKafkaSuit { Properties topicProps = new Properties(); topicProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"); + topicProps.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(embeddedKafkaCluster.getZkServer().getConnectionString()), false); AdminUtils.createTopic(zkUtils, topicName, partitions, 1,
