This is an automated email from the ASF dual-hosted git repository.
valdar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 1de9ad6 Sink test refactoring
new 9bcd681 Merge pull request #216 from fvaleri/sink-test-ref
1de9ad6 is described below
commit 1de9ad69644b5c46ebee471bf02e23f2930df8e7
Author: Federico Valeri <fvaleri@localhost>
AuthorDate: Sat May 16 19:29:22 2020 +0200
Sink test refactoring
---
.../kafkaconnector/CamelSinkConnectorConfig.java | 4 +
.../kafkaconnector/CamelSourceConnectorConfig.java | 2 +-
.../camel/kafkaconnector/CamelSinkTaskTest.java | 232 ++++++++++-----------
3 files changed, 116 insertions(+), 122 deletions(-)
diff --git
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index e11e84d..c353c58 100644
---
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -38,6 +38,10 @@ public class CamelSinkConnectorConfig extends AbstractConfig
{
public static final String CAMEL_SINK_URL_DOC = "The camel url to
configure the destination. If this is set " + CAMEL_SINK_COMPONENT_CONF
+ " and all the properties starting with " +
CamelSinkTask.getCamelSinkEndpointConfigPrefix() + ".<" +
CAMEL_SINK_COMPONENT_CONF + " value> are ignored.";
+ public static final String TOPIC_DEFAULT = "test";
+ public static final String TOPIC_CONF = "topics";
+ public static final String TOPIC_DOC = "A list of topics to use as input
for this connector";
+
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT,
Importance.HIGH, CAMEL_SINK_URL_DOC)
.define(CAMEL_SINK_MARSHAL_CONF, Type.STRING,
CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC)
diff --git
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index 01a55b8..34e2495 100644
---
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -40,7 +40,7 @@ public class CamelSourceConnectorConfig extends
AbstractConfig {
public static final String TOPIC_DEFAULT = "test";
public static final String TOPIC_CONF = "topics";
- public static final String TOPIC_DOC = "The topic to publish data to";
+ public static final String TOPIC_DOC = "A list of topics to use as output
for this connector";
public static final Long CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DEFAULT = 1000L;
public static final String CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF =
"camel.source.maxBatchPollSize";
diff --git
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index ba147b8..24cd03c 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -34,40 +34,41 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class CamelSinkTaskTest {
+ private static final String SEDA_URI = "seda:test";
+ private static final String TOPIC_NAME = "my-topic";
+ private static final long RECEIVE_TIMEOUT = 1_000;
+
@Test
public void testOnlyBody() {
Map<String, String> props = new HashMap<>();
- props.put("camel.sink.url", "seda:test");
- props.put("topics", "mytopic");
-
- CamelSinkTask camelSinkTask = new CamelSinkTask();
- camelSinkTask.start(props);
+ props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
- String topic = "mytopic";
+ CamelSinkTask sinkTask = new CamelSinkTask();
+ sinkTask.start(props);
List<SinkRecord> records = new ArrayList<SinkRecord>();
- SinkRecord record = new SinkRecord(topic, 1, null, "test", null,
"camel", 42);
+ SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null,
"camel", 42);
records.add(record);
- camelSinkTask.put(records);
+ sinkTask.put(records);
- ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
- Exchange exchange = c.receive("seda:test", 1000L);
+ ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+ Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test",
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
- camelSinkTask.stop();
+ sinkTask.stop();
}
@Test
public void testBodyAndHeaders() {
Map<String, String> props = new HashMap<>();
- props.put("camel.sink.url", "seda:test");
- props.put("topics", "mytopic");
+ props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
- CamelSinkTask camelSinkTask = new CamelSinkTask();
- camelSinkTask.start(props);
+ CamelSinkTask sinkTask = new CamelSinkTask();
+ sinkTask.start(props);
- String topic = "mytopic";
Byte myByte = new Byte("100");
Float myFloat = new Float("100");
Short myShort = new Short("100");
@@ -76,7 +77,7 @@ public class CamelSinkTaskTest {
Long myLong = new Long("100");
List<SinkRecord> records = new ArrayList<SinkRecord>();
- SinkRecord record = new SinkRecord(topic, 1, null, "test", null,
"camel", 42);
+ SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null,
"camel", 42);
record.headers().addBoolean("CamelHeaderMyBoolean", true);
record.headers().addByte("CamelHeaderMyByte", myByte);
record.headers().addFloat("CamelHeaderMyFloat", myFloat);
@@ -85,10 +86,10 @@ public class CamelSinkTaskTest {
record.headers().addInt("CamelHeaderMyInteger", myInteger);
record.headers().addLong("CamelHeaderMyLong", myLong);
records.add(record);
- camelSinkTask.put(records);
+ sinkTask.put(records);
- ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
- Exchange exchange = c.receive("seda:test", 1000L);
+ ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+ Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test",
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class));
@@ -99,19 +100,18 @@ public class CamelSinkTaskTest {
assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
- camelSinkTask.stop();
+ sinkTask.stop();
}
-
+
@Test
public void testBodyAndProperties() {
Map<String, String> props = new HashMap<>();
- props.put("camel.sink.url", "seda:test");
- props.put("topics", "mytopic");
+ props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
- CamelSinkTask camelSinkTask = new CamelSinkTask();
- camelSinkTask.start(props);
+ CamelSinkTask sinkTask = new CamelSinkTask();
+ sinkTask.start(props);
- String topic = "mytopic";
Byte myByte = new Byte("100");
Float myFloat = new Float("100");
Short myShort = new Short("100");
@@ -120,7 +120,7 @@ public class CamelSinkTaskTest {
Long myLong = new Long("100");
List<SinkRecord> records = new ArrayList<SinkRecord>();
- SinkRecord record = new SinkRecord(topic, 1, null, "test", null,
"camel", 42);
+ SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null,
"camel", 42);
record.headers().addBoolean("CamelPropertyMyBoolean", true);
record.headers().addByte("CamelPropertyMyByte", myByte);
record.headers().addFloat("CamelPropertyMyFloat", myFloat);
@@ -129,10 +129,10 @@ public class CamelSinkTaskTest {
record.headers().addInt("CamelPropertyMyInteger", myInteger);
record.headers().addLong("CamelPropertyMyLong", myLong);
records.add(record);
- camelSinkTask.put(records);
+ sinkTask.put(records);
- ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
- Exchange exchange = c.receive("seda:test", 1000L);
+ ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+ Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test",
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertTrue((boolean)
exchange.getProperties().get("CamelPropertyMyBoolean"));
@@ -143,19 +143,18 @@ public class CamelSinkTaskTest {
assertEquals(myInteger,
exchange.getProperties().get("CamelPropertyMyInteger"));
assertEquals(myLong, (Long)
exchange.getProperties().get("CamelPropertyMyLong"));
- camelSinkTask.stop();
+ sinkTask.stop();
}
-
+
@Test
public void testBodyAndPropertiesHeadersMixed() {
Map<String, String> props = new HashMap<>();
- props.put("camel.sink.url", "seda:test");
- props.put("topics", "mytopic");
+ props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
- CamelSinkTask camelSinkTask = new CamelSinkTask();
- camelSinkTask.start(props);
+ CamelSinkTask sinkTask = new CamelSinkTask();
+ sinkTask.start(props);
- String topic = "mytopic";
Byte myByte = new Byte("100");
Float myFloat = new Float("100");
Short myShort = new Short("100");
@@ -164,7 +163,7 @@ public class CamelSinkTaskTest {
Long myLong = new Long("100");
List<SinkRecord> records = new ArrayList<SinkRecord>();
- SinkRecord record = new SinkRecord(topic, 1, null, "test", null,
"camel", 42);
+ SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null,
"camel", 42);
record.headers().addBoolean("CamelPropertyMyBoolean", true);
record.headers().addByte("CamelPropertyMyByte", myByte);
record.headers().addFloat("CamelPropertyMyFloat", myFloat);
@@ -180,10 +179,10 @@ public class CamelSinkTaskTest {
record.headers().addInt("CamelHeaderMyInteger", myInteger);
record.headers().addLong("CamelHeaderMyLong", myLong);
records.add(record);
- camelSinkTask.put(records);
+ sinkTask.put(records);
- ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
- Exchange exchange = c.receive("seda:test", 1000L);
+ ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+ Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test",
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertTrue((boolean)
exchange.getProperties().get("CamelPropertyMyBoolean"));
@@ -201,19 +200,18 @@ public class CamelSinkTaskTest {
assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
- camelSinkTask.stop();
+ sinkTask.stop();
}
-
+
@Test
public void testBodyAndHeadersMap() {
Map<String, String> props = new HashMap<>();
- props.put("camel.sink.url", "seda:test");
- props.put("topics", "mytopic");
+ props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
- CamelSinkTask camelSinkTask = new CamelSinkTask();
- camelSinkTask.start(props);
+ CamelSinkTask sinkTask = new CamelSinkTask();
+ sinkTask.start(props);
- String topic = "mytopic";
Byte myByte = new Byte("100");
Float myFloat = new Float("100");
Short myShort = new Short("100");
@@ -228,7 +226,7 @@ public class CamelSinkTaskTest {
map2.put(1, 1);
List<SinkRecord> records = new ArrayList<SinkRecord>();
- SinkRecord record = new SinkRecord(topic, 1, null, "test", null,
"camel", 42);
+ SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null,
"camel", 42);
record.headers().addBoolean("CamelHeaderMyBoolean", true);
record.headers().addByte("CamelHeaderMyByte", myByte);
record.headers().addFloat("CamelHeaderMyFloat", myFloat);
@@ -240,10 +238,10 @@ public class CamelSinkTaskTest {
record.headers().addMap("CamelHeaderMyMap1", map1,
SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.STRING_SCHEMA));
record.headers().addMap("CamelHeaderMyMap2", map2,
SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.INT64_SCHEMA));
records.add(record);
- camelSinkTask.put(records);
+ sinkTask.put(records);
- ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
- Exchange exchange = c.receive("seda:test", 1000L);
+ ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+ Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test",
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class));
@@ -256,19 +254,18 @@ public class CamelSinkTaskTest {
assertEquals(map, exchange.getIn().getHeader("MyMap", Map.class));
assertEquals(map1, exchange.getIn().getHeader("MyMap1", Map.class));
assertEquals(map2, exchange.getIn().getHeader("MyMap2", Map.class));
- camelSinkTask.stop();
+ sinkTask.stop();
}
-
+
@Test
public void testBodyAndPropertiesHeadersMapMixed() {
Map<String, String> props = new HashMap<>();
- props.put("camel.sink.url", "seda:test");
- props.put("topics", "mytopic");
+ props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
- CamelSinkTask camelSinkTask = new CamelSinkTask();
- camelSinkTask.start(props);
+ CamelSinkTask sinkTask = new CamelSinkTask();
+ sinkTask.start(props);
- String topic = "mytopic";
Byte myByte = new Byte("100");
Float myFloat = new Float("100");
Short myShort = new Short("100");
@@ -283,7 +280,7 @@ public class CamelSinkTaskTest {
map2.put(1, 1);
List<SinkRecord> records = new ArrayList<SinkRecord>();
- SinkRecord record = new SinkRecord(topic, 1, null, "test", null,
"camel", 42);
+ SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null,
"camel", 42);
record.headers().addBoolean("CamelPropertyMyBoolean", true);
record.headers().addByte("CamelPropertyMyByte", myByte);
record.headers().addFloat("CamelPropertyMyFloat", myFloat);
@@ -305,10 +302,10 @@ public class CamelSinkTaskTest {
record.headers().addMap("CamelHeaderMyMap1", map1,
SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.STRING_SCHEMA));
record.headers().addMap("CamelHeaderMyMap2", map2,
SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.INT64_SCHEMA));
records.add(record);
- camelSinkTask.put(records);
+ sinkTask.put(records);
- ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
- Exchange exchange = c.receive("seda:test", 1000L);
+ ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+ Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test",
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertTrue((boolean)
exchange.getProperties().get("CamelPropertyMyBoolean"));
@@ -332,19 +329,18 @@ public class CamelSinkTaskTest {
assertEquals(map1, exchange.getIn().getHeader("MyMap1", Map.class));
assertEquals(map2, exchange.getIn().getHeader("MyMap2", Map.class));
- camelSinkTask.stop();
+ sinkTask.stop();
}
-
+
@Test
public void testBodyAndHeadersList() {
Map<String, String> props = new HashMap<>();
- props.put("camel.sink.url", "seda:test");
- props.put("topics", "mytopic");
+ props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
- CamelSinkTask camelSinkTask = new CamelSinkTask();
- camelSinkTask.start(props);
+ CamelSinkTask sinkTask = new CamelSinkTask();
+ sinkTask.start(props);
- String topic = "mytopic";
Byte myByte = new Byte("100");
Float myFloat = new Float("100");
Short myShort = new Short("100");
@@ -357,7 +353,7 @@ public class CamelSinkTaskTest {
list1.add(1);
List<SinkRecord> records = new ArrayList<SinkRecord>();
- SinkRecord record = new SinkRecord(topic, 1, null, "test", null,
"camel", 42);
+ SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null,
"camel", 42);
record.headers().addBoolean("CamelHeaderMyBoolean", true);
record.headers().addByte("CamelHeaderMyByte", myByte);
record.headers().addFloat("CamelHeaderMyFloat", myFloat);
@@ -368,10 +364,10 @@ public class CamelSinkTaskTest {
record.headers().addList("CamelHeaderMyList", list,
SchemaBuilder.array(Schema.STRING_SCHEMA));
record.headers().addList("CamelHeaderMyList1", list1,
SchemaBuilder.array(Schema.INT64_SCHEMA));
records.add(record);
- camelSinkTask.put(records);
+ sinkTask.put(records);
- ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
- Exchange exchange = c.receive("seda:test", 1000L);
+ ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+ Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test",
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertTrue(exchange.getIn().getHeader("MyBoolean", Boolean.class));
@@ -383,19 +379,18 @@ public class CamelSinkTaskTest {
assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
assertEquals(list, exchange.getIn().getHeader("MyList", List.class));
assertEquals(list1, exchange.getIn().getHeader("MyList1", List.class));
- camelSinkTask.stop();
+ sinkTask.stop();
}
-
+
@Test
public void testBodyAndPropertiesHeadersListMixed() {
Map<String, String> props = new HashMap<>();
- props.put("camel.sink.url", "seda:test");
- props.put("topics", "mytopic");
+ props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
- CamelSinkTask camelSinkTask = new CamelSinkTask();
- camelSinkTask.start(props);
+ CamelSinkTask sinkTask = new CamelSinkTask();
+ sinkTask.start(props);
- String topic = "mytopic";
Byte myByte = new Byte("100");
Float myFloat = new Float("100");
Short myShort = new Short("100");
@@ -408,7 +403,7 @@ public class CamelSinkTaskTest {
list1.add(1);
List<SinkRecord> records = new ArrayList<SinkRecord>();
- SinkRecord record = new SinkRecord(topic, 1, null, "test", null,
"camel", 42);
+ SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null,
"camel", 42);
record.headers().addBoolean("CamelPropertyMyBoolean", true);
record.headers().addByte("CamelPropertyMyByte", myByte);
record.headers().addFloat("CamelPropertyMyFloat", myFloat);
@@ -428,10 +423,10 @@ public class CamelSinkTaskTest {
record.headers().addList("CamelPropertyMyList", list,
SchemaBuilder.array(Schema.STRING_SCHEMA));
record.headers().addList("CamelPropertyMyList1", list1,
SchemaBuilder.array(Schema.INT64_SCHEMA));
records.add(record);
- camelSinkTask.put(records);
+ sinkTask.put(records);
- ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
- Exchange exchange = c.receive("seda:test", 1000L);
+ ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+ Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test",
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
assertTrue((boolean)
exchange.getProperties().get("CamelPropertyMyBoolean"));
@@ -453,91 +448,86 @@ public class CamelSinkTaskTest {
assertEquals(list, exchange.getIn().getHeader("MyList", List.class));
assertEquals(list1, exchange.getIn().getHeader("MyList1", List.class));
- camelSinkTask.stop();
+ sinkTask.stop();
}
@Test
public void testUrlPrecedenceOnComponentProperty() {
Map<String, String> props = new HashMap<>();
- props.put("camel.sink.url", "seda:test");
- props.put("topics", "mytopic");
+ props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF,
"shouldNotBeUsed");
props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() +
"endpointProperty", "shouldNotBeUsed");
props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "pathChunk",
"shouldNotBeUsed");
- CamelSinkTask camelSinkTask = new CamelSinkTask();
- camelSinkTask.start(props);
-
- String topic = "mytopic";
+ CamelSinkTask sinkTask = new CamelSinkTask();
+ sinkTask.start(props);
List<SinkRecord> records = new ArrayList<SinkRecord>();
- SinkRecord record = new SinkRecord(topic, 1, null, "test", null,
"camel", 42);
+ SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null,
"camel", 42);
records.add(record);
- camelSinkTask.put(records);
+ sinkTask.put(records);
- ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
- Exchange exchange = c.receive("seda:test", 1000L);
+ ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+ Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test",
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
- camelSinkTask.stop();
+ sinkTask.stop();
}
@Test
public void testOnlyBodyUsingComponentProperty() {
Map<String, String> props = new HashMap<>();
- props.put("topics", "mytopic");
+ props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() +
"bridgeErrorHandler", "true");
props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name",
"test");
- CamelSinkTask camelSinkTask = new CamelSinkTask();
- camelSinkTask.start(props);
-
- String topic = "mytopic";
+ CamelSinkTask sinkTask = new CamelSinkTask();
+ sinkTask.start(props);
List<SinkRecord> records = new ArrayList<SinkRecord>();
- SinkRecord record = new SinkRecord(topic, 1, null, "test", null,
"camel", 42);
+ SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null,
"camel", 42);
records.add(record);
- camelSinkTask.put(records);
+ sinkTask.put(records);
- ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
- Exchange exchange = c.receive("seda:test", 1000L);
+ ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+ Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test",
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+ assertEquals(1, sinkTask.getCms().getEndpoints()
+ .stream().filter(e ->
e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true")).count());
- assertEquals(1,
camelSinkTask.getCms().getEndpoints().stream().filter(e ->
e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true")).count());
-
- camelSinkTask.stop();
+ sinkTask.stop();
}
@Test
public void testOnlyBodyUsingMultipleComponentProperties() {
Map<String, String> props = new HashMap<>();
- props.put("topics", "mytopic");
+ props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() +
"bridgeErrorHandler", "true");
props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "size",
"50");
props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name",
"test");
- CamelSinkTask camelSinkTask = new CamelSinkTask();
- camelSinkTask.start(props);
-
- String topic = "mytopic";
+ CamelSinkTask sinkTask = new CamelSinkTask();
+ sinkTask.start(props);
List<SinkRecord> records = new ArrayList<SinkRecord>();
- SinkRecord record = new SinkRecord(topic, 1, null, "test", null,
"camel", 42);
+ SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null,
"camel", 42);
records.add(record);
- camelSinkTask.put(records);
+ sinkTask.put(records);
- ConsumerTemplate c = camelSinkTask.getCms().createConsumerTemplate();
- Exchange exchange = c.receive("seda:test", 1000L);
+ ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+ Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
assertEquals("camel", exchange.getMessage().getBody());
assertEquals("test",
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
- assertEquals(1,
camelSinkTask.getCms().getEndpoints().stream().filter(e ->
e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true&size=50")).count());
+ assertEquals(1, sinkTask.getCms().getEndpoints()
+ .stream().filter(e ->
e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true&size=50")).count());
- camelSinkTask.stop();
+ sinkTask.stop();
}
}