This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 49aa5e8 Improved CamelSourceTaskTest new 5593ea7 Merge pull request #205 from fvaleri/no-flaky 49aa5e8 is described below commit 49aa5e8cba11a49c9fa57f4dc1a3639e9a2c0e29 Author: Federico Valeri <fvaleri@localhost> AuthorDate: Sat May 9 21:06:28 2020 +0200 Improved CamelSourceTaskTest --- .../camel/kafkaconnector/CamelSourceTaskTest.java | 365 ++++++++++----------- 1 file changed, 165 insertions(+), 200 deletions(-) diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java index 9c40d40..33807ee 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java @@ -19,7 +19,6 @@ package org.apache.camel.kafkaconnector; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Stream; import org.apache.camel.ProducerTemplate; import org.apache.kafka.connect.data.Schema; @@ -32,270 +31,236 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class CamelSourceTaskTest { + private static final String DIRECT_URI = "direct:start"; + private static final String TOPIC_NAME = "my-topic"; + + private void sendBatchOfRecords(CamelSourceTask sourceTask, long size) { + final ProducerTemplate template = sourceTask.getCms().createProducerTemplate(); + for (int i = 0; i < size; i++) { + template.sendBody(DIRECT_URI, "test" + i); + } + } + @Test public void testSourcePolling() { + final long size = 2; Map<String, String> props = new HashMap<>(); - props.put("camel.source.url", "direct:start"); - props.put("topics", "mytopic"); + props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI); - CamelSourceTask camelSourceTask = new CamelSourceTask(); - 
camelSourceTask.start(props); + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(props); - final ProducerTemplate template = camelSourceTask.getCms().createProducerTemplate(); - template.sendBody("direct:start", "awesome!"); + sendBatchOfRecords(sourceTask, size); + List<SourceRecord> poll = sourceTask.poll(); - List<SourceRecord> poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); - assertEquals(1, poll.size()); - assertEquals("mytopic", poll.get(0).topic()); + assertEquals(size, poll.size()); + assertEquals(TOPIC_NAME, poll.get(0).topic()); - camelSourceTask.stop(); + sourceTask.stop(); } @Test - public void testSourcePollingWithKey() { + public void testSourcePollingMaxBatchPollSize() { + final long size = 2; Map<String, String> props = new HashMap<>(); - props.put("camel.source.url", "direct:start"); - props.put("topics", "mytopic"); - props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, "CamelSpecialTestKey"); - - CamelSourceTask camelSourceTask = new CamelSourceTask(); - camelSourceTask.start(props); - - final ProducerTemplate template = camelSourceTask.getCms().createProducerTemplate(); - - // first we test if we have a key in the message with body - template.sendBodyAndHeader("direct:start", "awesome!", "CamelSpecialTestKey", 1234); - - List<SourceRecord> poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); - assertEquals(1, poll.size()); - assertEquals(1234, poll.get(0).key()); - assertEquals(Schema.Type.INT32, poll.get(0).keySchema().type()); - - // second we test if we have no key under the header - template.sendBodyAndHeader("direct:start", "awesome!", "WrongHeader", 1234); - - poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); - assertEquals(1, poll.size()); - assertNull(poll.get(0).key()); - assertNull(poll.get(0).keySchema()); - - // third we test if we have the header but with null value - template.sendBodyAndHeader("direct:start", "awesome!", "CamelSpecialTestKey", null); + 
props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF, String.valueOf(size)); - poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); - assertEquals(1, poll.size()); - assertNull(poll.get(0).key()); - assertNull(poll.get(0).keySchema()); + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(props); - camelSourceTask.stop(); - } + sendBatchOfRecords(sourceTask, size + 1); + List<SourceRecord> poll = sourceTask.poll(); + int pollSize = poll.size(); - @Test - public void testSourcePollingWithBody() { - Map<String, String> props = new HashMap<>(); - props.put("camel.source.url", "direct:start"); - props.put("topics", "mytopic"); - - CamelSourceTask camelSourceTask = new CamelSourceTask(); - camelSourceTask.start(props); - - final ProducerTemplate template = camelSourceTask.getCms().createProducerTemplate(); - - // send first data - template.sendBody("direct:start", "testing kafka connect"); - - List<SourceRecord> poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); - assertEquals(1, poll.size()); - assertEquals("testing kafka connect", poll.get(0).value()); - assertEquals(Schema.Type.STRING, poll.get(0).valueSchema().type()); - assertNull(poll.get(0).key()); - assertNull(poll.get(0).keySchema()); - - // send second data - template.sendBody("direct:start", true); - - poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); - assertEquals(1, poll.size()); - assertTrue((boolean)poll.get(0).value()); - assertEquals(Schema.Type.BOOLEAN, poll.get(0).valueSchema().type()); - assertNull(poll.get(0).key()); - assertNull(poll.get(0).keySchema()); - - // second third data - template.sendBody("direct:start", 1234L); - - poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); - assertEquals(1, poll.size()); - assertEquals(1234L, poll.get(0).value()); - assertEquals(Schema.Type.INT64, 
poll.get(0).valueSchema().type()); - assertNull(poll.get(0).key()); - assertNull(poll.get(0).keySchema()); - - // third with null data - template.sendBody("direct:start", null); - - poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); - assertNull(poll.get(0).key()); - assertNull(poll.get(0).keySchema()); - assertNull(poll.get(0).value()); - assertNull(poll.get(0).valueSchema()); - - camelSourceTask.stop(); + assertTrue(pollSize >= 0 && pollSize <= size, "Batch size: " + pollSize + ", expected between 0 and " + size); + sourceTask.stop(); } @Test public void testSourcePollingTimeout() { - final int nuberOfMessagesSent = 999; + final long size = 999; Map<String, String> props = new HashMap<>(); - props.put("camel.source.url", "direct:start"); - props.put("topics", "mytopic"); - props.put("camel.source.maxPollDuration", "1"); - - CamelSourceTask camelSourceTask = new CamelSourceTask(); - camelSourceTask.start(props); - - final ProducerTemplate template = camelSourceTask.getCms().createProducerTemplate(); + props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_POLL_DURATION_CONF, "2"); - // first we send nuberOfMessagesSent of messages - Stream.of(nuberOfMessagesSent).forEach(i -> template.sendBody("direct:start", "awesome!")); + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(props); - // then we assert we received only a fraction of them (proving that polling timeout of 1 Millisecond is working) - List<SourceRecord> poll = camelSourceTask.poll(); - assertTrue(poll.size() < nuberOfMessagesSent, "Expected received messages count to be strictly less than " + nuberOfMessagesSent + ", got " + poll.size()); + sendBatchOfRecords(sourceTask, size); + List<SourceRecord> poll = sourceTask.poll(); + int pollSize = poll.size(); - camelSourceTask.stop(); + assertTrue(pollSize < size, "Batch size: " + pollSize + ", 
expected strictly less than " + size); + sourceTask.stop(); } @Test - public void testSourcePollingMaxRecordNumber() { - final int nuberOfMessagesSent = 2; + public void testSourcePollingWithKey() { Map<String, String> props = new HashMap<>(); - props.put("camel.source.url", "direct:start"); - props.put("topics", "mytopic"); - props.put("camel.source.maxBatchPollSize", "1"); - - CamelSourceTask camelSourceTask = new CamelSourceTask(); - camelSourceTask.start(props); + props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, "CamelSpecialTestKey"); - final ProducerTemplate template = camelSourceTask.getCms().createProducerTemplate(); + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(props); + final ProducerTemplate template = sourceTask.getCms().createProducerTemplate(); - // first we send nuberOfMessagesSent of messages > camel.source.maxBatchPollSize - Stream.of(nuberOfMessagesSent).forEach(i -> template.sendBody("direct:start", "awesome!")); + // key in the message with body + template.sendBodyAndHeader(DIRECT_URI, "test", "CamelSpecialTestKey", 1234); - List<SourceRecord> poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); + List<SourceRecord> poll1 = sourceTask.poll(); + assertEquals(1, poll1.size()); + assertEquals(1234, poll1.get(0).key()); + assertEquals(Schema.Type.INT32, poll1.get(0).keySchema().type()); - // then we assert we received just camel.source.maxBatchPollSize - assertEquals(1, poll.size()); - camelSourceTask.stop(); - } + // no key under the header + template.sendBodyAndHeader(DIRECT_URI, "test", "WrongHeader", 1234); - @Test - public void testSourceConsumerOptions() { - Map<String, String> props = new HashMap<>(); - props.put("camel.source.url", "timer:kafkaconnector"); - props.put("topics", "mytopic"); - props.put("camel.source.pollingConsumerQueueSize", 
"10"); - props.put("camel.source.pollingConsumerBlockTimeout", "1000"); - props.put("camel.source.pollingConsumerBlockWhenFull", "false"); + List<SourceRecord> poll2 = sourceTask.poll(); + assertEquals(1, poll2.size()); + assertNull(poll2.get(0).key()); + assertNull(poll2.get(0).keySchema()); - CamelSourceTask camelSourceTask = new CamelSourceTask(); - camelSourceTask.start(props); + // header with null value + template.sendBodyAndHeader(DIRECT_URI, "test", "CamelSpecialTestKey", null); - assertEquals(2, camelSourceTask.getCms().getEndpoints().size()); + List<SourceRecord> poll3 = sourceTask.poll(); + assertEquals(1, poll3.size()); + assertNull(poll3.get(0).key()); + assertNull(poll3.get(0).keySchema()); - camelSourceTask.getCms().getEndpoints().stream() - .filter(e -> e.getEndpointUri().startsWith("direct")) - .forEach(e -> { - assertTrue(e.getEndpointUri().contains("end")); - assertTrue(e.getEndpointUri().contains("pollingConsumerBlockTimeout=1000")); - assertTrue(e.getEndpointUri().contains("pollingConsumerBlockWhenFull=false")); - assertTrue(e.getEndpointUri().contains("pollingConsumerQueueSize=10")); - }); + sourceTask.stop(); + } - camelSourceTask.stop(); + @Test + public void testSourcePollingWithBody() { + Map<String, String> props = new HashMap<>(); + props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI); + + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(props); + final ProducerTemplate template = sourceTask.getCms().createProducerTemplate(); + + // send String + template.sendBody(DIRECT_URI, "test"); + + List<SourceRecord> poll1 = sourceTask.poll(); + assertEquals(1, poll1.size()); + assertEquals("test", poll1.get(0).value()); + assertEquals(Schema.Type.STRING, poll1.get(0).valueSchema().type()); + assertNull(poll1.get(0).key()); + assertNull(poll1.get(0).keySchema()); + + // send boolean + template.sendBody(DIRECT_URI, true); + + List<SourceRecord> 
poll2 = sourceTask.poll(); + assertEquals(1, poll2.size()); + assertTrue((boolean)poll2.get(0).value()); + assertEquals(Schema.Type.BOOLEAN, poll2.get(0).valueSchema().type()); + assertNull(poll2.get(0).key()); + assertNull(poll2.get(0).keySchema()); + + // send long + template.sendBody(DIRECT_URI, 1234L); + + List<SourceRecord> poll3 = sourceTask.poll(); + assertEquals(1, poll3.size()); + assertEquals(1234L, poll3.get(0).value()); + assertEquals(Schema.Type.INT64, poll3.get(0).valueSchema().type()); + assertNull(poll3.get(0).key()); + assertNull(poll3.get(0).keySchema()); + + // send null + template.sendBody(DIRECT_URI, null); + + List<SourceRecord> poll4 = sourceTask.poll(); + assertNull(poll4.get(0).key()); + assertNull(poll4.get(0).keySchema()); + assertNull(poll4.get(0).value()); + assertNull(poll4.get(0).valueSchema()); + + sourceTask.stop(); } @Test - public void testSourceUrlPrecedenceOnComponentProperty() { + public void testUrlPrecedenceOnComponentProperty() { Map<String, String> props = new HashMap<>(); - props.put("camel.source.url", "timer:kafkaconnector?period=10&fixedRate=true&delay=0"); - props.put("topics", "mytopic"); - //these properties should be ignored + props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, "timer:foo?period=10&repeatCount=2"); props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "shouldNotBeUsed"); - props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "delay", "100000000"); - props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + "name", "shouldNotBeUsed"); + props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + "timerName", "shouldNotBeUsed"); + props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "repeatCount", "999"); - CamelSourceTask camelSourceTask = new CamelSourceTask(); - camelSourceTask.start(props); + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(props); - assertEquals(2, 
camelSourceTask.getCms().getEndpoints().size()); + assertEquals(2, sourceTask.getCms().getEndpoints().size()); - camelSourceTask.getCms().getEndpoints().stream() + sourceTask.getCms().getEndpoints().stream() .filter(e -> e.getEndpointUri().startsWith("timer")) .forEach(e -> { - assertTrue(e.getEndpointUri().contains("kafkaconnector")); + assertTrue(e.getEndpointUri().contains("foo")); assertTrue(e.getEndpointUri().contains("period=10")); - assertTrue(e.getEndpointUri().contains("fixedRate=true")); - assertTrue(e.getEndpointUri().contains("delay=0")); + assertTrue(e.getEndpointUri().contains("repeatCount=2")); }); - camelSourceTask.stop(); + sourceTask.stop(); } @Test - public void testSourceUsingComponentProperty() { + public void testSourcePollingConsumerOptions() { Map<String, String> props = new HashMap<>(); - props.put("topics", "mytopic"); - props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "timer"); - props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "period", "10000"); - props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "delay", "0"); - props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + "timerName", "kafkaconnector"); + props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, "timer:foo?period=10&repeatCount=2"); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF, "10"); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF, "10"); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, "false"); - CamelSourceTask camelSourceTask = new CamelSourceTask(); - camelSourceTask.start(props); + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(props); - camelSourceTask.getCms().getEndpoints().stream() - .filter(e -> e.getEndpointUri().startsWith("timer")) - .forEach(e -> { - 
assertTrue(e.getEndpointUri().contains("kafkaconnector")); - assertTrue(e.getEndpointUri().contains("period=1000")); - assertTrue(e.getEndpointUri().contains("delay=0")); - }); + assertEquals(2, sourceTask.getCms().getEndpoints().size()); - camelSourceTask.stop(); + sourceTask.getCms().getEndpoints().stream() + .filter(e -> e.getEndpointUri().startsWith("direct")) + .forEach(e -> { + assertTrue(e.getEndpointUri().contains("end")); + assertTrue(e.getEndpointUri().contains("pollingConsumerQueueSize=10")); + assertTrue(e.getEndpointUri().contains("pollingConsumerBlockTimeout=10")); + assertTrue(e.getEndpointUri().contains("pollingConsumerBlockWhenFull=false")); + }); + + sourceTask.stop(); } @Test - public void testSourceUsingMultipleComponentProperties() { + public void testSourceUsingComponentProperties() { Map<String, String> props = new HashMap<>(); - props.put("topics", "mytopic"); + props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME); props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "timer"); - props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "period", "1000"); - props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "repeatCount", "0"); - props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + "timerName", "kafkaconnector"); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF, "2"); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_POLL_DURATION_CONF, "10"); + props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + "timerName", "foo"); + props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "period", "10"); + props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "repeatCount", "2"); - CamelSourceTask camelSourceTask = new CamelSourceTask(); - camelSourceTask.start(props); + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(props); - camelSourceTask.getCms().getEndpoints().stream() + sourceTask.getCms().getEndpoints().stream() 
.filter(e -> e.getEndpointUri().startsWith("timer")) .forEach(e -> { - assertTrue(e.getEndpointUri().contains("kafkaconnector")); - assertTrue(e.getEndpointUri().contains("period=1000")); - assertTrue(e.getEndpointUri().contains("repeatCount=0")); + assertTrue(e.getEndpointUri().contains("foo")); + assertTrue(e.getEndpointUri().contains("period=10")); + assertTrue(e.getEndpointUri().contains("repeatCount=2")); }); - camelSourceTask.stop(); - } - - private List<SourceRecord> camelSourceTaskPollWithRetries(CamelSourceTask camelSourceTask, int retries) { - List<SourceRecord> poll; - do { - poll = camelSourceTask.poll(); - if (poll == null) { - retries--; - } - } while (poll == null && retries > 0); - return poll; + sourceTask.stop(); } }