This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 597b576  Removed Thread.sleep(...) in unit tests in order to avoid 
possible sources of flakiness fix #186
     new 1a65e00  Merge pull request #204 from valdar/flakeyTests
597b576 is described below

commit 597b576d486c864816ce3d9d24be1fd28b5493ec
Author: Andrea Tarocchi <andrea.taroc...@gmail.com>
AuthorDate: Sun May 10 22:39:04 2020 +0200

    Removed Thread.sleep(...) in unit tests in order to avoid possible sources 
of flakiness fix #186
---
 .../camel/kafkaconnector/CamelSourceTask.java      |   2 +
 .../kafkaconnector/utils/CamelMainSupport.java     |   2 +-
 .../camel/kafkaconnector/CamelSourceTaskTest.java  | 178 +++++++++------------
 core/testBurnInRun.sh                              |  15 ++
 4 files changed, 96 insertions(+), 101 deletions(-)

diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index ceecf94..c9123ed 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -154,6 +154,8 @@ public class CamelSourceTask extends SourceTask {
         } else {
             return records;
         }
+
+
     }
 
     @Override
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
 
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
index d84a41e..6d47e10 100644
--- 
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
+++ 
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
@@ -74,7 +74,7 @@ public class CamelMainSupport {
         camelMain.addMainListener(new CamelMainFinishedListener());
 
         // reordering properties to place the one starting with "#class:" first
-        Map<String, String> orderedProps = new LinkedHashMap<>();
+        LinkedHashMap<String, String> orderedProps = new LinkedHashMap<>();
         props.keySet().stream()
                 .filter(k -> props.get(k).startsWith("#class:"))
                 .forEach(k -> orderedProps.put(k, props.get(k)));
diff --git 
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java 
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 2934a3b..9c40d40 100644
--- 
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ 
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -17,14 +17,12 @@
 package org.apache.camel.kafkaconnector;
 
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import org.apache.camel.ProducerTemplate;
 import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.header.Header;
-import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.jupiter.api.Test;
 
@@ -34,37 +32,27 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class CamelSourceTaskTest {
 
-    private static final String TIMER_URI = 
"timer:kafkaconnector?period=10&fixedRate=true&delay=0";
-
     @Test
-    public void testSourcePolling() throws InterruptedException {
+    public void testSourcePolling() {
         Map<String, String> props = new HashMap<>();
-        props.put("camel.source.url", TIMER_URI);
+        props.put("camel.source.url", "direct:start");
         props.put("topics", "mytopic");
 
         CamelSourceTask camelSourceTask = new CamelSourceTask();
         camelSourceTask.start(props);
 
-        Thread.sleep(11L);
-        List<SourceRecord> poll = camelSourceTask.poll();
-        assertEquals(2, poll.size());
+        final ProducerTemplate template = 
camelSourceTask.getCms().createProducerTemplate();
+        template.sendBody("direct:start", "awesome!");
+
+        List<SourceRecord> poll = 
camelSourceTaskPollWithRetries(camelSourceTask, 5);
+        assertEquals(1, poll.size());
         assertEquals("mytopic", poll.get(0).topic());
-        Headers headers = poll.get(0).headers();
-        boolean containsHeader = false;
-        for (Iterator iterator = headers.iterator(); iterator.hasNext();) {
-            Header header = (Header)iterator.next();
-            if 
(header.key().equalsIgnoreCase("CamelPropertyCamelTimerPeriod")) {
-                containsHeader = true;
-                break;
-            }
-        }
 
         camelSourceTask.stop();
-        assertTrue(containsHeader);
     }
 
     @Test
-    public void testSourcePollingWithKey() throws InterruptedException {
+    public void testSourcePollingWithKey() {
         Map<String, String> props = new HashMap<>();
         props.put("camel.source.url", "direct:start");
         props.put("topics", "mytopic");
@@ -78,7 +66,7 @@ public class CamelSourceTaskTest {
         // first we test if we have a key in the message with body
         template.sendBodyAndHeader("direct:start", "awesome!", 
"CamelSpecialTestKey", 1234);
 
-        List<SourceRecord> poll = 
camelSourceTaskPollWithRetries(camelSourceTask, 3);
+        List<SourceRecord> poll = 
camelSourceTaskPollWithRetries(camelSourceTask, 5);
         assertEquals(1, poll.size());
         assertEquals(1234, poll.get(0).key());
         assertEquals(Schema.Type.INT32, poll.get(0).keySchema().type());
@@ -86,7 +74,7 @@ public class CamelSourceTaskTest {
         // second we test if we have no key under the header
         template.sendBodyAndHeader("direct:start", "awesome!", "WrongHeader", 
1234);
 
-        poll = camelSourceTaskPollWithRetries(camelSourceTask, 3);
+        poll = camelSourceTaskPollWithRetries(camelSourceTask, 5);
         assertEquals(1, poll.size());
         assertNull(poll.get(0).key());
         assertNull(poll.get(0).keySchema());
@@ -94,7 +82,7 @@ public class CamelSourceTaskTest {
         // third we test if we have the header but with null value
         template.sendBodyAndHeader("direct:start", "awesome!", 
"CamelSpecialTestKey", null);
 
-        poll = camelSourceTaskPollWithRetries(camelSourceTask, 3);
+        poll = camelSourceTaskPollWithRetries(camelSourceTask, 5);
         assertEquals(1, poll.size());
         assertNull(poll.get(0).key());
         assertNull(poll.get(0).keySchema());
@@ -103,7 +91,7 @@ public class CamelSourceTaskTest {
     }
 
     @Test
-    public void testSourcePollingWithBody() throws InterruptedException {
+    public void testSourcePollingWithBody() {
         Map<String, String> props = new HashMap<>();
         props.put("camel.source.url", "direct:start");
         props.put("topics", "mytopic");
@@ -156,41 +144,53 @@ public class CamelSourceTaskTest {
     }
 
     @Test
-    public void testSourcePollingTimeout() throws InterruptedException {
+    public void testSourcePollingTimeout() {
+        final int nuberOfMessagesSent = 999;
         Map<String, String> props = new HashMap<>();
-        props.put("camel.source.url", TIMER_URI);
+        props.put("camel.source.url", "direct:start");
         props.put("topics", "mytopic");
         props.put("camel.source.maxPollDuration", "1");
 
         CamelSourceTask camelSourceTask = new CamelSourceTask();
         camelSourceTask.start(props);
 
-        List<SourceRecord> poll = 
camelSourceTaskPollWithRetries(camelSourceTask, 3);
-        camelSourceTask.stop();
+        final ProducerTemplate template = 
camelSourceTask.getCms().createProducerTemplate();
 
-        assertTrue(poll.size() >= 0 &&  poll.size() <= 1, "Received messages 
are: " + poll.size() + ", expected between 0 and 1.");
-    }
+        // first we send nuberOfMessagesSent messages, one sendBody call per message (Stream.of(n) would emit a single element)
+        Stream.generate(() -> "awesome!").limit(nuberOfMessagesSent).forEach(body -> template.sendBody("direct:start", body));
 
+        // then we assert we received only a fraction of them (proving that 
polling timeout of 1 Millisecond is working)
+        List<SourceRecord> poll = camelSourceTask.poll();
+        assertTrue(poll.size() < nuberOfMessagesSent, "Expected received 
messages count to be strictly less than " + nuberOfMessagesSent + ", got " + 
poll.size());
 
+        camelSourceTask.stop();
+    }
 
     @Test
-    public void testSourcePollingMaxRecordNumber() throws InterruptedException 
{
+    public void testSourcePollingMaxRecordNumber() {
+        final int nuberOfMessagesSent = 2;
         Map<String, String> props = new HashMap<>();
-        props.put("camel.source.url", TIMER_URI);
+        props.put("camel.source.url", "direct:start");
         props.put("topics", "mytopic");
         props.put("camel.source.maxBatchPollSize", "1");
 
         CamelSourceTask camelSourceTask = new CamelSourceTask();
         camelSourceTask.start(props);
 
-        List<SourceRecord> poll = camelSourceTask.poll();
-        camelSourceTask.stop();
+        final ProducerTemplate template = 
camelSourceTask.getCms().createProducerTemplate();
+
+        // first we send nuberOfMessagesSent messages (> camel.source.maxBatchPollSize); Stream.of(n) would send only one
+        Stream.generate(() -> "awesome!").limit(nuberOfMessagesSent).forEach(body -> template.sendBody("direct:start", body));
+
+        List<SourceRecord> poll = 
camelSourceTaskPollWithRetries(camelSourceTask, 5);
 
-        assertTrue(poll.size() >= 0 &&  poll.size() <= 1, "Received messages 
are: " + poll.size() + ", expected between 0 and 1.");
+        // then we assert we received just camel.source.maxBatchPollSize
+        assertEquals(1, poll.size());
+        camelSourceTask.stop();
     }
 
     @Test
-    public void testSourcePollingConsumerOptions() {
+    public void testSourceConsumerOptions() {
         Map<String, String> props = new HashMap<>();
         props.put("camel.source.url", "timer:kafkaconnector");
         props.put("topics", "mytopic");
@@ -205,75 +205,68 @@ public class CamelSourceTaskTest {
 
         camelSourceTask.getCms().getEndpoints().stream()
                 .filter(e -> e.getEndpointUri().startsWith("direct"))
-                .forEach(e -> 
assertEquals("direct://end?pollingConsumerBlockTimeout=1000&pollingConsumerBlockWhenFull=false&pollingConsumerQueueSize=10",
 e.getEndpointUri()));
+                .forEach(e -> {
+                    assertTrue(e.getEndpointUri().contains("end"));
+                    
assertTrue(e.getEndpointUri().contains("pollingConsumerBlockTimeout=1000"));
+                    
assertTrue(e.getEndpointUri().contains("pollingConsumerBlockWhenFull=false"));
+                    
assertTrue(e.getEndpointUri().contains("pollingConsumerQueueSize=10"));
+                });
 
         camelSourceTask.stop();
     }
 
     @Test
-    public void testUrlPrecedenceOnComponentProperty() throws 
InterruptedException {
+    public void testSourceUrlPrecedenceOnComponentProperty() {
         Map<String, String> props = new HashMap<>();
-        props.put("camel.source.url", TIMER_URI);
+        props.put("camel.source.url", 
"timer:kafkaconnector?period=10&fixedRate=true&delay=0");
         props.put("topics", "mytopic");
+        //these properties should be ignored
         props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, 
"shouldNotBeUsed");
-        props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + 
"endpointProperty", "shouldNotBeUsed");
-        props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + 
"pathChunk", "shouldNotBeUsed");
+        props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + 
"delay", "100000000");
+        props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + "name", 
"shouldNotBeUsed");
 
         CamelSourceTask camelSourceTask = new CamelSourceTask();
         camelSourceTask.start(props);
 
-        Thread.sleep(11L);
-        List<SourceRecord> poll = camelSourceTask.poll();
-        assertEquals(2, poll.size());
-        assertEquals("mytopic", poll.get(0).topic());
-        Headers headers = poll.get(0).headers();
-        boolean containsHeader = false;
-        for (Iterator iterator = headers.iterator(); iterator.hasNext();) {
-            Header header = (Header)iterator.next();
-            if 
(header.key().equalsIgnoreCase("CamelPropertyCamelTimerPeriod")) {
-                containsHeader = true;
-                break;
-            }
-        }
-        camelSourceTask.stop();
+        assertEquals(2, camelSourceTask.getCms().getEndpoints().size());
+
+        camelSourceTask.getCms().getEndpoints().stream()
+                .filter(e -> e.getEndpointUri().startsWith("timer"))
+                .forEach(e -> {
+                    assertTrue(e.getEndpointUri().contains("kafkaconnector"));
+                    assertTrue(e.getEndpointUri().contains("period=10"));
+                    assertTrue(e.getEndpointUri().contains("fixedRate=true"));
+                    assertTrue(e.getEndpointUri().contains("delay=0"));
+                });
 
-        assertTrue(containsHeader);
+        camelSourceTask.stop();
     }
 
     @Test
-    public void testSourcePollingUsingComponentProperty() throws 
InterruptedException {
+    public void testSourceUsingComponentProperty() {
         Map<String, String> props = new HashMap<>();
         props.put("topics", "mytopic");
         props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, 
"timer");
-        props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + 
"period", "1000");
+        props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + 
"period", "10000");
+        props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + 
"delay", "0");
         props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + 
"timerName", "kafkaconnector");
 
         CamelSourceTask camelSourceTask = new CamelSourceTask();
         camelSourceTask.start(props);
 
-        Thread.sleep(2100L);
-        List<SourceRecord> poll = camelSourceTask.poll();
-        assertEquals(2, poll.size());
-        assertEquals("mytopic", poll.get(0).topic());
-        Headers headers = poll.get(0).headers();
-        boolean containsHeader = false;
-        for (Iterator iterator = headers.iterator(); iterator.hasNext();) {
-            Header header = (Header)iterator.next();
-            if 
(header.key().equalsIgnoreCase("CamelPropertyCamelTimerPeriod")) {
-                containsHeader = true;
-                break;
-            }
-        }
-        assertTrue(containsHeader);
-
-        assertEquals(1, camelSourceTask.getCms().getEndpoints().stream()
-            .filter(e -> 
e.getEndpointUri().equals("timer://kafkaconnector?period=1000")).count());
+        camelSourceTask.getCms().getEndpoints().stream()
+            .filter(e -> e.getEndpointUri().startsWith("timer"))
+            .forEach(e -> {
+                assertTrue(e.getEndpointUri().contains("kafkaconnector"));
+                assertTrue(e.getEndpointUri().contains("period=10000")); // must match the configured value, not just its "1000" prefix
+                assertTrue(e.getEndpointUri().contains("delay=0"));
+            });
 
         camelSourceTask.stop();
     }
 
     @Test
-    public void testSourcePollingUsingMultipleComponentProperties() throws 
InterruptedException {
+    public void testSourceUsingMultipleComponentProperties() {
         Map<String, String> props = new HashMap<>();
         props.put("topics", "mytopic");
         props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, 
"timer");
@@ -284,39 +277,24 @@ public class CamelSourceTaskTest {
         CamelSourceTask camelSourceTask = new CamelSourceTask();
         camelSourceTask.start(props);
 
-        Thread.sleep(2100L);
-        List<SourceRecord> poll = camelSourceTask.poll();
-        assertEquals(2, poll.size());
-        assertEquals("mytopic", poll.get(0).topic());
-        Headers headers = poll.get(0).headers();
-        boolean containsHeader = false;
-        for (Iterator iterator = headers.iterator(); iterator.hasNext();) {
-            Header header = (Header)iterator.next();
-            if 
(header.key().equalsIgnoreCase("CamelPropertyCamelTimerPeriod")) {
-                containsHeader = true;
-                break;
-            }
-        }
-        assertTrue(containsHeader);
-
-        assertEquals(1, camelSourceTask.getCms().getEndpoints().stream()
-            .filter(e -> 
e.getEndpointUri().equals("timer://kafkaconnector?period=1000&repeatCount=0")).count());
+        camelSourceTask.getCms().getEndpoints().stream()
+            .filter(e -> e.getEndpointUri().startsWith("timer"))
+            .forEach(e -> {
+                assertTrue(e.getEndpointUri().contains("kafkaconnector"));
+                assertTrue(e.getEndpointUri().contains("period=1000"));
+                assertTrue(e.getEndpointUri().contains("repeatCount=0"));
+            });
 
         camelSourceTask.stop();
     }
 
-    private List<SourceRecord> camelSourceTaskPollWithRetries(CamelSourceTask 
camelSourceTask, int retries) throws InterruptedException {
-        return camelSourceTaskPollWithRetries(camelSourceTask, retries, 0L);
-    }
-
-    private List<SourceRecord> camelSourceTaskPollWithRetries(CamelSourceTask 
camelSourceTask, int retries, long sleepBetweenRetires) throws 
InterruptedException {
+    private List<SourceRecord> camelSourceTaskPollWithRetries(CamelSourceTask 
camelSourceTask, int retries) {
         List<SourceRecord> poll;
         do {
             poll = camelSourceTask.poll();
             if (poll == null) {
                 retries--;
             }
-            Thread.sleep(sleepBetweenRetires);
         } while (poll == null && retries > 0);
         return poll;
     }
diff --git a/core/testBurnInRun.sh b/core/testBurnInRun.sh
new file mode 100755
index 0000000..5c6b5d4
--- /dev/null
+++ b/core/testBurnInRun.sh
@@ -0,0 +1,15 @@
+#!/bin/bash
+# Run this script from the project root directory.
+# It runs the core module tests in burn-in mode (i.e. until they fail or this script is manually terminated); needs bash for the (( )) loop.
+for (( i = 1; ; i++ ))
+do
+  echo "Attempt $i"
+  ./mvnw test -pl core -am
+  exitcode=$?
+  if [ $exitcode -ne 0 ]
+  then
+    echo "Error at attempt $i"
+    exit "$exitcode"  # propagate the Maven failure; a bare exit would return echo's status (0)
+  fi
+done
+

Reply via email to