This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new ff14cf9  [CAMEL-15027] Do not propagate record specific Kafka headers 
(#3813)
ff14cf9 is described below

commit ff14cf9be7a5dd3d5433ede2b0f23fafb4379927
Author: Louis Burton <[email protected]>
AuthorDate: Sat May 9 06:49:46 2020 +0100

    [CAMEL-15027] Do not propagate record specific Kafka headers (#3813)
    
    * CAMEL-15027: Do not propagate record specific kafka headers
    
    * CAMEL-15027: Checkstyle fix - import spacing
    
    Co-authored-by: Louis Burton <[email protected]>
---
 .../component/kafka/KafkaHeaderFilterStrategy.java | 13 +++++++--
 .../component/kafka/KafkaConsumerFullTest.java     | 17 ++++++++++++
 .../camel/component/kafka/KafkaEndpointTest.java   |  2 +-
 .../component/kafka/KafkaProducerFullTest.java     | 32 ++++++++++++++++++++++
 4 files changed, 60 insertions(+), 4 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderFilterStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderFilterStrategy.java
index c5a63a8..c5fbd1d 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderFilterStrategy.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderFilterStrategy.java
@@ -16,10 +16,17 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.util.regex.Pattern;
+
 import org.apache.camel.support.DefaultHeaderFilterStrategy;
 
 public class KafkaHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
 
+    /**
+     * A filter pattern that only accepts keys starting with <tt>Camel</tt> or 
<tt>org.apache.camel.</tt>
+     */
+    public static final Pattern CAMEL_KAFKA_FILTER_PATTERN = 
Pattern.compile("(?i)(Camel|org\\.apache\\.camel|kafka\\.)[\\.|a-z|A-z|0-9]*");
+
     public KafkaHeaderFilterStrategy() {
         initialize();
     }
@@ -28,8 +35,8 @@ public class KafkaHeaderFilterStrategy extends 
DefaultHeaderFilterStrategy {
         // filter out kafka record metadata
         getInFilter().add("org.apache.kafka.clients.producer.RecordMetadata");
 
-        // filter headers begin with "Camel" or "org.apache.camel"
-        setOutFilterPattern(CAMEL_FILTER_PATTERN);
-        setInFilterPattern(CAMEL_FILTER_PATTERN);
+        // filter headers beginning with "Camel" or "org.apache.camel" or 
"kafka."
+        setOutFilterPattern(CAMEL_KAFKA_FILTER_PATTERN);
+        setInFilterPattern(CAMEL_KAFKA_FILTER_PATTERN);
     }
 }
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
index 3673b3f..e26a324 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
@@ -108,6 +108,23 @@ public class KafkaConsumerFullTest extends 
BaseEmbeddedKafkaTest {
     }
 
     @Test
+    public void kafkaRecordSpecificHeadersAreNotOverwritten() throws 
InterruptedException, IOException {
+        String propagatedHeaderKey = KafkaConstants.TOPIC;
+        byte[] propagatedHeaderValue = "propagated incorrect topic".getBytes();
+        to.expectedHeaderReceived(KafkaConstants.TOPIC, TOPIC);
+
+        ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", 
"message");
+        data.headers().add(new RecordHeader(propagatedHeaderKey, 
propagatedHeaderValue));
+        producer.send(data);
+
+        to.assertIsSatisfied(3000);
+
+        Map<String, Object> headers = 
to.getExchanges().get(0).getIn().getHeaders();
+        assertTrue("Should receive KafkaEndpoint populated kafka.TOPIC 
header", headers.containsKey(KafkaConstants.TOPIC));
+        assertEquals("Topic name received", TOPIC, 
headers.get(KafkaConstants.TOPIC));
+    }
+
+    @Test
     @Ignore("Currently there is a bug in kafka which leads to an 
uninterruptable thread so a resub take too long (works manually)")
     public void kafkaMessageIsConsumedByCamelSeekedToBeginning() throws 
Exception {
         to.expectedMessageCount(5);
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
index 4928eeb..e952fe8 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
@@ -69,7 +69,7 @@ public class KafkaEndpointTest {
     }
 
     @Test
-    public void isSingletonShoudlReturnTrue() {
+    public void isSingletonShouldReturnTrue() {
         assertTrue(endpoint.isSingleton());
     }
 
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index 19d8b8d..a953c48 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -58,6 +58,7 @@ public class KafkaProducerFullTest extends 
BaseEmbeddedKafkaTest {
     private static final String TOPIC_BYTES_IN_HEADER = "testBytesHeader";
     private static final String GROUP_BYTES = "groupStrings";
     private static final String TOPIC_PROPAGATED_HEADERS = 
"testPropagatedHeaders";
+    private static final String TOPIC_NO_RECORD_SPECIFIC_HEADERS = 
"noRecordSpecificHeaders";
 
     private static KafkaConsumer<String, String> stringsConsumerConn;
     private static KafkaConsumer<byte[], byte[]> bytesConsumerConn;
@@ -81,6 +82,9 @@ public class KafkaProducerFullTest extends 
BaseEmbeddedKafkaTest {
     @EndpointInject("kafka:" + TOPIC_PROPAGATED_HEADERS + 
"?requestRequiredAcks=-1")
     private Endpoint toPropagatedHeaders;
 
+    @EndpointInject("kafka:" + TOPIC_NO_RECORD_SPECIFIC_HEADERS + 
"?requestRequiredAcks=-1")
+    private Endpoint toNoRecordSpecificHeaders;
+
     @Produce("direct:startStrings")
     private ProducerTemplate stringsTemplate;
 
@@ -96,6 +100,9 @@ public class KafkaProducerFullTest extends 
BaseEmbeddedKafkaTest {
     @Produce("direct:propagatedHeaders")
     private ProducerTemplate propagatedHeadersTemplate;
 
+    @Produce("direct:noRecordSpecificHeaders")
+    private ProducerTemplate noRecordSpecificHeadersTemplate;
+
     @BindToRegistry("myStrategy")
     private MyHeaderFilterStrategy strategy = new MyHeaderFilterStrategy();
 
@@ -134,6 +141,8 @@ public class KafkaProducerFullTest extends 
BaseEmbeddedKafkaTest {
                 
from("direct:startTraced").to(toStringsWithInterceptor).to(mockEndpoint);
 
                 
from("direct:propagatedHeaders").to(toPropagatedHeaders).to(mockEndpoint);
+
+                
from("direct:noRecordSpecificHeaders").to(toNoRecordSpecificHeaders).to(mockEndpoint);
             }
         };
     }
@@ -344,6 +353,29 @@ public class KafkaProducerFullTest extends 
BaseEmbeddedKafkaTest {
     }
 
     @Test
+    public void recordSpecificHeaderIsNotReceivedByKafka() throws Exception {
+        String propagatedStringHeaderKey = KafkaConstants.TOPIC;
+        String propagatedStringHeaderValue = "source topic";
+
+        Map<String, Object> camelHeaders = new HashMap<>();
+        camelHeaders.put(propagatedStringHeaderKey, 
propagatedStringHeaderValue);
+
+        CountDownLatch messagesLatch = new CountDownLatch(1);
+        noRecordSpecificHeadersTemplate.sendBodyAndHeaders("Some test 
message", camelHeaders);
+
+        List<ConsumerRecord<String, String>> records = 
pollForRecords(createStringKafkaConsumer("noRecordSpecificHeadersConsumer"), 
TOPIC_NO_RECORD_SPECIFIC_HEADERS, messagesLatch);
+        boolean allMessagesReceived = messagesLatch.await(10_000, 
TimeUnit.MILLISECONDS);
+
+        assertTrue("Not all messages were published to the kafka topics. Not 
received: " + messagesLatch.getCount(), allMessagesReceived);
+
+        ConsumerRecord<String, String> record = records.get(0);
+        Headers headers = record.headers();
+        assertNotNull("Kafka Headers should not be null.", headers);
+        // we have 0 headers
+        assertEquals("0 propagated headers are expected", 0, 
headers.toArray().length);
+    }
+
+    @Test
     public void headerFilterStrategyCouldBeOverridden() {
         KafkaEndpoint kafkaEndpoint = 
context.getEndpoint("kafka:TOPIC_PROPAGATED_HEADERS?headerFilterStrategy=#myStrategy",
 KafkaEndpoint.class);
         assertIsInstanceOf(MyHeaderFilterStrategy.class, 
kafkaEndpoint.getConfiguration().getHeaderFilterStrategy());

Reply via email to