This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch camel-kafka-connector-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit c07d4380ca9c684e6c44cecbffaa99acaab8f377 Author: Jakub Malek <[email protected]> AuthorDate: Wed Oct 12 12:32:36 2022 +0200 fix #1447 Fixed NPE error during SinkTask header mapping --- .../apache/camel/kafkaconnector/CamelSinkTask.java | 3 +- .../camel/kafkaconnector/CamelSinkTaskTest.java | 188 ++++++++------------- 2 files changed, 77 insertions(+), 114 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index a53f298c5..b66bce262 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -240,7 +240,8 @@ public class CamelSinkTask extends SinkTask { final String key = StringHelper.after(header.key(), prefix, header.key()); final Schema schema = header.schema(); - if (schema.type().equals(Schema.BYTES_SCHEMA.type()) + if (schema != null + && schema.type().equals(Schema.BYTES_SCHEMA.type()) && Objects.equals(schema.name(), Decimal.LOGICAL_NAME) && header.value() instanceof byte[]) { destination.put(key, Decimal.toLogical(schema, (byte[]) header.value())); diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java index c08a4f74c..598d7a3d2 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java @@ -19,6 +19,7 @@ package org.apache.camel.kafkaconnector; import java.math.BigDecimal; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -33,7 +34,10 @@ import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.errors.ConnectException; import 
org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.storage.SimpleHeaderConverter; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -43,20 +47,32 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class CamelSinkTaskTest { +class CamelSinkTaskTest { private static final String SEDA_URI = "seda:test"; private static final String TOPIC_NAME = "my-topic"; private static final long RECEIVE_TIMEOUT = 1_000; private static final String TOPIC_CONF = "topics"; + private CamelSinkTask sinkTask; + + @BeforeEach + void setup() { + sinkTask = new CamelSinkTask(); + } + + @AfterEach + void tearDown() { + sinkTask.stop(); + } + + @Test - public void testOnlyBody() { + void testOnlyBody() { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); List<SinkRecord> records = new ArrayList<SinkRecord>(); @@ -70,17 +86,14 @@ public class CamelSinkTaskTest { assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props) .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF)); - - sinkTask.stop(); } @Test - public void testTopicsRegex() { + void testTopicsRegex() { Map<String, String> props = new HashMap<>(); props.put("topics.regex", "topic1*"); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); List<SinkRecord> records = new ArrayList<SinkRecord>(); @@ -99,17 +112,14 @@ 
public class CamelSinkTaskTest { Exchange exchange1 = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("cameltopicregex", exchange1.getMessage().getBody()); assertEquals("test", exchange1.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); - - sinkTask.stop(); } @Test - public void testBodyAndHeaders() { + void testBodyAndHeaders() { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); Byte myByte = new Byte("100"); @@ -155,18 +165,15 @@ public class CamelSinkTaskTest { assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class)); assertEquals(myBigDecimal, exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class)); assertEquals(kafkaBigDecimal, exchange.getIn().getHeader("KafkaBigDecimal", BigDecimal.class)); - - sinkTask.stop(); } @Test - public void testBodyAndHeadersExclusions() { + void testBodyAndHeadersExclusions() { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, "MyBoolean" + "|" + "MyShort"); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); Byte myByte = new Byte("100"); @@ -203,18 +210,15 @@ public class CamelSinkTaskTest { assertEquals(myInteger, exchange.getIn().getHeader("MyInteger")); assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class)); assertEquals(myBigDecimal, exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class)); - - sinkTask.stop(); } @Test - public void testBodyAndHeadersExclusionsRegex() { + void testBodyAndHeadersExclusionsRegex() { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); 
props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, "My*"); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); Byte myByte = new Byte("100"); @@ -251,17 +255,14 @@ public class CamelSinkTaskTest { assertNull(exchange.getIn().getHeader("MyInteger")); assertNull(exchange.getIn().getHeader("MyLong", Long.class)); assertNull(exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class)); - - sinkTask.stop(); } @Test - public void testBodyAndProperties() { + void testBodyAndProperties() { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); Byte myByte = new Byte("100"); @@ -294,17 +295,14 @@ public class CamelSinkTaskTest { assertEquals(myDouble, (Double) exchange.getProperties().get("MyDouble")); assertEquals(myInteger, exchange.getProperties().get("MyInteger")); assertEquals(myLong, (Long) exchange.getProperties().get("MyLong")); - - sinkTask.stop(); } @Test - public void testBodyAndPropertiesHeadersMixed() { + void testBodyAndPropertiesHeadersMixed() { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); Byte myByte = new Byte("100"); @@ -351,17 +349,14 @@ public class CamelSinkTaskTest { assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", Double.class)); assertEquals(myInteger, exchange.getIn().getHeader("MyInteger")); assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class)); - - sinkTask.stop(); } @Test - public void testBodyAndHeadersMap() { + void testBodyAndHeadersMap() { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); - CamelSinkTask sinkTask = new CamelSinkTask(); 
sinkTask.start(props); Byte myByte = new Byte("100"); @@ -406,16 +401,14 @@ public class CamelSinkTaskTest { assertEquals(map, exchange.getIn().getHeader("MyMap", Map.class)); assertEquals(map1, exchange.getIn().getHeader("MyMap1", Map.class)); assertEquals(map2, exchange.getIn().getHeader("MyMap2", Map.class)); - sinkTask.stop(); } @Test - public void testBodyAndPropertiesHeadersMapMixed() { + void testBodyAndPropertiesHeadersMapMixed() { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); Byte myByte = new Byte("100"); @@ -480,17 +473,14 @@ public class CamelSinkTaskTest { assertEquals(map, exchange.getIn().getHeader("MyMap", Map.class)); assertEquals(map1, exchange.getIn().getHeader("MyMap1", Map.class)); assertEquals(map2, exchange.getIn().getHeader("MyMap2", Map.class)); - - sinkTask.stop(); } @Test - public void testBodyAndHeadersList() { + void testBodyAndHeadersList() { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); Byte myByte = new Byte("100"); @@ -531,16 +521,14 @@ public class CamelSinkTaskTest { assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class)); assertEquals(list, exchange.getIn().getHeader("MyList", List.class)); assertEquals(list1, exchange.getIn().getHeader("MyList1", List.class)); - sinkTask.stop(); } @Test - public void testBodyAndPropertiesHeadersListMixed() { + void testBodyAndPropertiesHeadersListMixed() { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); Byte myByte = new Byte("100"); @@ -599,12 +587,10 @@ public class 
CamelSinkTaskTest { assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class)); assertEquals(list, exchange.getIn().getHeader("MyList", List.class)); assertEquals(list1, exchange.getIn().getHeader("MyList1", List.class)); - - sinkTask.stop(); } @Test - public void testUrlPrecedenceOnComponentProperty() { + void testUrlPrecedenceOnComponentProperty() { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); @@ -612,7 +598,6 @@ public class CamelSinkTaskTest { props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "endpointProperty", "shouldNotBeUsed"); props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "pathChunk", "shouldNotBeUsed"); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); List<SinkRecord> records = new ArrayList<SinkRecord>(); @@ -624,19 +609,16 @@ public class CamelSinkTaskTest { Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); assertEquals("camel", exchange.getMessage().getBody()); assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); - - sinkTask.stop(); } @Test - public void testOnlyBodyUsingComponentProperty() { + void testOnlyBodyUsingComponentProperty() { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda"); props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "bridgeErrorHandler", "true"); props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test"); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); List<SinkRecord> records = new ArrayList<SinkRecord>(); @@ -650,12 +632,10 @@ public class CamelSinkTaskTest { assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); assertEquals(1, sinkTask.getCms().getCamelContext().getEndpoints() .stream().filter(e -> 
e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true")).count()); - - sinkTask.stop(); } @Test - public void testOnlyBodyUsingMultipleComponentProperties() { + void testOnlyBodyUsingMultipleComponentProperties() { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda"); @@ -663,7 +643,6 @@ public class CamelSinkTaskTest { props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "size", "50"); props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test"); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); List<SinkRecord> records = new ArrayList<SinkRecord>(); @@ -678,19 +657,16 @@ public class CamelSinkTaskTest { assertEquals(1, sinkTask.getCms().getCamelContext().getEndpoints() .stream().filter(e -> e.getEndpointUri().equals("seda://test?bridgeErrorHandler=true&size=50")).count()); - - sinkTask.stop(); } @Test - public void testBodyAndPropertiesHeadersMixedWithoutPropertiesAndHeadersMapping() { + void testBodyAndPropertiesHeadersMixedWithoutPropertiesAndHeadersMapping() { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, "false"); props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF, "false"); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); Byte myByte = new Byte("100"); @@ -737,18 +713,15 @@ public class CamelSinkTaskTest { assertFalse(exchange.getMessage().getHeaders().containsKey("MyDouble")); assertFalse(exchange.getMessage().getHeaders().containsKey("MyInteger")); assertFalse(exchange.getMessage().getHeaders().containsKey("MyLong")); - - sinkTask.stop(); } @Test - public void testBodyAndPropertiesHeadersMixedWithoutPropertiesMapping() { + void testBodyAndPropertiesHeadersMixedWithoutPropertiesMapping() { Map<String, 
String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF, "false"); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); Byte myByte = new Byte("100"); @@ -795,19 +768,16 @@ public class CamelSinkTaskTest { assertEquals(myDouble, exchange.getIn().getHeader("MyDouble", Double.class)); assertEquals(myInteger, exchange.getIn().getHeader("MyInteger")); assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class)); - - sinkTask.stop(); } @Test - public void testIfExchangeFailsShouldThrowConnectException() { + void testIfExchangeFailsShouldThrowConnectException() { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); // we use a dummy component sink in order fail the exchange delivery props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "direct"); props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test"); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); List<SinkRecord> records = new ArrayList<SinkRecord>(); @@ -815,18 +785,15 @@ public class CamelSinkTaskTest { records.add(record); assertThrows(ConnectException.class, () -> sinkTask.put(records)); - - sinkTask.stop(); } @Test - public void testAggregationBody() { + void testAggregationBody() { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator"); props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, "5"); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); List<SinkRecord> records = new ArrayList<SinkRecord>(); @@ -850,19 +817,16 @@ public class CamelSinkTaskTest { assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props) .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF)); - - sinkTask.stop(); } @Test - public void testAggregationBodyAndTimeout() throws InterruptedException { + void testAggregationBodyAndTimeout() throws InterruptedException { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator"); props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, "5"); props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, "100"); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); List<SinkRecord> records = new ArrayList<SinkRecord>(); @@ -886,12 +850,10 @@ public class CamelSinkTaskTest { assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props) .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF)); - - sinkTask.stop(); } @Test - public void testAggregationWithIdempotencyBodyAndTimeout() throws InterruptedException { + void testAggregationWithIdempotencyBodyAndTimeout() throws InterruptedException { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); @@ -900,7 +862,6 @@ public class CamelSinkTaskTest { props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, "100"); props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true"); props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body"); - CamelSinkTask sinkTask = new 
CamelSinkTask(); sinkTask.start(props); List<SinkRecord> records = new ArrayList<SinkRecord>(); @@ -949,18 +910,15 @@ public class CamelSinkTaskTest { assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props) .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF)); - - sinkTask.stop(); } @Test - public void testWithIdempotency() throws InterruptedException { + void testWithIdempotency() throws InterruptedException { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true"); props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "body"); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); List<SinkRecord> records = new ArrayList<SinkRecord>(); @@ -1015,19 +973,16 @@ public class CamelSinkTaskTest { assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props) .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF)); - - sinkTask.stop(); } @Test - public void testWithIdempotencyAndHeader() throws InterruptedException { + void testWithIdempotencyAndHeader() throws InterruptedException { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_ENABLED_CONF, "true"); props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_TYPE_CONF, "header"); props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_IDEMPOTENCY_EXPRESSION_HEADER_CONF, "headerIdempotency"); - CamelSinkTask sinkTask = new CamelSinkTask(); 
sinkTask.start(props); List<SinkRecord> records = new ArrayList<SinkRecord>(); @@ -1056,12 +1011,10 @@ public class CamelSinkTaskTest { assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props) .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF)); - - sinkTask.stop(); } @Test - public void testSecretRaw() { + void testSecretRaw() { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put("camel.sink.endpoint.secretKey", "se+ret"); @@ -1069,14 +1022,11 @@ public class CamelSinkTaskTest { props.put("camel.sink.endpoint.queueNameOrArn", "test"); props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-sqs"); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); - - sinkTask.stop(); } @Test - public void testSecretRawReference() { + void testSecretRawReference() { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put("camel.sink.endpoint.secretKey", "#bean:mySecretKey"); @@ -1085,53 +1035,65 @@ public class CamelSinkTaskTest { props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-sqs"); props.put("myAccessKey", "MoreSe+ret$"); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); - - sinkTask.stop(); } @Test - public void testBodyAndDateHeader() { + void testBodyAndDateHeader() { final Date now = new Date(); Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); - try { - List<SinkRecord> records = new ArrayList<>(); + List<SinkRecord> records = new ArrayList<>(); - SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); - record.headers().addTimestamp(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDate", now); - 
records.add(record); + SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + record.headers().addTimestamp(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDate", now); + records.add(record); - sinkTask.put(records); + sinkTask.put(records); - Exchange exchange = sinkTask.getCms().getConsumerTemplate().receive(SEDA_URI, RECEIVE_TIMEOUT); + Exchange exchange = sinkTask.getCms().getConsumerTemplate().receive(SEDA_URI, RECEIVE_TIMEOUT); - assertThat(exchange.getIn().getHeader("MyDate")).isInstanceOfSatisfying(Date.class, value -> { - assertThat(value).isEqualTo(now); - }); - } finally { - sinkTask.stop(); - } + assertThat(exchange.getIn().getHeader("MyDate")).isInstanceOfSatisfying(Date.class, value -> { + assertThat(value).isEqualTo(now); + }); } @Test - public void testContentLogLevelConfiguration() { + void testContentLogLevelConfiguration() { Map<String, String> props = new HashMap<>(); props.put(TOPIC_CONF, TOPIC_NAME); props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); props.put(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, "INFO"); - CamelSinkTask sinkTask = new CamelSinkTask(); sinkTask.start(props); assertEquals(LoggingLevel.INFO, sinkTask.getLoggingLevel()); + } - sinkTask.stop(); + @Test + void testThatSchemalessHeaderIsBeingMappedToExchange() { + // given sink task + Map<String, String> properties = new HashMap<>(); + properties.put(TOPIC_CONF, TOPIC_NAME); + properties.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); + sinkTask.start(properties); + + // and source record + String headerName = "test-header"; + Long headerValue = 1234L; + SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 0); + record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + headerName, headerValue, null); + + // when + sinkTask.put(Collections.singleton(record)); + + // then + ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate(); + Exchange exchange = 
consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); + assertThat(exchange.getIn().getHeader(headerName)).isEqualTo(headerValue); } }
