This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new dc5b5fd Code cleanup
new 2095b4b Merge pull request #184 from fvaleri/cleanup
dc5b5fd is described below
commit dc5b5fdb189c8402535d41695320b508f870e9f6
Author: Federico Valeri <fvaleri@localhost>
AuthorDate: Sun May 3 17:59:51 2020 +0200
Code cleanup
---
.../camel/kafkaconnector/CamelSinkConnector.java | 6 +-
.../kafkaconnector/CamelSinkConnectorConfig.java | 13 +++--
.../apache/camel/kafkaconnector/CamelSinkTask.java | 18 +++---
.../kafkaconnector/CamelSourceConnectorConfig.java | 26 +++++----
.../camel/kafkaconnector/CamelSourceTask.java | 5 +-
.../kafkaconnector/utils/CamelMainSupport.java | 26 ++++-----
.../camel/kafkaconnector/CamelSourceTaskTest.java | 65 +++++++++++-----------
.../maven/AbstractCamelKafkaConnectorMojo.java | 2 +-
8 files changed, 80 insertions(+), 81 deletions(-)
diff --git
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnector.java
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnector.java
index 7cc47f0..092e513 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnector.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnector.java
@@ -27,7 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CamelSinkConnector extends SinkConnector {
- private static Logger log =
LoggerFactory.getLogger(CamelSinkConnector.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(CamelSinkConnector.class);
private Map<String, String> configProps;
@@ -38,7 +38,7 @@ public class CamelSinkConnector extends SinkConnector {
@Override
public void start(Map<String, String> configProps) {
- log.info("Connector config keys: {}", String.join(", ",
configProps.keySet()));
+ LOG.info("Connector config keys: {}", String.join(", ",
configProps.keySet()));
this.configProps = configProps;
}
@@ -49,7 +49,7 @@ public class CamelSinkConnector extends SinkConnector {
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
- log.info("Setting task configurations for {} workers.", maxTasks);
+ LOG.info("Setting task configurations for {} workers.", maxTasks);
final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
for (int i = 0; i < maxTasks; ++i) {
configs.add(configProps);
diff --git
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index d59d342..e11e84d 100644
---
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -38,18 +38,21 @@ public class CamelSinkConnectorConfig extends
AbstractConfig {
public static final String CAMEL_SINK_URL_DOC = "The camel url to
configure the destination. If this is set " + CAMEL_SINK_COMPONENT_CONF
+ " and all the properties starting with " +
CamelSinkTask.getCamelSinkEndpointConfigPrefix() + ".<" +
CAMEL_SINK_COMPONENT_CONF + " value> are ignored.";
+ private static final ConfigDef CONFIG_DEF = new ConfigDef()
+ .define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT,
Importance.HIGH, CAMEL_SINK_URL_DOC)
+ .define(CAMEL_SINK_MARSHAL_CONF, Type.STRING,
CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC)
+ .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING,
CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC);
+
public CamelSinkConnectorConfig(ConfigDef config, Map<String, String>
parsedConfig) {
super(config, parsedConfig);
}
public CamelSinkConnectorConfig(Map<String, String> parsedConfig) {
- this(conf(), parsedConfig);
+ this(CONFIG_DEF, parsedConfig);
}
public static ConfigDef conf() {
- return new ConfigDef()
- .define(CAMEL_SINK_URL_CONF, Type.STRING,
CAMEL_SINK_URL_DEFAULT, Importance.HIGH, CAMEL_SINK_URL_DOC)
- .define(CAMEL_SINK_MARSHAL_CONF, Type.STRING,
CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC)
- .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING,
CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC);
+ return CONFIG_DEF;
}
+
}
diff --git
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 9785215..baa6665 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -19,11 +19,9 @@ package org.apache.camel.kafkaconnector;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
@@ -44,7 +42,7 @@ public class CamelSinkTask extends SinkTask {
private static final String CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX =
"camel.sink.endpoint.";
private static final String CAMEL_SINK_PATH_PROPERTIES_PREFIX =
"camel.sink.path.";
- private static Logger log = LoggerFactory.getLogger(CamelSinkTask.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(CamelSinkTask.class);
private static final String LOCAL_URL = "direct:start";
private static final String HEADER_CAMEL_PREFIX = "CamelHeader";
@@ -62,7 +60,7 @@ public class CamelSinkTask extends SinkTask {
@Override
public void start(Map<String, String> props) {
try {
- log.info("Starting CamelSinkTask connector task");
+ LOG.info("Starting CamelSinkTask connector task");
Map<String, String> actualProps =
TaskHelper.mergeProperties(getDefaultConfig(), props);
config = getCamelSinkConnectorConfig(actualProps);
@@ -78,7 +76,7 @@ public class CamelSinkTask extends SinkTask {
producer = cms.createProducerTemplate();
cms.start();
- log.info("CamelSinkTask connector task started");
+ LOG.info("CamelSinkTask connector task started");
} catch (Exception e) {
throw new ConnectException("Failed to create and start Camel
context", e);
}
@@ -116,20 +114,20 @@ public class CamelSinkTask extends SinkTask {
}
exchange.getMessage().setHeaders(headers);
exchange.getMessage().setBody(record.value());
- log.debug("Sending {} to {}", exchange, LOCAL_URL);
+ LOG.debug("Sending {} to {}", exchange, LOCAL_URL);
producer.send(LOCAL_URL, exchange);
}
}
@Override
public void stop() {
+ LOG.info("Stopping CamelSinkTask connector task");
try {
- log.info("Stopping CamelSinkTask connector task");
cms.stop();
} catch (Exception e) {
throw new ConnectException("Failed to stop Camel context", e);
} finally {
- log.info("CamelSinkTask connector task stopped");
+ LOG.info("CamelSinkTask connector task stopped");
}
}
@@ -157,7 +155,7 @@ public class CamelSinkTask extends SinkTask {
map.put(singleHeader.key(), (Map<?, ?>)singleHeader.value());
} else if
(schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName()))
{
map.put(singleHeader.key(), (List<?>)singleHeader.value());
- }
+ }
}
private void addProperty(Exchange exchange, Header singleHeader) {
@@ -184,7 +182,7 @@ public class CamelSinkTask extends SinkTask {
exchange.getProperties().put(singleHeader.key(), (Map<?,
?>)singleHeader.value());
} else if
(schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName()))
{
exchange.getProperties().put(singleHeader.key(),
(List<?>)singleHeader.value());
- }
+ }
}
public CamelMainSupport getCms() {
diff --git
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index 930ae8b..3de49dd 100644
---
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -67,25 +67,27 @@ public class CamelSourceConnectorConfig extends
AbstractConfig {
public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC = "The name
of a camel message header containing an unique key that can be used as a Kafka
message key."
+ " If this is not specified, then the Kafka message will not have
a key.";
+ private static final ConfigDef CONFIG_DEF = new ConfigDef()
+ .define(CAMEL_SOURCE_URL_CONF, Type.STRING, CAMEL_SOURCE_URL_DEFAULT,
Importance.HIGH, CAMEL_SOURCE_URL_DOC)
+ .define(CAMEL_SOURCE_UNMARSHAL_CONF, Type.STRING,
CAMEL_SOURCE_UNMARSHAL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_UNMARSHAL_DOC)
+ .define(TOPIC_CONF, ConfigDef.Type.STRING, TOPIC_DEFAULT,
ConfigDef.Importance.HIGH, TOPIC_DOC)
+ .define(CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF, Type.LONG,
CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DEFAULT, Importance.MEDIUM,
CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DOC)
+ .define(CAMEL_SOURCE_MAX_POLL_DURATION_CONF, Type.LONG,
CAMEL_SOURCE_MAX_POLL_DURATION_DEFAULT, Importance.MEDIUM,
CAMEL_SOURCE_MAX_POLL_DURATION_DOC)
+ .define(CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF, Type.LONG,
CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT, Importance.MEDIUM,
CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC)
+ .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF, Type.LONG,
CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DEFAULT, Importance.MEDIUM,
CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DOC)
+ .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF,
Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT,
Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC)
+ .define(CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, Type.STRING,
CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT, Importance.MEDIUM,
CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC)
+ .define(CAMEL_SOURCE_COMPONENT_CONF, Type.STRING,
CAMEL_SOURCE_COMPONENT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_COMPONENT_DOC);
+
public CamelSourceConnectorConfig(ConfigDef config, Map<String, String>
parsedConfig) {
super(config, parsedConfig);
}
public CamelSourceConnectorConfig(Map<String, String> parsedConfig) {
- this(conf(), parsedConfig);
+ this(CONFIG_DEF, parsedConfig);
}
public static ConfigDef conf() {
- return new ConfigDef()
- .define(CAMEL_SOURCE_URL_CONF, Type.STRING,
CAMEL_SOURCE_URL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_URL_DOC)
- .define(CAMEL_SOURCE_UNMARSHAL_CONF, Type.STRING,
CAMEL_SOURCE_UNMARSHAL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_UNMARSHAL_DOC)
- .define(TOPIC_CONF, ConfigDef.Type.STRING, TOPIC_DEFAULT,
ConfigDef.Importance.HIGH, TOPIC_DOC)
- .define(CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF, Type.LONG,
CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DEFAULT, Importance.MEDIUM,
CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DOC)
- .define(CAMEL_SOURCE_MAX_POLL_DURATION_CONF, Type.LONG,
CAMEL_SOURCE_MAX_POLL_DURATION_DEFAULT, Importance.MEDIUM,
CAMEL_SOURCE_MAX_POLL_DURATION_DOC)
- .define(CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF,
Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT, Importance.MEDIUM,
CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC)
- .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF,
Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DEFAULT,
Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DOC)
- .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF,
Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT,
Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC)
- .define(CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, Type.STRING,
CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT, Importance.MEDIUM,
CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC)
- .define(CAMEL_SOURCE_COMPONENT_CONF, Type.STRING,
CAMEL_SOURCE_COMPONENT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_COMPONENT_DOC);
+ return CONFIG_DEF;
}
}
diff --git
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 5025296..83ea1b1 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -24,7 +24,6 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -91,7 +90,7 @@ public class CamelSourceTask extends SourceTask {
Endpoint endpoint = cms.getEndpoint(localUrl);
consumer = endpoint.createPollingConsumer();
consumer.start();
-
+
cms.start();
LOG.info("CamelSourceTask connector task started");
} catch (Exception e) {
@@ -143,7 +142,7 @@ public class CamelSourceTask extends SourceTask {
}
if (records.isEmpty()) {
- return null;
+ return Collections.EMPTY_LIST;
} else {
return records;
}
diff --git
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
index ddfd1e2..91735f2 100644
---
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
+++
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
public class CamelMainSupport {
public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX =
"camel.dataformat.";
- private static Logger log =
LoggerFactory.getLogger(CamelMainSupport.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(CamelMainSupport.class);
private Main camelMain;
private CamelContext camel;
@@ -83,7 +83,7 @@ public class CamelMainSupport {
Properties camelProperties = new OrderedProperties();
camelProperties.putAll(orderedProps);
- log.info("Setting initial properties in Camel context: [{}]",
camelProperties);
+ LOG.info("Setting initial properties in Camel context: [{}]",
camelProperties);
this.camel.getPropertiesComponent().setInitialProperties(camelProperties);
//creating the actual route
@@ -93,15 +93,15 @@ public class CamelMainSupport {
if (marshal != null && unmarshal != null) {
throw new UnsupportedOperationException("Uses of both
marshal (i.e. " + marshal + ") and unmarshal (i.e. " + unmarshal + ") is not
supported");
} else if (marshal != null) {
- log.info("Creating Camel route
from({}).marshal().custom({}).to({})", fromUrl, marshal, toUrl);
+ LOG.info("Creating Camel route
from({}).marshal().custom({}).to({})", fromUrl, marshal, toUrl);
camel.getRegistry().bind(marshal,
lookupAndInstantiateDataformat(marshal));
rd.marshal().custom(marshal);
} else if (unmarshal != null) {
- log.info("Creating Camel route
from({}).unmarshal().custom({}).to({})", fromUrl, unmarshal, toUrl);
+ LOG.info("Creating Camel route
from({}).unmarshal().custom({}).to({})", fromUrl, unmarshal, toUrl);
camel.getRegistry().bind(unmarshal,
lookupAndInstantiateDataformat(unmarshal));
rd.unmarshal().custom(unmarshal);
} else {
- log.info("Creating Camel route from({}).to({})", fromUrl,
toUrl);
+ LOG.info("Creating Camel route from({}).to({})", fromUrl,
toUrl);
}
rd.to(toUrl);
}
@@ -109,27 +109,27 @@ public class CamelMainSupport {
}
public void start() throws Exception {
- log.info("Starting CamelContext");
+ LOG.info("Starting CamelContext");
CamelContextStarter starter = new CamelContextStarter();
exService.execute(starter);
startFinishedSignal.await();
if (starter.hasException()) {
- log.info("CamelContext failed to start", starter.getException());
+ LOG.info("CamelContext failed to start", starter.getException());
throw starter.getException();
}
- log.info("CamelContext started");
+ LOG.info("CamelContext started");
}
public void stop() {
- log.info("Stopping CamelContext");
+ LOG.info("Stopping CamelContext");
camelMain.stop();
exService.shutdown();
- log.info("CamelContext stopped");
+ LOG.info("CamelContext stopped");
}
public ProducerTemplate createProducerTemplate() {
@@ -190,7 +190,7 @@ public class CamelMainSupport {
@Override
public void afterStart(BaseMainSupport main) {
- log.trace("Signaling CamelContext startup is finished
(startFinishedSignal.countDown();) due to CamelMainFinishedListener been
called");
+ LOG.trace("Signaling CamelContext startup is finished
(startFinishedSignal.countDown();) due to CamelMainFinishedListener been
called");
startFinishedSignal.countDown();
}
@@ -217,10 +217,10 @@ public class CamelMainSupport {
try {
camelMain.run();
} catch (Exception e) {
- log.error("An exception has occurred before CamelContext
startup has finished", e);
+ LOG.error("An exception has occurred before CamelContext
startup has finished", e);
startException = e;
if (startFinishedSignal.getCount() > 0) {
- log.trace("Signaling CamelContext startup is finished
(startFinishedSignal.countDown();) due to an exception");
+ LOG.trace("Signaling CamelContext startup is finished
(startFinishedSignal.countDown();) due to an exception");
startFinishedSignal.countDown();
}
}
diff --git
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 934b78d..89205ae 100644
---
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -16,20 +16,15 @@
*/
package org.apache.camel.kafkaconnector;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.camel.ConsumerTemplate;
-import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
-import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.Test;
@@ -40,16 +35,18 @@ import static org.junit.jupiter.api.Assertions.fail;
public class CamelSourceTaskTest {
+ private static final String TIMER_URI =
"timer:kafkaconnector?period=10&fixedRate=true&delay=0";
+
@Test
public void testSourcePolling() throws InterruptedException {
Map<String, String> props = new HashMap<>();
- props.put("camel.source.url", "timer:kafkaconnector");
+ props.put("camel.source.url", TIMER_URI);
props.put("camel.source.kafka.topic", "mytopic");
CamelSourceTask camelSourceTask = new CamelSourceTask();
camelSourceTask.start(props);
- Thread.sleep(2100L);
+ Thread.sleep(11L);
List<SourceRecord> poll = camelSourceTask.poll();
assertEquals(2, poll.size());
assertEquals("mytopic", poll.get(0).topic());
@@ -62,9 +59,9 @@ public class CamelSourceTaskTest {
break;
}
}
- assertTrue(containsHeader);
camelSourceTask.stop();
+ assertTrue(containsHeader);
}
@Test
@@ -82,7 +79,7 @@ public class CamelSourceTaskTest {
// first we test if we have a key in the message with body
template.sendBodyAndHeader("direct:start", "awesome!",
"CamelSpecialTestKey", 1234);
- Thread.sleep(100L);
+ Thread.sleep(11L);
List<SourceRecord> poll = camelSourceTask.poll();
assertEquals(1, poll.size());
@@ -92,7 +89,7 @@ public class CamelSourceTaskTest {
// second we test if we have no key under the header
template.sendBodyAndHeader("direct:start", "awesome!", "WrongHeader",
1234);
- Thread.sleep(100L);
+ Thread.sleep(11L);
poll = camelSourceTask.poll();
assertEquals(1, poll.size());
@@ -102,7 +99,7 @@ public class CamelSourceTaskTest {
// third we test if we have the header but with null value
template.sendBodyAndHeader("direct:start", "awesome!",
"CamelSpecialTestKey", null);
- Thread.sleep(100L);
+ Thread.sleep(10L);
camelSourceTask.poll();
assertEquals(1, poll.size());
@@ -126,7 +123,7 @@ public class CamelSourceTaskTest {
// send first data
template.sendBody("direct:start", "testing kafka connect");
- Thread.sleep(100L);
+ Thread.sleep(11L);
List<SourceRecord> poll = camelSourceTask.poll();
assertEquals(1, poll.size());
@@ -138,7 +135,7 @@ public class CamelSourceTaskTest {
// send second data
template.sendBody("direct:start", true);
- Thread.sleep(100L);
+ Thread.sleep(11L);
poll = camelSourceTask.poll();
assertEquals(1, poll.size());
@@ -150,7 +147,7 @@ public class CamelSourceTaskTest {
// second third data
template.sendBody("direct:start", 1234L);
- Thread.sleep(100L);
+ Thread.sleep(10L);
poll = camelSourceTask.poll();
assertEquals(1, poll.size());
@@ -162,7 +159,7 @@ public class CamelSourceTaskTest {
// third with null data
template.sendBody("direct:start", null);
- Thread.sleep(100L);
+ Thread.sleep(10L);
poll = camelSourceTask.poll();
assertNull(poll.get(0).key());
assertNull(poll.get(0).keySchema());
@@ -175,50 +172,48 @@ public class CamelSourceTaskTest {
@Test
public void testSourcePollingTimeout() throws InterruptedException {
Map<String, String> props = new HashMap<>();
- props.put("camel.source.url", "timer:kafkaconnector");
+ props.put("camel.source.url", TIMER_URI);
props.put("camel.source.kafka.topic", "mytopic");
props.put("camel.source.maxPollDuration", "1");
CamelSourceTask camelSourceTask = new CamelSourceTask();
camelSourceTask.start(props);
- Thread.sleep(3000L);
+ long sleepTime = 30L;
+ Thread.sleep(sleepTime);
List<SourceRecord> poll;
int retries = 3;
do {
poll = camelSourceTask.poll();
if (poll == null) {
retries--;
-
if (retries == 0) {
fail("Exhausted the maximum retries and no record was
returned");
}
-
- Thread.sleep(3000L);
+ Thread.sleep(sleepTime);
}
} while (poll == null && retries > 0);
assertTrue(poll.size() >= 1, "Received messages are: " + poll.size() +
", expected between 1 and 2.");
assertTrue(poll.size() <= 2, "Received messages are: " + poll.size() +
", expected between 1 and 2.");
-
camelSourceTask.stop();
}
@Test
public void testSourcePollingMaxRecordNumber() throws InterruptedException
{
Map<String, String> props = new HashMap<>();
- props.put("camel.source.url", "timer:kafkaconnector");
+ props.put("camel.source.url", TIMER_URI);
props.put("camel.source.kafka.topic", "mytopic");
props.put("camel.source.maxBatchPollSize", "1");
CamelSourceTask camelSourceTask = new CamelSourceTask();
camelSourceTask.start(props);
- Thread.sleep(2000L);
+ Thread.sleep(11L);
List<SourceRecord> poll = camelSourceTask.poll();
- assertEquals(1, poll.size());
-
camelSourceTask.stop();
+
+ assertEquals(1, poll.size());
}
@Test
@@ -243,9 +238,9 @@ public class CamelSourceTaskTest {
}
@Test
- public void testUrlPrecedenceOnComponentProperty() throws
JsonProcessingException, InterruptedException {
+ public void testUrlPrecedenceOnComponentProperty() throws
InterruptedException {
Map<String, String> props = new HashMap<>();
- props.put("camel.source.url", "timer:kafkaconnector");
+ props.put("camel.source.url", TIMER_URI);
props.put("camel.source.kafka.topic", "mytopic");
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF,
"shouldNotBeUsed");
props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() +
"endpointProperty", "shouldNotBeUsed");
@@ -254,7 +249,7 @@ public class CamelSourceTaskTest {
CamelSourceTask camelSourceTask = new CamelSourceTask();
camelSourceTask.start(props);
- Thread.sleep(2100L);
+ Thread.sleep(11L);
List<SourceRecord> poll = camelSourceTask.poll();
assertEquals(2, poll.size());
assertEquals("mytopic", poll.get(0).topic());
@@ -267,13 +262,13 @@ public class CamelSourceTaskTest {
break;
}
}
- assertTrue(containsHeader);
-
camelSourceTask.stop();
+
+ assertTrue(containsHeader);
}
@Test
- public void testSourcePollingUsingComponentProperty() throws
JsonProcessingException, InterruptedException {
+ public void testSourcePollingUsingComponentProperty() throws
InterruptedException {
Map<String, String> props = new HashMap<>();
props.put("camel.source.kafka.topic", "mytopic");
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF,
"timer");
@@ -298,13 +293,14 @@ public class CamelSourceTaskTest {
}
assertTrue(containsHeader);
- assertEquals(1,
camelSourceTask.getCms().getEndpoints().stream().filter(e ->
e.getEndpointUri().equals("timer://kafkaconnector?period=1000")).count());
+ assertEquals(1, camelSourceTask.getCms().getEndpoints().stream()
+ .filter(e ->
e.getEndpointUri().equals("timer://kafkaconnector?period=1000")).count());
camelSourceTask.stop();
}
@Test
- public void testSourcePollingUsingMultipleComponentProperties() throws
JsonProcessingException, InterruptedException {
+ public void testSourcePollingUsingMultipleComponentProperties() throws
InterruptedException {
Map<String, String> props = new HashMap<>();
props.put("camel.source.kafka.topic", "mytopic");
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF,
"timer");
@@ -330,7 +326,8 @@ public class CamelSourceTaskTest {
}
assertTrue(containsHeader);
- assertEquals(1,
camelSourceTask.getCms().getEndpoints().stream().filter(e ->
e.getEndpointUri().equals("timer://kafkaconnector?period=1000&repeatCount=0")).count());
+ assertEquals(1, camelSourceTask.getCms().getEndpoints().stream()
+ .filter(e ->
e.getEndpointUri().equals("timer://kafkaconnector?period=1000&repeatCount=0")).count());
camelSourceTask.stop();
}
diff --git
a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java
b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java
index aeb2da1..d1eef84 100644
---
a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java
+++
b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java
@@ -112,7 +112,7 @@ public abstract class AbstractCamelKafkaConnectorMojo
extends AbstractMojo {
public void execute() throws MojoExecutionException, MojoFailureException {
configureResourceManager();
if (!project.getArtifactId().equals(connectorsProjectName)) {
- getLog().debug("Skipping porject " + project.getArtifactId() + "
since it is not " + connectorsProjectName + " can be configured with
<connectors-project-name> option.");
+ getLog().debug("Skipping project " + project.getArtifactId() + "
since it is not " + connectorsProjectName + " can be configured with
<connectors-project-name> option.");
return;
}
try {