This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.8.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.8.x by this push:
new c2607e3583e CAMEL-21211: Azure Service Bus - filter unknown header types (#15555)
c2607e3583e is described below
commit c2607e3583ee873d2a1de3a9b947692e9cecccf3
Author: Dylan Piergies <[email protected]>
AuthorDate: Mon Sep 16 05:45:34 2024 +0100
CAMEL-21211: Azure Service Bus - filter unknown header types (#15555)
---
.../servicebus/ServiceBusHeaderFilterStrategy.java | 28 ++++++++--
.../ServiceBusHeaderFilterStrategyTest.java | 63 ++++++++++++++++++++++
.../integration/ServiceBusProducerIT.java | 54 ++++++++++---------
3 files changed, 116 insertions(+), 29 deletions(-)
diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusHeaderFilterStrategy.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusHeaderFilterStrategy.java
index 5ab2a97e08a..4ea6a0f0ba6 100644
--- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusHeaderFilterStrategy.java
+++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusHeaderFilterStrategy.java
@@ -16,16 +16,36 @@
*/
package org.apache.camel.component.azure.servicebus;
+import java.util.Date;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.camel.Exchange;
import org.apache.camel.support.DefaultHeaderFilterStrategy;
public class ServiceBusHeaderFilterStrategy extends
DefaultHeaderFilterStrategy {
+ private static final Set<Class<?>> SUPPORTED_TYPES = Set.of(
+ Boolean.class,
+ Byte.class,
+ Character.class,
+ Double.class,
+ Float.class,
+ Integer.class,
+ Long.class,
+ Short.class,
+ String.class,
+ Date.class,
+ UUID.class);
+
public ServiceBusHeaderFilterStrategy() {
super();
- initialise();
+
setOutFilterStartsWith(DefaultHeaderFilterStrategy.CAMEL_FILTER_STARTS_WITH);
+
setInFilterStartsWith(DefaultHeaderFilterStrategy.CAMEL_FILTER_STARTS_WITH);
}
- private void initialise() {
- setOutFilterStartsWith("Camel", "camel", "org.apache.camel.");
- setInFilterStartsWith("Camel", "camel", "org.apache.camel.");
+ @Override
+ public boolean applyFilterToCamelHeaders(String headerName, Object
headerValue, Exchange exchange) {
+ return headerValue == null ||
!SUPPORTED_TYPES.contains(headerValue.getClass())
+ || super.applyFilterToCamelHeaders(headerName, headerValue,
exchange);
}
}
diff --git a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusHeaderFilterStrategyTest.java b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusHeaderFilterStrategyTest.java
new file mode 100644
index 00000000000..d54f22811af
--- /dev/null
+++ b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusHeaderFilterStrategyTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.servicebus;
+
+import java.util.Date;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+import org.junit.jupiter.params.provider.ArgumentsSource;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class ServiceBusHeaderFilterStrategyTest {
+ private final ServiceBusHeaderFilterStrategy headerFilterStrategy = new
ServiceBusHeaderFilterStrategy();
+
+ @ParameterizedTest
+ @ArgumentsSource(HeaderArgumentsProvider.class)
+ void testApplyFilterToCamelHeadersPassesKnownTypes(String headerName,
Object headerValue) {
+ assertThat(headerFilterStrategy.applyFilterToCamelHeaders(headerName,
headerValue, null)).isFalse();
+ }
+
+ @Test
+ void testApplyFilterToCamelHeadersFiltersUnknownType() {
+
assertThat(headerFilterStrategy.applyFilterToCamelHeaders("objectHeader", new
Object(), null)).isTrue();
+ }
+
+ static class HeaderArgumentsProvider implements ArgumentsProvider {
+ @Override
+ public Stream<? extends Arguments> provideArguments(ExtensionContext
context) throws Exception {
+ return Stream.of(
+ Arguments.of("booleanHeader", true),
+ Arguments.of("byteHeader", (byte) 1),
+ Arguments.of("characterHeader", '1'),
+ Arguments.of("doubleHeader", 1.0D),
+ Arguments.of("floatHeader", 1.0F),
+ Arguments.of("integerHeader", 1),
+ Arguments.of("longHeader", 1L),
+ Arguments.of("shortHeader", (short) 1),
+ Arguments.of("stringHeader", "stringHeader"),
+ Arguments.of("timestampHeader", new Date()),
+ Arguments.of("uuidHeader", UUID.randomUUID()));
+ }
+ }
+}
diff --git a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusProducerIT.java b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusProducerIT.java
index bca480746d2..66f01442cfa 100644
--- a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusProducerIT.java
+++ b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusProducerIT.java
@@ -17,10 +17,7 @@
package org.apache.camel.component.azure.servicebus.integration;
import java.time.OffsetDateTime;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@@ -45,9 +42,23 @@ public class ServiceBusProducerIT extends
BaseServiceBusTestSupport {
private static final String DIRECT_SEND_TO_QUEUE_URI =
"direct:sendToQueue";
private static final String DIRECT_SEND_TO_TOPIC_URI =
"direct:sendToTopic";
private static final String DIRECT_SEND_SCHEDULED_URI =
"direct:sendScheduled";
- private static final String PROPAGATED_HEADER_KEY =
"PropagatedCustomHeader";
- private static final String PROPAGATED_HEADER_VALUE = "propagated header
value";
+ private static final Map<String, Object> PROPAGATED_HEADERS = new
HashMap<>();
private static final Pattern MESSAGE_BODY_PATTERN =
Pattern.compile("^message-[0-4]$");
+
+ static {
+ PROPAGATED_HEADERS.put("booleanHeader", true);
+ PROPAGATED_HEADERS.put("byteHeader", (byte) 1);
+ PROPAGATED_HEADERS.put("characterHeader", '1');
+ PROPAGATED_HEADERS.put("doubleHeader", 1.0D);
+ PROPAGATED_HEADERS.put("floatHeader", 1.0F);
+ PROPAGATED_HEADERS.put("integerHeader", 1);
+ PROPAGATED_HEADERS.put("longHeader", 1L);
+ PROPAGATED_HEADERS.put("shortHeader", (short) 1);
+ PROPAGATED_HEADERS.put("stringHeader", "stringHeader");
+ PROPAGATED_HEADERS.put("timestampHeader", new Date());
+ PROPAGATED_HEADERS.put("uuidHeader", UUID.randomUUID());
+ }
+
private ProducerTemplate producerTemplate;
@BeforeEach
@@ -85,8 +96,7 @@ public class ServiceBusProducerIT extends
BaseServiceBusTestSupport {
client.start();
for (int i = 0; i < 5; i++) {
String message = "message-" + i;
- producerTemplate.sendBodyAndHeader(DIRECT_SEND_TO_QUEUE_URI,
message, PROPAGATED_HEADER_KEY,
- PROPAGATED_HEADER_VALUE);
+ producerTemplate.sendBodyAndHeaders(DIRECT_SEND_TO_QUEUE_URI,
message, PROPAGATED_HEADERS);
}
assertTrue(messageLatch.await(3000, TimeUnit.MILLISECONDS));
@@ -96,8 +106,7 @@ public class ServiceBusProducerIT extends
BaseServiceBusTestSupport {
String messageBody = message.getBody().toString();
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties =
message.getApplicationProperties();
- assertEquals(1, applicationProperties.size());
- assertEquals(PROPAGATED_HEADER_VALUE,
applicationProperties.get(PROPAGATED_HEADER_KEY));
+ assertEquals(PROPAGATED_HEADERS, applicationProperties);
});
}
}
@@ -114,7 +123,7 @@ public class ServiceBusProducerIT extends
BaseServiceBusTestSupport {
}
producerTemplate.send(DIRECT_SEND_TO_QUEUE_URI, exchange -> {
- exchange.getIn().setHeader(PROPAGATED_HEADER_KEY,
PROPAGATED_HEADER_VALUE);
+ exchange.getIn().setHeaders(PROPAGATED_HEADERS);
exchange.getIn().setBody(messageBatch);
});
@@ -125,8 +134,7 @@ public class ServiceBusProducerIT extends
BaseServiceBusTestSupport {
String messageBody = message.getBody().toString();
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties =
message.getApplicationProperties();
- assertEquals(1, applicationProperties.size());
- assertEquals(PROPAGATED_HEADER_VALUE,
applicationProperties.get(PROPAGATED_HEADER_KEY));
+ assertEquals(PROPAGATED_HEADERS, applicationProperties);
});
}
}
@@ -138,8 +146,7 @@ public class ServiceBusProducerIT extends
BaseServiceBusTestSupport {
client.start();
for (int i = 0; i < 5; i++) {
String message = "message-" + i;
- producerTemplate.sendBodyAndHeader(DIRECT_SEND_TO_TOPIC_URI,
message, PROPAGATED_HEADER_KEY,
- PROPAGATED_HEADER_VALUE);
+ producerTemplate.sendBodyAndHeaders(DIRECT_SEND_TO_TOPIC_URI,
message, PROPAGATED_HEADERS);
}
assertTrue(messageLatch.await(3000, TimeUnit.MILLISECONDS));
@@ -149,8 +156,7 @@ public class ServiceBusProducerIT extends
BaseServiceBusTestSupport {
String messageBody = message.getBody().toString();
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties =
message.getApplicationProperties();
- assertEquals(1, applicationProperties.size());
- assertEquals(PROPAGATED_HEADER_VALUE,
applicationProperties.get(PROPAGATED_HEADER_KEY));
+ assertEquals(PROPAGATED_HEADERS, applicationProperties);
});
}
}
@@ -162,8 +168,7 @@ public class ServiceBusProducerIT extends
BaseServiceBusTestSupport {
client.start();
for (int i = 0; i < 5; i++) {
String message = "message-" + i;
- Map<String, Object> headers = new HashMap<>();
- headers.put(PROPAGATED_HEADER_KEY, PROPAGATED_HEADER_VALUE);
+ Map<String, Object> headers = new
HashMap<>(PROPAGATED_HEADERS);
headers.put(ServiceBusConstants.SCHEDULED_ENQUEUE_TIME,
OffsetDateTime.now().plusSeconds(1));
producerTemplate.sendBodyAndHeaders(DIRECT_SEND_SCHEDULED_URI,
message, headers);
}
@@ -175,8 +180,7 @@ public class ServiceBusProducerIT extends
BaseServiceBusTestSupport {
String messageBody = message.getBody().toString();
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties =
message.getApplicationProperties();
- assertEquals(1, applicationProperties.size());
- assertEquals(PROPAGATED_HEADER_VALUE,
applicationProperties.get(PROPAGATED_HEADER_KEY));
+ assertEquals(PROPAGATED_HEADERS, applicationProperties);
assertInstanceOf(OffsetDateTime.class,
message.getScheduledEnqueueTime());
});
}
@@ -194,8 +198,9 @@ public class ServiceBusProducerIT extends
BaseServiceBusTestSupport {
}
producerTemplate.send(DIRECT_SEND_SCHEDULED_URI, exchange -> {
-
exchange.getIn().setHeader(ServiceBusConstants.SCHEDULED_ENQUEUE_TIME,
OffsetDateTime.now().plusSeconds(1));
- exchange.getIn().setHeader(PROPAGATED_HEADER_KEY,
PROPAGATED_HEADER_VALUE);
+ Map<String, Object> headers = new
HashMap<>(PROPAGATED_HEADERS);
+ headers.put(ServiceBusConstants.SCHEDULED_ENQUEUE_TIME,
OffsetDateTime.now().plusSeconds(1));
+ exchange.getIn().setHeaders(headers);
exchange.getIn().setBody(messageBatch);
});
@@ -206,8 +211,7 @@ public class ServiceBusProducerIT extends
BaseServiceBusTestSupport {
String messageBody = message.getBody().toString();
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties =
message.getApplicationProperties();
- assertEquals(1, applicationProperties.size());
- assertEquals(PROPAGATED_HEADER_VALUE,
applicationProperties.get(PROPAGATED_HEADER_KEY));
+ assertEquals(PROPAGATED_HEADERS, applicationProperties);
assertInstanceOf(OffsetDateTime.class,
message.getScheduledEnqueueTime());
});
}