This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit d2cbe95476f1efc33a793b7a36d4f54d5a8fe187 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Fri Dec 4 18:20:25 2020 +0100 Fixed CS --- .../camel/kafkaconnector/utils/CamelKafkaConnectMain.java | 13 +++++++------ .../apache/camel/kafkaconnector/CamelSourceTaskTest.java | 4 +--- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java index 7d3e7d7..5eb0df5 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java @@ -270,7 +270,8 @@ public class CamelKafkaConnectMain extends SimpleMain { if (ObjectHelper.isEmpty(headersExcludePattern)) { rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to); } else { - rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to); + rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout) + .idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to); } break; case "header": @@ -282,7 +283,7 @@ public class CamelKafkaConnectMain extends SimpleMain { .idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to); } else { rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout) - .idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to); + .idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to); } break; default: @@ -294,7 +295,7 @@ public class CamelKafkaConnectMain extends SimpleMain { if (ObjectHelper.isEmpty(headersExcludePattern)) { rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(to); } else { - rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).removeHeaders(headersExcludePattern).toD(to); + rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).removeHeaders(headersExcludePattern).toD(to); } } } else { @@ -305,7 +306,7 @@ public class CamelKafkaConnectMain extends SimpleMain { if (ObjectHelper.isEmpty(headersExcludePattern)) { rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").toD(to); } else { - rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to); + rd.idempotentConsumer(body()).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to); } break; case "header": @@ -313,7 +314,7 @@ public class CamelKafkaConnectMain extends SimpleMain { if (ObjectHelper.isEmpty(headersExcludePattern)) { rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").toD(to); } else { - rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to); + rd.idempotentConsumer(header(expressionHeader)).messageIdRepositoryRef("idempotentRepository").removeHeaders(headersExcludePattern).toD(to); } break; default: @@ -325,7 +326,7 @@ public class CamelKafkaConnectMain extends SimpleMain { if (ObjectHelper.isEmpty(headersExcludePattern)) { rd.toD(to); } else { - rd.removeHeaders(headersExcludePattern).toD(to); + rd.removeHeaders(headersExcludePattern).toD(to); } } } diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java index de611a0..fadbbb4 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java @@ -20,7 +20,6 @@ import java.math.BigDecimal; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -32,7 +31,6 @@ import org.apache.camel.kafkaconnector.utils.StringJoinerAggregator; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.header.Header; -import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.source.SourceRecord; import org.junit.jupiter.api.Test; @@ -538,7 +536,7 @@ public class CamelSourceTaskTest { .start(mapOf(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME, CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI, CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, true, CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "header", CamelSourceConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency", - CamelSourceConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, "headerIdempotency")); + CamelSourceConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, "headerIdempotency")); try {