This is an automated email from the ASF dual-hosted git repository.
valdar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 597b576 Removed Thread.seep(...) in unit tests in order to avoid
possible sources of flakiness fix #186
new 1a65e00 Merge pull request #204 from valdar/flakeyTests
597b576 is described below
commit 597b576d486c864816ce3d9d24be1fd28b5493ec
Author: Andrea Tarocchi <[email protected]>
AuthorDate: Sun May 10 22:39:04 2020 +0200
Removed Thread.seep(...) in unit tests in order to avoid possible sources
of flakiness fix #186
---
.../camel/kafkaconnector/CamelSourceTask.java | 2 +
.../kafkaconnector/utils/CamelMainSupport.java | 2 +-
.../camel/kafkaconnector/CamelSourceTaskTest.java | 178 +++++++++------------
core/testBurnInRun.sh | 15 ++
4 files changed, 96 insertions(+), 101 deletions(-)
diff --git
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index ceecf94..c9123ed 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -154,6 +154,8 @@ public class CamelSourceTask extends SourceTask {
} else {
return records;
}
+
+
}
@Override
diff --git
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
index d84a41e..6d47e10 100644
---
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
+++
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
@@ -74,7 +74,7 @@ public class CamelMainSupport {
camelMain.addMainListener(new CamelMainFinishedListener());
// reordering properties to place the one starting with "#class:" first
- Map<String, String> orderedProps = new LinkedHashMap<>();
+ LinkedHashMap<String, String> orderedProps = new LinkedHashMap<>();
props.keySet().stream()
.filter(k -> props.get(k).startsWith("#class:"))
.forEach(k -> orderedProps.put(k, props.get(k)));
diff --git
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 2934a3b..9c40d40 100644
---
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -17,14 +17,12 @@
package org.apache.camel.kafkaconnector;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.stream.Stream;
import org.apache.camel.ProducerTemplate;
import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.header.Header;
-import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.Test;
@@ -34,37 +32,27 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class CamelSourceTaskTest {
- private static final String TIMER_URI =
"timer:kafkaconnector?period=10&fixedRate=true&delay=0";
-
@Test
- public void testSourcePolling() throws InterruptedException {
+ public void testSourcePolling() {
Map<String, String> props = new HashMap<>();
- props.put("camel.source.url", TIMER_URI);
+ props.put("camel.source.url", "direct:start");
props.put("topics", "mytopic");
CamelSourceTask camelSourceTask = new CamelSourceTask();
camelSourceTask.start(props);
- Thread.sleep(11L);
- List<SourceRecord> poll = camelSourceTask.poll();
- assertEquals(2, poll.size());
+ final ProducerTemplate template =
camelSourceTask.getCms().createProducerTemplate();
+ template.sendBody("direct:start", "awesome!");
+
+ List<SourceRecord> poll =
camelSourceTaskPollWithRetries(camelSourceTask, 5);
+ assertEquals(1, poll.size());
assertEquals("mytopic", poll.get(0).topic());
- Headers headers = poll.get(0).headers();
- boolean containsHeader = false;
- for (Iterator iterator = headers.iterator(); iterator.hasNext();) {
- Header header = (Header)iterator.next();
- if
(header.key().equalsIgnoreCase("CamelPropertyCamelTimerPeriod")) {
- containsHeader = true;
- break;
- }
- }
camelSourceTask.stop();
- assertTrue(containsHeader);
}
@Test
- public void testSourcePollingWithKey() throws InterruptedException {
+ public void testSourcePollingWithKey() {
Map<String, String> props = new HashMap<>();
props.put("camel.source.url", "direct:start");
props.put("topics", "mytopic");
@@ -78,7 +66,7 @@ public class CamelSourceTaskTest {
// first we test if we have a key in the message with body
template.sendBodyAndHeader("direct:start", "awesome!",
"CamelSpecialTestKey", 1234);
- List<SourceRecord> poll =
camelSourceTaskPollWithRetries(camelSourceTask, 3);
+ List<SourceRecord> poll =
camelSourceTaskPollWithRetries(camelSourceTask, 5);
assertEquals(1, poll.size());
assertEquals(1234, poll.get(0).key());
assertEquals(Schema.Type.INT32, poll.get(0).keySchema().type());
@@ -86,7 +74,7 @@ public class CamelSourceTaskTest {
// second we test if we have no key under the header
template.sendBodyAndHeader("direct:start", "awesome!", "WrongHeader",
1234);
- poll = camelSourceTaskPollWithRetries(camelSourceTask, 3);
+ poll = camelSourceTaskPollWithRetries(camelSourceTask, 5);
assertEquals(1, poll.size());
assertNull(poll.get(0).key());
assertNull(poll.get(0).keySchema());
@@ -94,7 +82,7 @@ public class CamelSourceTaskTest {
// third we test if we have the header but with null value
template.sendBodyAndHeader("direct:start", "awesome!",
"CamelSpecialTestKey", null);
- poll = camelSourceTaskPollWithRetries(camelSourceTask, 3);
+ poll = camelSourceTaskPollWithRetries(camelSourceTask, 5);
assertEquals(1, poll.size());
assertNull(poll.get(0).key());
assertNull(poll.get(0).keySchema());
@@ -103,7 +91,7 @@ public class CamelSourceTaskTest {
}
@Test
- public void testSourcePollingWithBody() throws InterruptedException {
+ public void testSourcePollingWithBody() {
Map<String, String> props = new HashMap<>();
props.put("camel.source.url", "direct:start");
props.put("topics", "mytopic");
@@ -156,41 +144,53 @@ public class CamelSourceTaskTest {
}
@Test
- public void testSourcePollingTimeout() throws InterruptedException {
+ public void testSourcePollingTimeout() {
+ final int nuberOfMessagesSent = 999;
Map<String, String> props = new HashMap<>();
- props.put("camel.source.url", TIMER_URI);
+ props.put("camel.source.url", "direct:start");
props.put("topics", "mytopic");
props.put("camel.source.maxPollDuration", "1");
CamelSourceTask camelSourceTask = new CamelSourceTask();
camelSourceTask.start(props);
- List<SourceRecord> poll =
camelSourceTaskPollWithRetries(camelSourceTask, 3);
- camelSourceTask.stop();
+ final ProducerTemplate template =
camelSourceTask.getCms().createProducerTemplate();
- assertTrue(poll.size() >= 0 && poll.size() <= 1, "Received messages
are: " + poll.size() + ", expected between 0 and 1.");
- }
+ // first we send nuberOfMessagesSent of messages
+ Stream.of(nuberOfMessagesSent).forEach(i ->
template.sendBody("direct:start", "awesome!"));
+ // then we assert we received only a fraction of them (proving that
polling timeout of 1 Millisecond is working)
+ List<SourceRecord> poll = camelSourceTask.poll();
+ assertTrue(poll.size() < nuberOfMessagesSent, "Expected received
messages count to be strictly less than " + nuberOfMessagesSent + ", got " +
poll.size());
+ camelSourceTask.stop();
+ }
@Test
- public void testSourcePollingMaxRecordNumber() throws InterruptedException
{
+ public void testSourcePollingMaxRecordNumber() {
+ final int nuberOfMessagesSent = 2;
Map<String, String> props = new HashMap<>();
- props.put("camel.source.url", TIMER_URI);
+ props.put("camel.source.url", "direct:start");
props.put("topics", "mytopic");
props.put("camel.source.maxBatchPollSize", "1");
CamelSourceTask camelSourceTask = new CamelSourceTask();
camelSourceTask.start(props);
- List<SourceRecord> poll = camelSourceTask.poll();
- camelSourceTask.stop();
+ final ProducerTemplate template =
camelSourceTask.getCms().createProducerTemplate();
+
+ // first we send nuberOfMessagesSent of messages >
camel.source.maxBatchPollSize
+ Stream.of(nuberOfMessagesSent).forEach(i ->
template.sendBody("direct:start", "awesome!"));
+
+ List<SourceRecord> poll =
camelSourceTaskPollWithRetries(camelSourceTask, 5);
- assertTrue(poll.size() >= 0 && poll.size() <= 1, "Received messages
are: " + poll.size() + ", expected between 0 and 1.");
+ // then we assert we received just camel.source.maxBatchPollSize
+ assertEquals(1, poll.size());
+ camelSourceTask.stop();
}
@Test
- public void testSourcePollingConsumerOptions() {
+ public void testSourceConsumerOptions() {
Map<String, String> props = new HashMap<>();
props.put("camel.source.url", "timer:kafkaconnector");
props.put("topics", "mytopic");
@@ -205,75 +205,68 @@ public class CamelSourceTaskTest {
camelSourceTask.getCms().getEndpoints().stream()
.filter(e -> e.getEndpointUri().startsWith("direct"))
- .forEach(e ->
assertEquals("direct://end?pollingConsumerBlockTimeout=1000&pollingConsumerBlockWhenFull=false&pollingConsumerQueueSize=10",
e.getEndpointUri()));
+ .forEach(e -> {
+ assertTrue(e.getEndpointUri().contains("end"));
+
assertTrue(e.getEndpointUri().contains("pollingConsumerBlockTimeout=1000"));
+
assertTrue(e.getEndpointUri().contains("pollingConsumerBlockWhenFull=false"));
+
assertTrue(e.getEndpointUri().contains("pollingConsumerQueueSize=10"));
+ });
camelSourceTask.stop();
}
@Test
- public void testUrlPrecedenceOnComponentProperty() throws
InterruptedException {
+ public void testSourceUrlPrecedenceOnComponentProperty() {
Map<String, String> props = new HashMap<>();
- props.put("camel.source.url", TIMER_URI);
+ props.put("camel.source.url",
"timer:kafkaconnector?period=10&fixedRate=true&delay=0");
props.put("topics", "mytopic");
+ //these properties should be ignored
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF,
"shouldNotBeUsed");
- props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() +
"endpointProperty", "shouldNotBeUsed");
- props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() +
"pathChunk", "shouldNotBeUsed");
+ props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() +
"delay", "100000000");
+ props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + "name",
"shouldNotBeUsed");
CamelSourceTask camelSourceTask = new CamelSourceTask();
camelSourceTask.start(props);
- Thread.sleep(11L);
- List<SourceRecord> poll = camelSourceTask.poll();
- assertEquals(2, poll.size());
- assertEquals("mytopic", poll.get(0).topic());
- Headers headers = poll.get(0).headers();
- boolean containsHeader = false;
- for (Iterator iterator = headers.iterator(); iterator.hasNext();) {
- Header header = (Header)iterator.next();
- if
(header.key().equalsIgnoreCase("CamelPropertyCamelTimerPeriod")) {
- containsHeader = true;
- break;
- }
- }
- camelSourceTask.stop();
+ assertEquals(2, camelSourceTask.getCms().getEndpoints().size());
+
+ camelSourceTask.getCms().getEndpoints().stream()
+ .filter(e -> e.getEndpointUri().startsWith("timer"))
+ .forEach(e -> {
+ assertTrue(e.getEndpointUri().contains("kafkaconnector"));
+ assertTrue(e.getEndpointUri().contains("period=10"));
+ assertTrue(e.getEndpointUri().contains("fixedRate=true"));
+ assertTrue(e.getEndpointUri().contains("delay=0"));
+ });
- assertTrue(containsHeader);
+ camelSourceTask.stop();
}
@Test
- public void testSourcePollingUsingComponentProperty() throws
InterruptedException {
+ public void testSourceUsingComponentProperty() {
Map<String, String> props = new HashMap<>();
props.put("topics", "mytopic");
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF,
"timer");
- props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() +
"period", "1000");
+ props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() +
"period", "10000");
+ props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() +
"delay", "0");
props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() +
"timerName", "kafkaconnector");
CamelSourceTask camelSourceTask = new CamelSourceTask();
camelSourceTask.start(props);
- Thread.sleep(2100L);
- List<SourceRecord> poll = camelSourceTask.poll();
- assertEquals(2, poll.size());
- assertEquals("mytopic", poll.get(0).topic());
- Headers headers = poll.get(0).headers();
- boolean containsHeader = false;
- for (Iterator iterator = headers.iterator(); iterator.hasNext();) {
- Header header = (Header)iterator.next();
- if
(header.key().equalsIgnoreCase("CamelPropertyCamelTimerPeriod")) {
- containsHeader = true;
- break;
- }
- }
- assertTrue(containsHeader);
-
- assertEquals(1, camelSourceTask.getCms().getEndpoints().stream()
- .filter(e ->
e.getEndpointUri().equals("timer://kafkaconnector?period=1000")).count());
+ camelSourceTask.getCms().getEndpoints().stream()
+ .filter(e -> e.getEndpointUri().startsWith("timer"))
+ .forEach(e -> {
+ assertTrue(e.getEndpointUri().contains("kafkaconnector"));
+ assertTrue(e.getEndpointUri().contains("period=1000"));
+ assertTrue(e.getEndpointUri().contains("delay=0"));
+ });
camelSourceTask.stop();
}
@Test
- public void testSourcePollingUsingMultipleComponentProperties() throws
InterruptedException {
+ public void testSourceUsingMultipleComponentProperties() {
Map<String, String> props = new HashMap<>();
props.put("topics", "mytopic");
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF,
"timer");
@@ -284,39 +277,24 @@ public class CamelSourceTaskTest {
CamelSourceTask camelSourceTask = new CamelSourceTask();
camelSourceTask.start(props);
- Thread.sleep(2100L);
- List<SourceRecord> poll = camelSourceTask.poll();
- assertEquals(2, poll.size());
- assertEquals("mytopic", poll.get(0).topic());
- Headers headers = poll.get(0).headers();
- boolean containsHeader = false;
- for (Iterator iterator = headers.iterator(); iterator.hasNext();) {
- Header header = (Header)iterator.next();
- if
(header.key().equalsIgnoreCase("CamelPropertyCamelTimerPeriod")) {
- containsHeader = true;
- break;
- }
- }
- assertTrue(containsHeader);
-
- assertEquals(1, camelSourceTask.getCms().getEndpoints().stream()
- .filter(e ->
e.getEndpointUri().equals("timer://kafkaconnector?period=1000&repeatCount=0")).count());
+ camelSourceTask.getCms().getEndpoints().stream()
+ .filter(e -> e.getEndpointUri().startsWith("timer"))
+ .forEach(e -> {
+ assertTrue(e.getEndpointUri().contains("kafkaconnector"));
+ assertTrue(e.getEndpointUri().contains("period=1000"));
+ assertTrue(e.getEndpointUri().contains("repeatCount=0"));
+ });
camelSourceTask.stop();
}
- private List<SourceRecord> camelSourceTaskPollWithRetries(CamelSourceTask
camelSourceTask, int retries) throws InterruptedException {
- return camelSourceTaskPollWithRetries(camelSourceTask, retries, 0L);
- }
-
- private List<SourceRecord> camelSourceTaskPollWithRetries(CamelSourceTask
camelSourceTask, int retries, long sleepBetweenRetires) throws
InterruptedException {
+ private List<SourceRecord> camelSourceTaskPollWithRetries(CamelSourceTask
camelSourceTask, int retries) {
List<SourceRecord> poll;
do {
poll = camelSourceTask.poll();
if (poll == null) {
retries--;
}
- Thread.sleep(sleepBetweenRetires);
} while (poll == null && retries > 0);
return poll;
}
diff --git a/core/testBurnInRun.sh b/core/testBurnInRun.sh
new file mode 100755
index 0000000..5c6b5d4
--- /dev/null
+++ b/core/testBurnInRun.sh
@@ -0,0 +1,15 @@
+#!/bin/sh
+# Run this script from the project root directory.
+# It will run core module tests in burn in mode (i.e. untill they fail or this
scritp is manually terminated)
+for (( i = 1; ; i++ ))
+do
+ echo "Attempt $i"
+ ./mvnw test -pl core -am
+ exitcode=$?
+ if [ $exitcode -ne 0 ]
+ then
+ echo "Error at attempt $i"
+ exit
+ fi
+done
+