This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 314eee9 Added another test about removal of headers based on reg exp
314eee9 is described below
commit 314eee947e6f9c979a9c1ddbcd5516c3f51b6539
Author: Andrea Cosentino <[email protected]>
AuthorDate: Wed Dec 9 07:31:47 2020 +0100
Added another test about removal of headers based on reg exp
---
.../camel/kafkaconnector/CamelSinkTaskTest.java | 48 ++++++++++++++++++++++
1 file changed, 48 insertions(+)
diff --git
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 74ebd32..047d858 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -195,6 +195,54 @@ public class CamelSinkTaskTest {
sinkTask.stop();
}
+
+ @Test
+ public void testBodyAndHeadersExclusionsRegex() {
+ Map<String, String> props = new HashMap<>();
+ props.put(TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+
props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF,
"My*");
+
+ CamelSinkTask sinkTask = new CamelSinkTask();
+ sinkTask.start(props);
+
+ Byte myByte = new Byte("100");
+ Float myFloat = new Float("100");
+ Short myShort = new Short("100");
+ Double myDouble = new Double("100");
+ int myInteger = 100;
+ Long myLong = new Long("100");
+ BigDecimal myBigDecimal = new BigDecimal(1234567890);
+ Schema schema = Decimal.schema(myBigDecimal.scale());
+
+ List<SinkRecord> records = new ArrayList<SinkRecord>();
+ SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null,
"camel", 42);
+ record.headers().addBoolean(CamelSinkTask.HEADER_CAMEL_PREFIX +
"MyBoolean", true);
+ record.headers().addByte(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyByte",
myByte);
+ record.headers().addFloat(CamelSinkTask.HEADER_CAMEL_PREFIX +
"MyFloat", myFloat);
+ record.headers().addShort(CamelSinkTask.HEADER_CAMEL_PREFIX +
"MyShort", myShort);
+ record.headers().addDouble(CamelSinkTask.HEADER_CAMEL_PREFIX +
"MyDouble", myDouble);
+ record.headers().addInt(CamelSinkTask.HEADER_CAMEL_PREFIX +
"MyInteger", myInteger);
+ record.headers().addLong(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyLong",
myLong);
+ record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX +
"MyBigDecimal", Decimal.fromLogical(schema, myBigDecimal), schema);
+ records.add(record);
+ sinkTask.put(records);
+
+ ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+ Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+ assertEquals("camel", exchange.getMessage().getBody());
+ assertEquals("test",
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+ assertNull(exchange.getIn().getHeader("MyBoolean", Boolean.class));
+ assertNull(exchange.getIn().getHeader("MyByte", Byte.class));
+ assertNull(exchange.getIn().getHeader("MyFloat", Float.class));
+ assertNull(exchange.getIn().getHeader("MyShort", Short.class));
+ assertNull(exchange.getIn().getHeader("MyDouble", Double.class));
+ assertNull(exchange.getIn().getHeader("MyInteger"));
+ assertNull(exchange.getIn().getHeader("MyLong", Long.class));
+ assertNull(exchange.getIn().getHeader("MyBigDecimal",
BigDecimal.class));
+
+ sinkTask.stop();
+ }
@Test
public void testBodyAndProperties() {