This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new ff14cf9 [CAMEL-15027] Do not propagate record specific Kafka headers
(#3813)
ff14cf9 is described below
commit ff14cf9be7a5dd3d5433ede2b0f23fafb4379927
Author: Louis Burton <[email protected]>
AuthorDate: Sat May 9 06:49:46 2020 +0100
[CAMEL-15027] Do not propagate record specific Kafka headers (#3813)
* CAMEL-15027: Do not propagate record specific kafka headers
* CAMEL-15027: Checkstyle fix - import spacing
Co-authored-by: Louis Burton <[email protected]>
---
.../component/kafka/KafkaHeaderFilterStrategy.java | 13 +++++++--
.../component/kafka/KafkaConsumerFullTest.java | 17 ++++++++++++
.../camel/component/kafka/KafkaEndpointTest.java | 2 +-
.../component/kafka/KafkaProducerFullTest.java | 32 ++++++++++++++++++++++
4 files changed, 60 insertions(+), 4 deletions(-)
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderFilterStrategy.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderFilterStrategy.java
index c5a63a8..c5fbd1d 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderFilterStrategy.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderFilterStrategy.java
@@ -16,10 +16,17 @@
*/
package org.apache.camel.component.kafka;
+import java.util.regex.Pattern;
+
import org.apache.camel.support.DefaultHeaderFilterStrategy;
public class KafkaHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
+ /**
+ * A filter pattern that only accepts keys starting with <tt>Camel</tt>,
<tt>org.apache.camel.</tt> or <tt>kafka.</tt>
+ */
+ public static final Pattern CAMEL_KAFKA_FILTER_PATTERN =
Pattern.compile("(?i)(Camel|org\\.apache\\.camel|kafka\\.)[\\.|a-z|A-z|0-9]*");
+
public KafkaHeaderFilterStrategy() {
initialize();
}
@@ -28,8 +35,8 @@ public class KafkaHeaderFilterStrategy extends
DefaultHeaderFilterStrategy {
// filter out kafka record metadata
getInFilter().add("org.apache.kafka.clients.producer.RecordMetadata");
- // filter headers begin with "Camel" or "org.apache.camel"
- setOutFilterPattern(CAMEL_FILTER_PATTERN);
- setInFilterPattern(CAMEL_FILTER_PATTERN);
+ // filter headers beginning with "Camel" or "org.apache.camel" or
"kafka."
+ setOutFilterPattern(CAMEL_KAFKA_FILTER_PATTERN);
+ setInFilterPattern(CAMEL_KAFKA_FILTER_PATTERN);
}
}
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
index 3673b3f..e26a324 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
@@ -108,6 +108,23 @@ public class KafkaConsumerFullTest extends
BaseEmbeddedKafkaTest {
}
@Test
+ public void kafkaRecordSpecificHeadersAreNotOverwritten() throws
InterruptedException, IOException {
+ String propagatedHeaderKey = KafkaConstants.TOPIC;
+ byte[] propagatedHeaderValue = "propagated incorrect topic".getBytes();
+ to.expectedHeaderReceived(KafkaConstants.TOPIC, TOPIC);
+
+ ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1",
"message");
+ data.headers().add(new RecordHeader(propagatedHeaderKey,
propagatedHeaderValue));
+ producer.send(data);
+
+ to.assertIsSatisfied(3000);
+
+ Map<String, Object> headers =
to.getExchanges().get(0).getIn().getHeaders();
+ assertTrue("Should receive KafkaEndpoint populated kafka.TOPIC
header", headers.containsKey(KafkaConstants.TOPIC));
+ assertEquals("Topic name received", TOPIC,
headers.get(KafkaConstants.TOPIC));
+ }
+
+ @Test
@Ignore("Currently there is a bug in kafka which leads to an
uninterruptable thread so a resub take too long (works manually)")
public void kafkaMessageIsConsumedByCamelSeekedToBeginning() throws
Exception {
to.expectedMessageCount(5);
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
index 4928eeb..e952fe8 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
@@ -69,7 +69,7 @@ public class KafkaEndpointTest {
}
@Test
- public void isSingletonShoudlReturnTrue() {
+ public void isSingletonShouldReturnTrue() {
assertTrue(endpoint.isSingleton());
}
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index 19d8b8d..a953c48 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -58,6 +58,7 @@ public class KafkaProducerFullTest extends
BaseEmbeddedKafkaTest {
private static final String TOPIC_BYTES_IN_HEADER = "testBytesHeader";
private static final String GROUP_BYTES = "groupStrings";
private static final String TOPIC_PROPAGATED_HEADERS =
"testPropagatedHeaders";
+ private static final String TOPIC_NO_RECORD_SPECIFIC_HEADERS =
"noRecordSpecificHeaders";
private static KafkaConsumer<String, String> stringsConsumerConn;
private static KafkaConsumer<byte[], byte[]> bytesConsumerConn;
@@ -81,6 +82,9 @@ public class KafkaProducerFullTest extends
BaseEmbeddedKafkaTest {
@EndpointInject("kafka:" + TOPIC_PROPAGATED_HEADERS +
"?requestRequiredAcks=-1")
private Endpoint toPropagatedHeaders;
+ @EndpointInject("kafka:" + TOPIC_NO_RECORD_SPECIFIC_HEADERS +
"?requestRequiredAcks=-1")
+ private Endpoint toNoRecordSpecificHeaders;
+
@Produce("direct:startStrings")
private ProducerTemplate stringsTemplate;
@@ -96,6 +100,9 @@ public class KafkaProducerFullTest extends
BaseEmbeddedKafkaTest {
@Produce("direct:propagatedHeaders")
private ProducerTemplate propagatedHeadersTemplate;
+ @Produce("direct:noRecordSpecificHeaders")
+ private ProducerTemplate noRecordSpecificHeadersTemplate;
+
@BindToRegistry("myStrategy")
private MyHeaderFilterStrategy strategy = new MyHeaderFilterStrategy();
@@ -134,6 +141,8 @@ public class KafkaProducerFullTest extends
BaseEmbeddedKafkaTest {
from("direct:startTraced").to(toStringsWithInterceptor).to(mockEndpoint);
from("direct:propagatedHeaders").to(toPropagatedHeaders).to(mockEndpoint);
+
+
from("direct:noRecordSpecificHeaders").to(toNoRecordSpecificHeaders).to(mockEndpoint);
}
};
}
@@ -344,6 +353,29 @@ public class KafkaProducerFullTest extends
BaseEmbeddedKafkaTest {
}
@Test
+ public void recordSpecificHeaderIsNotReceivedByKafka() throws Exception {
+ String propagatedStringHeaderKey = KafkaConstants.TOPIC;
+ String propagatedStringHeaderValue = "source topic";
+
+ Map<String, Object> camelHeaders = new HashMap<>();
+ camelHeaders.put(propagatedStringHeaderKey,
propagatedStringHeaderValue);
+
+ CountDownLatch messagesLatch = new CountDownLatch(1);
+ noRecordSpecificHeadersTemplate.sendBodyAndHeaders("Some test
message", camelHeaders);
+
+ List<ConsumerRecord<String, String>> records =
pollForRecords(createStringKafkaConsumer("noRecordSpecificHeadersConsumer"),
TOPIC_NO_RECORD_SPECIFIC_HEADERS, messagesLatch);
+ boolean allMessagesReceived = messagesLatch.await(10_000,
TimeUnit.MILLISECONDS);
+
+ assertTrue("Not all messages were published to the kafka topics. Not
received: " + messagesLatch.getCount(), allMessagesReceived);
+
+ ConsumerRecord<String, String> record = records.get(0);
+ Headers headers = record.headers();
+ assertNotNull("Kafka Headers should not be null.", headers);
+ // we have 0 headers
+ assertEquals("0 propagated headers are expected", 0,
headers.toArray().length);
+ }
+
+ @Test
public void headerFilterStrategyCouldBeOverridden() {
KafkaEndpoint kafkaEndpoint =
context.getEndpoint("kafka:TOPIC_PROPAGATED_HEADERS?headerFilterStrategy=#myStrategy",
KafkaEndpoint.class);
assertIsInstanceOf(MyHeaderFilterStrategy.class,
kafkaEndpoint.getConfiguration().getHeaderFilterStrategy());