This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 597b576 Removed Thread.seep(...) in unit tests in order to avoid possible sources of flakiness fix #186 new 1a65e00 Merge pull request #204 from valdar/flakeyTests 597b576 is described below commit 597b576d486c864816ce3d9d24be1fd28b5493ec Author: Andrea Tarocchi <andrea.taroc...@gmail.com> AuthorDate: Sun May 10 22:39:04 2020 +0200 Removed Thread.seep(...) in unit tests in order to avoid possible sources of flakiness fix #186 --- .../camel/kafkaconnector/CamelSourceTask.java | 2 + .../kafkaconnector/utils/CamelMainSupport.java | 2 +- .../camel/kafkaconnector/CamelSourceTaskTest.java | 178 +++++++++------------ core/testBurnInRun.sh | 15 ++ 4 files changed, 96 insertions(+), 101 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index ceecf94..c9123ed 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -154,6 +154,8 @@ public class CamelSourceTask extends SourceTask { } else { return records; } + + } @Override diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java index d84a41e..6d47e10 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java @@ -74,7 +74,7 @@ public class CamelMainSupport { camelMain.addMainListener(new CamelMainFinishedListener()); // reordering properties to place the one starting with "#class:" first - Map<String, String> orderedProps = new LinkedHashMap<>(); + LinkedHashMap<String, String> orderedProps = new LinkedHashMap<>(); props.keySet().stream() .filter(k -> props.get(k).startsWith("#class:")) .forEach(k -> orderedProps.put(k, props.get(k))); diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java index 2934a3b..9c40d40 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java @@ -17,14 +17,12 @@ package org.apache.camel.kafkaconnector; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.stream.Stream; import org.apache.camel.ProducerTemplate; import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.header.Header; -import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.source.SourceRecord; import org.junit.jupiter.api.Test; @@ -34,37 +32,27 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class CamelSourceTaskTest { - private static final String TIMER_URI = "timer:kafkaconnector?period=10&fixedRate=true&delay=0"; - @Test - public void testSourcePolling() throws InterruptedException { + public void testSourcePolling() { Map<String, String> props = new HashMap<>(); - props.put("camel.source.url", TIMER_URI); + props.put("camel.source.url", "direct:start"); props.put("topics", "mytopic"); CamelSourceTask camelSourceTask = new CamelSourceTask(); camelSourceTask.start(props); - Thread.sleep(11L); - List<SourceRecord> poll = camelSourceTask.poll(); - assertEquals(2, poll.size()); + final ProducerTemplate template = camelSourceTask.getCms().createProducerTemplate(); + template.sendBody("direct:start", "awesome!"); + + List<SourceRecord> poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); + assertEquals(1, poll.size()); assertEquals("mytopic", poll.get(0).topic()); - Headers headers = poll.get(0).headers(); - boolean containsHeader = false; - for (Iterator iterator = headers.iterator(); iterator.hasNext();) { - Header header = (Header)iterator.next(); - if (header.key().equalsIgnoreCase("CamelPropertyCamelTimerPeriod")) { - containsHeader = true; - break; - } - } camelSourceTask.stop(); - assertTrue(containsHeader); } @Test - public void testSourcePollingWithKey() throws InterruptedException { + public void testSourcePollingWithKey() { Map<String, String> props = new HashMap<>(); props.put("camel.source.url", "direct:start"); props.put("topics", "mytopic"); @@ -78,7 +66,7 @@ public class CamelSourceTaskTest { // first we test if we have a key in the message with body template.sendBodyAndHeader("direct:start", "awesome!", "CamelSpecialTestKey", 1234); - List<SourceRecord> poll = camelSourceTaskPollWithRetries(camelSourceTask, 3); + List<SourceRecord> poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); assertEquals(1, poll.size()); assertEquals(1234, poll.get(0).key()); assertEquals(Schema.Type.INT32, poll.get(0).keySchema().type()); @@ -86,7 +74,7 @@ public class CamelSourceTaskTest { // second we test if we have no key under the header template.sendBodyAndHeader("direct:start", "awesome!", "WrongHeader", 1234); - poll = camelSourceTaskPollWithRetries(camelSourceTask, 3); + poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); assertEquals(1, poll.size()); assertNull(poll.get(0).key()); assertNull(poll.get(0).keySchema()); @@ -94,7 +82,7 @@ public class CamelSourceTaskTest { // third we test if we have the header but with null value template.sendBodyAndHeader("direct:start", "awesome!", "CamelSpecialTestKey", null); - poll = camelSourceTaskPollWithRetries(camelSourceTask, 3); + poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); assertEquals(1, poll.size()); assertNull(poll.get(0).key()); assertNull(poll.get(0).keySchema()); @@ -103,7 +91,7 @@ public class CamelSourceTaskTest { } @Test - public void testSourcePollingWithBody() throws InterruptedException { + public void testSourcePollingWithBody() { Map<String, String> props = new HashMap<>(); props.put("camel.source.url", "direct:start"); props.put("topics", "mytopic"); @@ -156,41 +144,53 @@ public class CamelSourceTaskTest { } @Test - public void testSourcePollingTimeout() throws InterruptedException { + public void testSourcePollingTimeout() { + final int nuberOfMessagesSent = 999; Map<String, String> props = new HashMap<>(); - props.put("camel.source.url", TIMER_URI); + props.put("camel.source.url", "direct:start"); props.put("topics", "mytopic"); props.put("camel.source.maxPollDuration", "1"); CamelSourceTask camelSourceTask = new CamelSourceTask(); camelSourceTask.start(props); - List<SourceRecord> poll = camelSourceTaskPollWithRetries(camelSourceTask, 3); - camelSourceTask.stop(); + final ProducerTemplate template = camelSourceTask.getCms().createProducerTemplate(); - assertTrue(poll.size() >= 0 && poll.size() <= 1, "Received messages are: " + poll.size() + ", expected between 0 and 1."); - } + // first we send nuberOfMessagesSent of messages + Stream.of(nuberOfMessagesSent).forEach(i -> template.sendBody("direct:start", "awesome!")); + // then we assert we received only a fraction of them (proving that polling timeout of 1 Millisecond is working) + List<SourceRecord> poll = camelSourceTask.poll(); + assertTrue(poll.size() < nuberOfMessagesSent, "Expected received messages count to be strictly less than " + nuberOfMessagesSent + ", got " + poll.size()); + camelSourceTask.stop(); + } @Test - public void testSourcePollingMaxRecordNumber() throws InterruptedException { + public void testSourcePollingMaxRecordNumber() { + final int nuberOfMessagesSent = 2; Map<String, String> props = new HashMap<>(); - props.put("camel.source.url", TIMER_URI); + props.put("camel.source.url", "direct:start"); props.put("topics", "mytopic"); props.put("camel.source.maxBatchPollSize", "1"); CamelSourceTask camelSourceTask = new CamelSourceTask(); camelSourceTask.start(props); - List<SourceRecord> poll = camelSourceTask.poll(); - camelSourceTask.stop(); + final ProducerTemplate template = camelSourceTask.getCms().createProducerTemplate(); + + // first we send nuberOfMessagesSent of messages > camel.source.maxBatchPollSize + Stream.of(nuberOfMessagesSent).forEach(i -> template.sendBody("direct:start", "awesome!")); + + List<SourceRecord> poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); - assertTrue(poll.size() >= 0 && poll.size() <= 1, "Received messages are: " + poll.size() + ", expected between 0 and 1."); + // then we assert we received just camel.source.maxBatchPollSize + assertEquals(1, poll.size()); + camelSourceTask.stop(); } @Test - public void testSourcePollingConsumerOptions() { + public void testSourceConsumerOptions() { Map<String, String> props = new HashMap<>(); props.put("camel.source.url", "timer:kafkaconnector"); props.put("topics", "mytopic"); @@ -205,75 +205,68 @@ public class CamelSourceTaskTest { camelSourceTask.getCms().getEndpoints().stream() .filter(e -> e.getEndpointUri().startsWith("direct")) - .forEach(e -> assertEquals("direct://end?pollingConsumerBlockTimeout=1000&pollingConsumerBlockWhenFull=false&pollingConsumerQueueSize=10", e.getEndpointUri())); + .forEach(e -> { + assertTrue(e.getEndpointUri().contains("end")); + assertTrue(e.getEndpointUri().contains("pollingConsumerBlockTimeout=1000")); + assertTrue(e.getEndpointUri().contains("pollingConsumerBlockWhenFull=false")); + assertTrue(e.getEndpointUri().contains("pollingConsumerQueueSize=10")); + }); camelSourceTask.stop(); } @Test - public void testUrlPrecedenceOnComponentProperty() throws InterruptedException { + public void testSourceUrlPrecedenceOnComponentProperty() { Map<String, String> props = new HashMap<>(); - props.put("camel.source.url", TIMER_URI); + props.put("camel.source.url", "timer:kafkaconnector?period=10&fixedRate=true&delay=0"); props.put("topics", "mytopic"); + //these properties should be ignored props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "shouldNotBeUsed"); - props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "endpointProperty", "shouldNotBeUsed"); - props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + "pathChunk", "shouldNotBeUsed"); + props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "delay", "100000000"); + props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + "name", "shouldNotBeUsed"); CamelSourceTask camelSourceTask = new CamelSourceTask(); camelSourceTask.start(props); - Thread.sleep(11L); - List<SourceRecord> poll = camelSourceTask.poll(); - assertEquals(2, poll.size()); - assertEquals("mytopic", poll.get(0).topic()); - Headers headers = poll.get(0).headers(); - boolean containsHeader = false; - for (Iterator iterator = headers.iterator(); iterator.hasNext();) { - Header header = (Header)iterator.next(); - if (header.key().equalsIgnoreCase("CamelPropertyCamelTimerPeriod")) { - containsHeader = true; - break; - } - } - camelSourceTask.stop(); + assertEquals(2, camelSourceTask.getCms().getEndpoints().size()); + + camelSourceTask.getCms().getEndpoints().stream() + .filter(e -> e.getEndpointUri().startsWith("timer")) + .forEach(e -> { + assertTrue(e.getEndpointUri().contains("kafkaconnector")); + assertTrue(e.getEndpointUri().contains("period=10")); + assertTrue(e.getEndpointUri().contains("fixedRate=true")); + assertTrue(e.getEndpointUri().contains("delay=0")); + }); - assertTrue(containsHeader); + camelSourceTask.stop(); } @Test - public void testSourcePollingUsingComponentProperty() throws InterruptedException { + public void testSourceUsingComponentProperty() { Map<String, String> props = new HashMap<>(); props.put("topics", "mytopic"); props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "timer"); - props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "period", "1000"); + props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "period", "10000"); + props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "delay", "0"); props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + "timerName", "kafkaconnector"); CamelSourceTask camelSourceTask = new CamelSourceTask(); camelSourceTask.start(props); - Thread.sleep(2100L); - List<SourceRecord> poll = camelSourceTask.poll(); - assertEquals(2, poll.size()); - assertEquals("mytopic", poll.get(0).topic()); - Headers headers = poll.get(0).headers(); - boolean containsHeader = false; - for (Iterator iterator = headers.iterator(); iterator.hasNext();) { - Header header = (Header)iterator.next(); - if (header.key().equalsIgnoreCase("CamelPropertyCamelTimerPeriod")) { - containsHeader = true; - break; - } - } - assertTrue(containsHeader); - - assertEquals(1, camelSourceTask.getCms().getEndpoints().stream() - .filter(e -> e.getEndpointUri().equals("timer://kafkaconnector?period=1000")).count()); + camelSourceTask.getCms().getEndpoints().stream() + .filter(e -> e.getEndpointUri().startsWith("timer")) + .forEach(e -> { + assertTrue(e.getEndpointUri().contains("kafkaconnector")); + assertTrue(e.getEndpointUri().contains("period=1000")); + assertTrue(e.getEndpointUri().contains("delay=0")); + }); camelSourceTask.stop(); } @Test - public void testSourcePollingUsingMultipleComponentProperties() throws InterruptedException { + public void testSourceUsingMultipleComponentProperties() { Map<String, String> props = new HashMap<>(); props.put("topics", "mytopic"); props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "timer"); @@ -284,39 +277,24 @@ public class CamelSourceTaskTest { CamelSourceTask camelSourceTask = new CamelSourceTask(); camelSourceTask.start(props); - Thread.sleep(2100L); - List<SourceRecord> poll = camelSourceTask.poll(); - assertEquals(2, poll.size()); - assertEquals("mytopic", poll.get(0).topic()); - Headers headers = poll.get(0).headers(); - boolean containsHeader = false; - for (Iterator iterator = headers.iterator(); iterator.hasNext();) { - Header header = (Header)iterator.next(); - if (header.key().equalsIgnoreCase("CamelPropertyCamelTimerPeriod")) { - containsHeader = true; - break; - } - } - assertTrue(containsHeader); - - assertEquals(1, camelSourceTask.getCms().getEndpoints().stream() - .filter(e -> e.getEndpointUri().equals("timer://kafkaconnector?period=1000&repeatCount=0")).count()); + camelSourceTask.getCms().getEndpoints().stream() + .filter(e -> e.getEndpointUri().startsWith("timer")) + .forEach(e -> { + assertTrue(e.getEndpointUri().contains("kafkaconnector")); + assertTrue(e.getEndpointUri().contains("period=1000")); + assertTrue(e.getEndpointUri().contains("repeatCount=0")); + }); camelSourceTask.stop(); } - private List<SourceRecord> camelSourceTaskPollWithRetries(CamelSourceTask camelSourceTask, int retries) throws InterruptedException { - return camelSourceTaskPollWithRetries(camelSourceTask, retries, 0L); - } - - private List<SourceRecord> camelSourceTaskPollWithRetries(CamelSourceTask camelSourceTask, int retries, long sleepBetweenRetires) throws InterruptedException { + private List<SourceRecord> camelSourceTaskPollWithRetries(CamelSourceTask camelSourceTask, int retries) { List<SourceRecord> poll; do { poll = camelSourceTask.poll(); if (poll == null) { retries--; } - Thread.sleep(sleepBetweenRetires); } while (poll == null && retries > 0); return poll; } diff --git a/core/testBurnInRun.sh b/core/testBurnInRun.sh new file mode 100755 index 0000000..5c6b5d4 --- /dev/null +++ b/core/testBurnInRun.sh @@ -0,0 +1,15 @@ +#!/bin/sh +# Run this script from the project root directory. +# It will run core module tests in burn in mode (i.e. untill they fail or this scritp is manually terminated) +for (( i = 1; ; i++ )) +do + echo "Attempt $i" + ./mvnw test -pl core -am + exitcode=$? + if [ $exitcode -ne 0 ] + then + echo "Error at attempt $i" + exit + fi +done +