This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 49aa5e8  Improved CamelSourceTaskTest
     new 5593ea7  Merge pull request #205 from fvaleri/no-flaky
49aa5e8 is described below

commit 49aa5e8cba11a49c9fa57f4dc1a3639e9a2c0e29
Author: Federico Valeri <fvaleri@localhost>
AuthorDate: Sat May 9 21:06:28 2020 +0200

    Improved CamelSourceTaskTest
---
 .../camel/kafkaconnector/CamelSourceTaskTest.java  | 365 ++++++++++-----------
 1 file changed, 165 insertions(+), 200 deletions(-)

diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 9c40d40..33807ee 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -19,7 +19,6 @@ package org.apache.camel.kafkaconnector;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Stream;
 
 import org.apache.camel.ProducerTemplate;
 import org.apache.kafka.connect.data.Schema;
@@ -32,270 +31,236 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class CamelSourceTaskTest {
 
+    private static final String DIRECT_URI = "direct:start";
+    private static final String TOPIC_NAME = "my-topic";
+
+    private void sendBatchOfRecords(CamelSourceTask sourceTask, long size) {
+        final ProducerTemplate template = 
sourceTask.getCms().createProducerTemplate();
+        for (int i = 0; i < size; i++) {
+            template.sendBody(DIRECT_URI, "test" + i);
+        }
+    }
+
     @Test
     public void testSourcePolling() {
+        final long size = 2;
         Map<String, String> props = new HashMap<>();
-        props.put("camel.source.url", "direct:start");
-        props.put("topics", "mytopic");
+        props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, 
DIRECT_URI);
 
-        CamelSourceTask camelSourceTask = new CamelSourceTask();
-        camelSourceTask.start(props);
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(props);
 
-        final ProducerTemplate template = 
camelSourceTask.getCms().createProducerTemplate();
-        template.sendBody("direct:start", "awesome!");
+        sendBatchOfRecords(sourceTask, size);
+        List<SourceRecord> poll = sourceTask.poll();
 
-        List<SourceRecord> poll = 
camelSourceTaskPollWithRetries(camelSourceTask, 5);
-        assertEquals(1, poll.size());
-        assertEquals("mytopic", poll.get(0).topic());
+        assertEquals(size, poll.size());
+        assertEquals(TOPIC_NAME, poll.get(0).topic());
 
-        camelSourceTask.stop();
+        sourceTask.stop();
     }
 
     @Test
-    public void testSourcePollingWithKey() {
+    public void testSourcePollingMaxBatchPollSize() {
+        final long size = 2;
         Map<String, String> props = new HashMap<>();
-        props.put("camel.source.url", "direct:start");
-        props.put("topics", "mytopic");
-        
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, 
"CamelSpecialTestKey");
-
-        CamelSourceTask camelSourceTask = new CamelSourceTask();
-        camelSourceTask.start(props);
-
-        final ProducerTemplate template = 
camelSourceTask.getCms().createProducerTemplate();
-
-        // first we test if we have a key in the message with body
-        template.sendBodyAndHeader("direct:start", "awesome!", 
"CamelSpecialTestKey", 1234);
-
-        List<SourceRecord> poll = 
camelSourceTaskPollWithRetries(camelSourceTask, 5);
-        assertEquals(1, poll.size());
-        assertEquals(1234, poll.get(0).key());
-        assertEquals(Schema.Type.INT32, poll.get(0).keySchema().type());
-
-        // second we test if we have no key under the header
-        template.sendBodyAndHeader("direct:start", "awesome!", "WrongHeader", 
1234);
-
-        poll = camelSourceTaskPollWithRetries(camelSourceTask, 5);
-        assertEquals(1, poll.size());
-        assertNull(poll.get(0).key());
-        assertNull(poll.get(0).keySchema());
-
-        // third we test if we have the header but with null value
-        template.sendBodyAndHeader("direct:start", "awesome!", 
"CamelSpecialTestKey", null);
+        props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, 
DIRECT_URI);
+        
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF, 
String.valueOf(size));
 
-        poll = camelSourceTaskPollWithRetries(camelSourceTask, 5);
-        assertEquals(1, poll.size());
-        assertNull(poll.get(0).key());
-        assertNull(poll.get(0).keySchema());
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(props);
 
-        camelSourceTask.stop();
-    }
+        sendBatchOfRecords(sourceTask, size + 1);
+        List<SourceRecord> poll = sourceTask.poll();
+        int pollSize = poll.size();
 
-    @Test
-    public void testSourcePollingWithBody() {
-        Map<String, String> props = new HashMap<>();
-        props.put("camel.source.url", "direct:start");
-        props.put("topics", "mytopic");
-
-        CamelSourceTask camelSourceTask = new CamelSourceTask();
-        camelSourceTask.start(props);
-
-        final ProducerTemplate template = 
camelSourceTask.getCms().createProducerTemplate();
-
-        // send first data
-        template.sendBody("direct:start", "testing kafka connect");
-
-        List<SourceRecord> poll = 
camelSourceTaskPollWithRetries(camelSourceTask, 5);
-        assertEquals(1, poll.size());
-        assertEquals("testing kafka connect", poll.get(0).value());
-        assertEquals(Schema.Type.STRING, poll.get(0).valueSchema().type());
-        assertNull(poll.get(0).key());
-        assertNull(poll.get(0).keySchema());
-
-        // send second data
-        template.sendBody("direct:start", true);
-
-        poll = camelSourceTaskPollWithRetries(camelSourceTask, 5);
-        assertEquals(1, poll.size());
-        assertTrue((boolean)poll.get(0).value());
-        assertEquals(Schema.Type.BOOLEAN, poll.get(0).valueSchema().type());
-        assertNull(poll.get(0).key());
-        assertNull(poll.get(0).keySchema());
-
-        // second third data
-        template.sendBody("direct:start", 1234L);
-
-        poll = camelSourceTaskPollWithRetries(camelSourceTask, 5);
-        assertEquals(1, poll.size());
-        assertEquals(1234L, poll.get(0).value());
-        assertEquals(Schema.Type.INT64, poll.get(0).valueSchema().type());
-        assertNull(poll.get(0).key());
-        assertNull(poll.get(0).keySchema());
-
-        // third with null data
-        template.sendBody("direct:start", null);
-
-        poll = camelSourceTaskPollWithRetries(camelSourceTask, 5);
-        assertNull(poll.get(0).key());
-        assertNull(poll.get(0).keySchema());
-        assertNull(poll.get(0).value());
-        assertNull(poll.get(0).valueSchema());
-
-        camelSourceTask.stop();
+        assertTrue(pollSize >= 0 && pollSize <= size, "Batch size: " + 
pollSize + ", expected between 0 and " + size);
+        sourceTask.stop();
     }
 
     @Test
     public void testSourcePollingTimeout() {
-        final int nuberOfMessagesSent = 999;
+        final long size = 999;
         Map<String, String> props = new HashMap<>();
-        props.put("camel.source.url", "direct:start");
-        props.put("topics", "mytopic");
-        props.put("camel.source.maxPollDuration", "1");
-
-        CamelSourceTask camelSourceTask = new CamelSourceTask();
-        camelSourceTask.start(props);
-
-        final ProducerTemplate template = 
camelSourceTask.getCms().createProducerTemplate();
+        props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, 
DIRECT_URI);
+        
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_POLL_DURATION_CONF, "2");
 
-        // first we send nuberOfMessagesSent of messages
-        Stream.of(nuberOfMessagesSent).forEach(i -> 
template.sendBody("direct:start", "awesome!"));
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(props);
 
-        // then we assert we received only a fraction of them (proving that 
polling timeout of 1 Millisecond is working)
-        List<SourceRecord> poll = camelSourceTask.poll();
-        assertTrue(poll.size() < nuberOfMessagesSent, "Expected received 
messages count to be strictly less than " + nuberOfMessagesSent + ", got " + 
poll.size());
+        sendBatchOfRecords(sourceTask, size);
+        List<SourceRecord>  poll = sourceTask.poll();
+        int pollSize = poll.size();
 
-        camelSourceTask.stop();
+        assertTrue(pollSize < size, "Batch size: " + pollSize + ", expected 
strictly less than " + size);
+        sourceTask.stop();
     }
 
     @Test
-    public void testSourcePollingMaxRecordNumber() {
-        final int nuberOfMessagesSent = 2;
+    public void testSourcePollingWithKey() {
         Map<String, String> props = new HashMap<>();
-        props.put("camel.source.url", "direct:start");
-        props.put("topics", "mytopic");
-        props.put("camel.source.maxBatchPollSize", "1");
-
-        CamelSourceTask camelSourceTask = new CamelSourceTask();
-        camelSourceTask.start(props);
+        props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, 
DIRECT_URI);
+        
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, 
"CamelSpecialTestKey");
 
-        final ProducerTemplate template = 
camelSourceTask.getCms().createProducerTemplate();
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(props);
+        final ProducerTemplate template = 
sourceTask.getCms().createProducerTemplate();
 
-        // first we send nuberOfMessagesSent of messages > 
camel.source.maxBatchPollSize
-        Stream.of(nuberOfMessagesSent).forEach(i -> 
template.sendBody("direct:start", "awesome!"));
+        // key in the message with body
+        template.sendBodyAndHeader(DIRECT_URI, "test", "CamelSpecialTestKey", 
1234);
 
-        List<SourceRecord> poll = 
camelSourceTaskPollWithRetries(camelSourceTask, 5);
+        List<SourceRecord> poll1 = sourceTask.poll();
+        assertEquals(1, poll1.size());
+        assertEquals(1234, poll1.get(0).key());
+        assertEquals(Schema.Type.INT32, poll1.get(0).keySchema().type());
 
-        // then we assert we received just camel.source.maxBatchPollSize
-        assertEquals(1, poll.size());
-        camelSourceTask.stop();
-    }
+        // no key under the header
+        template.sendBodyAndHeader(DIRECT_URI, "test", "WrongHeader", 1234);
 
-    @Test
-    public void testSourceConsumerOptions() {
-        Map<String, String> props = new HashMap<>();
-        props.put("camel.source.url", "timer:kafkaconnector");
-        props.put("topics", "mytopic");
-        props.put("camel.source.pollingConsumerQueueSize", "10");
-        props.put("camel.source.pollingConsumerBlockTimeout", "1000");
-        props.put("camel.source.pollingConsumerBlockWhenFull", "false");
+        List<SourceRecord> poll2 = sourceTask.poll();
+        assertEquals(1, poll2.size());
+        assertNull(poll2.get(0).key());
+        assertNull(poll2.get(0).keySchema());
 
-        CamelSourceTask camelSourceTask = new CamelSourceTask();
-        camelSourceTask.start(props);
+        // header with null value
+        template.sendBodyAndHeader(DIRECT_URI, "test", "CamelSpecialTestKey", 
null);
 
-        assertEquals(2, camelSourceTask.getCms().getEndpoints().size());
+        List<SourceRecord> poll3 = sourceTask.poll();
+        assertEquals(1, poll3.size());
+        assertNull(poll3.get(0).key());
+        assertNull(poll3.get(0).keySchema());
 
-        camelSourceTask.getCms().getEndpoints().stream()
-                .filter(e -> e.getEndpointUri().startsWith("direct"))
-                .forEach(e -> {
-                    assertTrue(e.getEndpointUri().contains("end"));
-                    
assertTrue(e.getEndpointUri().contains("pollingConsumerBlockTimeout=1000"));
-                    
assertTrue(e.getEndpointUri().contains("pollingConsumerBlockWhenFull=false"));
-                    
assertTrue(e.getEndpointUri().contains("pollingConsumerQueueSize=10"));
-                });
+        sourceTask.stop();
+    }
 
-        camelSourceTask.stop();
+    @Test
+    public void testSourcePollingWithBody() {
+        Map<String, String> props = new HashMap<>();
+        props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, 
DIRECT_URI);
+
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(props);
+        final ProducerTemplate template = 
sourceTask.getCms().createProducerTemplate();
+
+        // send String
+        template.sendBody(DIRECT_URI, "test");
+
+        List<SourceRecord> poll1 = sourceTask.poll();
+        assertEquals(1, poll1.size());
+        assertEquals("test", poll1.get(0).value());
+        assertEquals(Schema.Type.STRING, poll1.get(0).valueSchema().type());
+        assertNull(poll1.get(0).key());
+        assertNull(poll1.get(0).keySchema());
+
+        // send boolean
+        template.sendBody(DIRECT_URI, true);
+
+        List<SourceRecord> poll2 = sourceTask.poll();
+        assertEquals(1, poll2.size());
+        assertTrue((boolean)poll2.get(0).value());
+        assertEquals(Schema.Type.BOOLEAN, poll2.get(0).valueSchema().type());
+        assertNull(poll2.get(0).key());
+        assertNull(poll2.get(0).keySchema());
+
+        // send long
+        template.sendBody(DIRECT_URI, 1234L);
+
+        List<SourceRecord> poll3 = sourceTask.poll();
+        assertEquals(1, poll3.size());
+        assertEquals(1234L, poll3.get(0).value());
+        assertEquals(Schema.Type.INT64, poll3.get(0).valueSchema().type());
+        assertNull(poll3.get(0).key());
+        assertNull(poll3.get(0).keySchema());
+
+        // send null
+        template.sendBody(DIRECT_URI, null);
+
+        List<SourceRecord> poll4 = sourceTask.poll();
+        assertNull(poll4.get(0).key());
+        assertNull(poll4.get(0).keySchema());
+        assertNull(poll4.get(0).value());
+        assertNull(poll4.get(0).valueSchema());
+
+        sourceTask.stop();
     }
 
     @Test
-    public void testSourceUrlPrecedenceOnComponentProperty() {
+    public void testUrlPrecedenceOnComponentProperty() {
         Map<String, String> props = new HashMap<>();
-        props.put("camel.source.url", 
"timer:kafkaconnector?period=10&fixedRate=true&delay=0");
-        props.put("topics", "mytopic");
-        //these properties should be ignored
+        props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, 
"timer:foo?period=10&repeatCount=2");
         props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, 
"shouldNotBeUsed");
-        props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + 
"delay", "100000000");
-        props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + "name", 
"shouldNotBeUsed");
+        props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + 
"timerName", "shouldNotBeUsed");
+        props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + 
"repeatCount", "999");
 
-        CamelSourceTask camelSourceTask = new CamelSourceTask();
-        camelSourceTask.start(props);
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(props);
 
-        assertEquals(2, camelSourceTask.getCms().getEndpoints().size());
+        assertEquals(2, sourceTask.getCms().getEndpoints().size());
 
-        camelSourceTask.getCms().getEndpoints().stream()
+        sourceTask.getCms().getEndpoints().stream()
                 .filter(e -> e.getEndpointUri().startsWith("timer"))
                 .forEach(e -> {
-                    assertTrue(e.getEndpointUri().contains("kafkaconnector"));
+                    assertTrue(e.getEndpointUri().contains("foo"));
                     assertTrue(e.getEndpointUri().contains("period=10"));
-                    assertTrue(e.getEndpointUri().contains("fixedRate=true"));
-                    assertTrue(e.getEndpointUri().contains("delay=0"));
+                    assertTrue(e.getEndpointUri().contains("repeatCount=2"));
                 });
 
-        camelSourceTask.stop();
+        sourceTask.stop();
     }
 
     @Test
-    public void testSourceUsingComponentProperty() {
+    public void testSourcePollingConsumerOptions() {
         Map<String, String> props = new HashMap<>();
-        props.put("topics", "mytopic");
-        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, 
"timer");
-        props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + 
"period", "10000");
-        props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + 
"delay", "0");
-        props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + 
"timerName", "kafkaconnector");
+        props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, 
"timer:foo?period=10&repeatCount=2");
+        
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF,
 "10");
+        
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF,
 "10");
+        
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF,
 "false");
 
-        CamelSourceTask camelSourceTask = new CamelSourceTask();
-        camelSourceTask.start(props);
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(props);
 
-        camelSourceTask.getCms().getEndpoints().stream()
-            .filter(e -> e.getEndpointUri().startsWith("timer"))
-            .forEach(e -> {
-                assertTrue(e.getEndpointUri().contains("kafkaconnector"));
-                assertTrue(e.getEndpointUri().contains("period=1000"));
-                assertTrue(e.getEndpointUri().contains("delay=0"));
-            });
+        assertEquals(2, sourceTask.getCms().getEndpoints().size());
 
-        camelSourceTask.stop();
+        sourceTask.getCms().getEndpoints().stream()
+                .filter(e -> e.getEndpointUri().startsWith("direct"))
+                .forEach(e -> {
+                    assertTrue(e.getEndpointUri().contains("end"));
+                    
assertTrue(e.getEndpointUri().contains("pollingConsumerQueueSize=10"));
+                    
assertTrue(e.getEndpointUri().contains("pollingConsumerBlockTimeout=10"));
+                    
assertTrue(e.getEndpointUri().contains("pollingConsumerBlockWhenFull=false"));
+                });
+
+        sourceTask.stop();
     }
 
     @Test
-    public void testSourceUsingMultipleComponentProperties() {
+    public void testSourceUsingComponentProperties() {
         Map<String, String> props = new HashMap<>();
-        props.put("topics", "mytopic");
+        props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, 
"timer");
-        props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + 
"period", "1000");
-        props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + 
"repeatCount", "0");
-        props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + 
"timerName", "kafkaconnector");
+        
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF, 
"2");
+        
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_POLL_DURATION_CONF, "10");
+        props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + 
"timerName", "foo");
+        props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + 
"period", "10");
+        props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + 
"repeatCount", "2");
 
-        CamelSourceTask camelSourceTask = new CamelSourceTask();
-        camelSourceTask.start(props);
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(props);
 
-        camelSourceTask.getCms().getEndpoints().stream()
+        sourceTask.getCms().getEndpoints().stream()
             .filter(e -> e.getEndpointUri().startsWith("timer"))
             .forEach(e -> {
-                assertTrue(e.getEndpointUri().contains("kafkaconnector"));
-                assertTrue(e.getEndpointUri().contains("period=1000"));
-                assertTrue(e.getEndpointUri().contains("repeatCount=0"));
+                assertTrue(e.getEndpointUri().contains("foo"));
+                assertTrue(e.getEndpointUri().contains("period=10"));
+                assertTrue(e.getEndpointUri().contains("repeatCount=2"));
             });
 
-        camelSourceTask.stop();
-    }
-
-    private List<SourceRecord> camelSourceTaskPollWithRetries(CamelSourceTask 
camelSourceTask, int retries) {
-        List<SourceRecord> poll;
-        do {
-            poll = camelSourceTask.poll();
-            if (poll == null) {
-                retries--;
-            }
-        } while (poll == null && retries > 0);
-        return poll;
+        sourceTask.stop();
     }
 }

Reply via email to