This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new eedb8c2f821 CAMEL-19417: camel-spring-rabbitmq - convert all message
properties by default (#10325)
eedb8c2f821 is described below
commit eedb8c2f82171b5c67f95630856e7fd43ae59c3c
Author: Steven Dürrenmatt <[email protected]>
AuthorDate: Mon Jun 12 06:42:56 2023 +0200
CAMEL-19417: camel-spring-rabbitmq - convert all message properties by
default (#10325)
* CAMEL-19417: camel-spring-rabbitmq - convert all message properties by
  default
* CAMEL-19417: PR review
  - replace exchange.getMessage() with local variable
  - use labels in metadata
  - use Map.of instead of ImmutableMap.Builder
* CAMEL-19417: PR review
  - do not remove headers (header filter strategy already applies)
* CAMEL-19417: PR review
  - use getHeader(String name, Class<T> type)
* CAMEL-19417: Reformat code
---
.../DefaultMessagePropertiesConverter.java | 121 +++++++++++++++++++--
.../springrabbit/SpringRabbitMQConstants.java | 44 +++++++-
.../integration/RabbitMQConsumerQueuesIT.java | 35 +++++-
.../integration/RabbitMQProducerIT.java | 17 ++-
4 files changed, 199 insertions(+), 18 deletions(-)
diff --git
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/DefaultMessagePropertiesConverter.java
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/DefaultMessagePropertiesConverter.java
index 8f72aa5d912..87343d20820 100644
---
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/DefaultMessagePropertiesConverter.java
+++
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/DefaultMessagePropertiesConverter.java
@@ -16,14 +16,16 @@
*/
package org.apache.camel.component.springrabbit;
+import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.spi.HeaderFilterStrategy;
-import org.apache.camel.support.ExchangeHelper;
+import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
public class DefaultMessagePropertiesConverter implements
MessagePropertiesConverter {
@@ -39,12 +41,66 @@ public class DefaultMessagePropertiesConverter implements
MessagePropertiesConve
@Override
public MessageProperties toMessageProperties(Exchange exchange) {
MessageProperties answer = new MessageProperties();
- String contentType = ExchangeHelper.getContentType(exchange);
+ Message message = exchange.getMessage();
+
+ MessageDeliveryMode deliveryMode =
message.getHeader(SpringRabbitMQConstants.DELIVERY_MODE,
MessageDeliveryMode.class);
+ if (deliveryMode != null) {
+ answer.setDeliveryMode(deliveryMode);
+ }
+ String type = message.getHeader(SpringRabbitMQConstants.TYPE,
String.class);
+ if (type != null) {
+ answer.setType(type);
+ }
+ String contentType =
message.getHeader(SpringRabbitMQConstants.CONTENT_TYPE, String.class);
if (contentType != null) {
answer.setContentType(contentType);
}
+ Long contentLength =
message.getHeader(SpringRabbitMQConstants.CONTENT_LENGTH, Long.class);
+ if (contentLength != null) {
+ answer.setContentLength(contentLength);
+ }
+ String contentEncoding =
message.getHeader(SpringRabbitMQConstants.CONTENT_ENCODING, String.class);
+ if (contentEncoding != null) {
+ answer.setContentEncoding(contentEncoding);
+ }
+ String messageId =
message.getHeader(SpringRabbitMQConstants.MESSAGE_ID, String.class);
+ if (messageId != null) {
+ answer.setMessageId(messageId);
+ }
+ String correlationId =
message.getHeader(SpringRabbitMQConstants.CORRELATION_ID, String.class);
+ if (correlationId != null) {
+ answer.setCorrelationId(correlationId);
+ }
+ String replyTo = message.getHeader(SpringRabbitMQConstants.REPLY_TO,
String.class);
+ if (replyTo != null) {
+ answer.setReplyTo(replyTo);
+ }
+ String expiration =
message.getHeader(SpringRabbitMQConstants.EXPIRATION, String.class);
+ if (expiration != null) {
+ answer.setExpiration(expiration);
+ }
+ Date timestamp = message.getHeader(SpringRabbitMQConstants.TIMESTAMP,
Date.class);
+ if (timestamp != null) {
+ answer.setTimestamp(timestamp);
+ }
+ String userId = message.getHeader(SpringRabbitMQConstants.USER_ID,
String.class);
+ if (userId != null) {
+ answer.setUserId(userId);
+ }
+ String appId = message.getHeader(SpringRabbitMQConstants.APP_ID,
String.class);
+ if (appId != null) {
+ answer.setAppId(appId);
+ }
+ Integer priority = message.getHeader(SpringRabbitMQConstants.PRIORITY,
Integer.class);
+ if (priority != null) {
+ answer.setPriority(priority);
+ }
+ String clusterId =
message.getHeader(SpringRabbitMQConstants.CLUSTER_ID, String.class);
+ if (clusterId != null) {
+ answer.setClusterId(clusterId);
+ }
- Set<Map.Entry<String, Object>> entries =
exchange.getMessage().getHeaders().entrySet();
+ Set<Map.Entry<String, Object>> entries =
message.getHeaders().entrySet();
for (Map.Entry<String, Object> entry : entries) {
String headerName = entry.getKey();
Object headerValue = entry.getValue();
@@ -65,15 +121,61 @@ public class DefaultMessagePropertiesConverter implements
MessagePropertiesConve
Object headerValue = entry.getValue();
appendInputHeader(answer, headerName, headerValue, exchange);
}
+
+ if (messageProperties.getRedelivered() != null) {
+ answer.put(SpringRabbitMQConstants.REDELIVERED,
messageProperties.getRedelivered());
+ }
+ if (messageProperties.getDeliveryTag() > 0) {
+ answer.put(SpringRabbitMQConstants.DELIVERY_TAG,
messageProperties.getDeliveryTag());
+ }
+ if (messageProperties.getReceivedExchange() != null) {
+ answer.put(SpringRabbitMQConstants.EXCHANGE_NAME,
messageProperties.getReceivedExchange());
+ }
+ if (messageProperties.getReceivedRoutingKey() != null) {
+ answer.put(SpringRabbitMQConstants.ROUTING_KEY,
messageProperties.getReceivedRoutingKey());
+ }
+ if (messageProperties.getReceivedDeliveryMode() != null) {
+ answer.put(SpringRabbitMQConstants.DELIVERY_MODE,
messageProperties.getReceivedDeliveryMode());
+ }
+ if (messageProperties.getType() != null) {
+ answer.put(SpringRabbitMQConstants.TYPE,
messageProperties.getType());
+ }
if (messageProperties.getContentType() != null) {
- answer.put(Exchange.CONTENT_TYPE,
messageProperties.getContentType());
+ answer.put(SpringRabbitMQConstants.CONTENT_TYPE,
messageProperties.getContentType());
+ }
+ if (messageProperties.getContentLength() > 0) {
+ answer.put(SpringRabbitMQConstants.CONTENT_LENGTH,
messageProperties.getContentLength());
+ }
+ if (messageProperties.getContentEncoding() != null) {
+ answer.put(SpringRabbitMQConstants.CONTENT_ENCODING,
messageProperties.getContentEncoding());
+ }
+ if (messageProperties.getMessageId() != null) {
+ answer.put(SpringRabbitMQConstants.MESSAGE_ID,
messageProperties.getMessageId());
+ }
+ if (messageProperties.getCorrelationId() != null) {
+ answer.put(SpringRabbitMQConstants.CORRELATION_ID,
messageProperties.getCorrelationId());
+ }
+ if (messageProperties.getReplyTo() != null) {
+ answer.put(SpringRabbitMQConstants.REPLY_TO,
messageProperties.getReplyTo());
+ }
+ if (messageProperties.getExpiration() != null) {
+ answer.put(SpringRabbitMQConstants.EXPIRATION,
messageProperties.getExpiration());
}
if (messageProperties.getTimestamp() != null) {
- answer.put(Exchange.MESSAGE_TIMESTAMP,
messageProperties.getTimestamp().getTime());
+ answer.put(SpringRabbitMQConstants.TIMESTAMP,
messageProperties.getTimestamp());
+ }
+ if (messageProperties.getReceivedUserId() != null) {
+ answer.put(SpringRabbitMQConstants.USER_ID,
messageProperties.getReceivedUserId());
+ }
+ if (messageProperties.getAppId() != null) {
+ answer.put(SpringRabbitMQConstants.APP_ID,
messageProperties.getAppId());
+ }
+ if (messageProperties.getPriority() != null) {
+ answer.put(SpringRabbitMQConstants.PRIORITY,
messageProperties.getPriority());
+ }
+ if (messageProperties.getClusterId() != null) {
+ answer.put(SpringRabbitMQConstants.CLUSTER_ID,
messageProperties.getClusterId());
}
-
- // Helps in getting to acknowledge manually
- answer.put(SpringRabbitMQConstants.DELIVERY_TAG,
messageProperties.getDeliveryTag());
}
return answer;
@@ -86,7 +188,7 @@ public class DefaultMessagePropertiesConverter implements
MessagePropertiesConve
}
private void appendInputHeader(Map<String, Object> answer, String
headerName, Object headerValue, Exchange ex) {
- if (shouldOutputHeader(headerName, headerValue, ex)) {
+ if (shouldInputHeader(headerName, headerValue, ex)) {
answer.put(headerName, headerValue);
}
}
@@ -106,5 +208,4 @@ public class DefaultMessagePropertiesConverter implements
MessagePropertiesConve
return headerFilterStrategy == null
||
!headerFilterStrategy.applyFilterToExternalHeaders(headerName, headerValue,
exchange);
}
-
}
diff --git
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQConstants.java
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQConstants.java
index 4518c39c3e8..865951c3dad 100644
---
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQConstants.java
+++
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQConstants.java
@@ -23,12 +23,50 @@ public final class SpringRabbitMQConstants {
public static final String DEFAULT_EXCHANGE_NAME = "default";
public static final String CHANNEL = "CamelSpringRabbitmqChannel";
- @Metadata(description = "The exchange key.", javaType = "String")
+ @Metadata(description = "To override the endpoint configuration's routing
key.", javaType = "String", label = "producer")
public static final String ROUTING_OVERRIDE_KEY =
"CamelSpringRabbitmqRoutingOverrideKey";
- @Metadata(description = "The exchange name.", javaType = "String")
+ @Metadata(description = "To override the endpoint configuration's exchange
name.", javaType = "String", label = "producer")
public static final String EXCHANGE_OVERRIDE_NAME =
"CamelSpringRabbitmqExchangeOverrideName";
- @Metadata(description = "Delivery tag for manual acknowledge mode",
javaType = "long")
+ @Metadata(description = "Whether the message was previously delivered and
requeued.", javaType = "Boolean",
+ label = "consumer")
+ public static final String REDELIVERED = "CamelSpringRabbitmqRedelivered";
+ @Metadata(description = "Delivery tag for manual acknowledge mode.",
javaType = "long", label = "consumer")
public static final String DELIVERY_TAG = "CamelSpringRabbitmqDeliveryTag";
+ @Metadata(description = "The exchange name that was used when publishing
the message.", javaType = "String",
+ label = "consumer")
+ public static final String EXCHANGE_NAME =
"CamelSpringRabbitmqExchangeName";
+ @Metadata(description = "The routing key that was used when publishing the
message.", javaType = "String",
+ label = "consumer")
+ public static final String ROUTING_KEY = "CamelSpringRabbitmqRoutingKey";
+ @Metadata(description = "The message delivery mode.", javaType =
"MessageDeliveryMode", enums = "NON_PERSISTENT,PERSISTENT")
+ public static final String DELIVERY_MODE =
"CamelSpringRabbitmqDeliveryMode";
+ @Metadata(description = "Application-specific message type.", javaType =
"String")
+ public static final String TYPE = "CamelSpringRabbitmqType";
+ @Metadata(description = "The message content type.", javaType = "String")
+ public static final String CONTENT_TYPE = "CamelSpringRabbitmqContentType";
+ @Metadata(description = "The message content length.", javaType = "long")
+ public static final String CONTENT_LENGTH =
"CamelSpringRabbitmqContentLength";
+ @Metadata(description = "Content encoding used by applications.", javaType
= "String")
+ public static final String CONTENT_ENCODING =
"CamelSpringRabbitmqContentEncoding";
+ @Metadata(description = "Arbitrary message id.", javaType = "String")
+ public static final String MESSAGE_ID = "CamelSpringRabbitmqMessageId";
+ @Metadata(description = "Identifier to correlate RPC responses with
requests.", javaType = "String")
+ public static final String CORRELATION_ID =
"CamelSpringRabbitmqCorrelationId";
+ @Metadata(description = "Commonly used to name a callback queue.",
javaType = "String")
+ public static final String REPLY_TO = "CamelSpringRabbitmqReplyTo";
+ @Metadata(description = "Per-message TTL.", javaType = "String")
+ public static final String EXPIRATION = "CamelSpringRabbitmqExpiration";
+ @Metadata(description = "Application-provided timestamp.", javaType =
"Date")
+ public static final String TIMESTAMP = "CamelSpringRabbitmqTimestamp";
+ @Metadata(description = "Validated user id.", javaType = "String")
+ public static final String USER_ID = "CamelSpringRabbitmqUserId";
+ @Metadata(description = "The application name.", javaType = "String")
+ public static final String APP_ID = "CamelSpringRabbitmqAppId";
+ @Metadata(description = "The message priority.", javaType = "Integer")
+ public static final String PRIORITY = "CamelSpringRabbitmqPriority";
+ @Metadata(description = "The cluster id.", javaType = "String")
+ public static final String CLUSTER_ID = "CamelSpringRabbitmqClusterId";
+
public static final String DIRECT_MESSAGE_LISTENER_CONTAINER = "DMLC";
public static final String SIMPLE_MESSAGE_LISTENER_CONTAINER = "SMLC";
diff --git
a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQConsumerQueuesIT.java
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQConsumerQueuesIT.java
index 21c5f718cd3..4cfa47e1095 100644
---
a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQConsumerQueuesIT.java
+++
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQConsumerQueuesIT.java
@@ -18,13 +18,14 @@ package org.apache.camel.component.springrabbit.integration;
import java.util.concurrent.TimeUnit;
-import org.apache.camel.Exchange;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.springrabbit.SpringRabbitMQConstants;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
+import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessagePropertiesBuilder;
@@ -62,7 +63,37 @@ public class RabbitMQConsumerQueuesIT extends
RabbitMQITSupport {
getMockEndpoint("mock:result").expectedBodiesReceived("foo");
getMockEndpoint("mock:result").expectedHeaderReceived("bar", "baz");
-
getMockEndpoint("mock:result").expectedHeaderReceived(Exchange.CONTENT_TYPE,
MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
+
getMockEndpoint("mock:result").expectedHeaderReceived(SpringRabbitMQConstants.CONTENT_TYPE,
+ MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
+
+ template.sendBody("direct:start", body);
+
+ MockEndpoint.assertIsSatisfied(context, 30, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testConsumerWithMessageProperties() throws Exception {
+ MessageProperties props = MessagePropertiesBuilder.newInstance()
+ .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
+ .setType("price")
+ .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
+ .setMessageId("123")
+ .setPriority(1)
+ .setHeader("bar", "baz")
+ .build();
+ Message body = MessageBuilder.withBody("foo".getBytes())
+ .andProperties(props)
+ .build();
+
+ getMockEndpoint("mock:result").expectedBodiesReceived("foo");
+ getMockEndpoint("mock:result").expectedHeaderReceived("bar", "baz");
+
getMockEndpoint("mock:result").expectedHeaderReceived(SpringRabbitMQConstants.DELIVERY_MODE,
+ MessageDeliveryMode.PERSISTENT);
+
getMockEndpoint("mock:result").expectedHeaderReceived(SpringRabbitMQConstants.TYPE,
"price");
+
getMockEndpoint("mock:result").expectedHeaderReceived(SpringRabbitMQConstants.CONTENT_TYPE,
+ MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
+
getMockEndpoint("mock:result").expectedHeaderReceived(SpringRabbitMQConstants.MESSAGE_ID,
"123");
+
getMockEndpoint("mock:result").expectedHeaderReceived(SpringRabbitMQConstants.PRIORITY,
1);
template.sendBody("direct:start", body);
diff --git
a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerIT.java
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerIT.java
index e8b3f41fdf8..878e95d20e9 100644
---
a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerIT.java
+++
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerIT.java
@@ -17,10 +17,11 @@
package org.apache.camel.component.springrabbit.integration;
import java.nio.charset.Charset;
+import java.util.Map;
-import org.apache.camel.Exchange;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.springrabbit.SpringRabbitMQConstants;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
@@ -28,6 +29,7 @@ import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
+import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.core.Queue;
@@ -107,7 +109,7 @@ public class RabbitMQProducerIT extends RabbitMQITSupport {
}
@Test
- public void testProducerContentType() throws Exception {
+ public void testProducerWithMessageProperties() throws Exception {
ConnectionFactory cf =
context.getRegistry().lookupByNameAndType("myCF", ConnectionFactory.class);
Queue q = new Queue("myqueue");
@@ -118,7 +120,12 @@ public class RabbitMQProducerIT extends RabbitMQITSupport {
admin.declareExchange(t);
admin.declareBinding(BindingBuilder.bind(q).to(t).with("foo.bar.#"));
- template.sendBodyAndHeader("direct:start", "<price>123</price>",
Exchange.CONTENT_TYPE, "application/xml");
+ template.sendBodyAndHeaders("direct:start", "<price>123</price>",
+ Map.of(SpringRabbitMQConstants.DELIVERY_MODE,
MessageDeliveryMode.PERSISTENT,
+ SpringRabbitMQConstants.TYPE, "price",
+ SpringRabbitMQConstants.CONTENT_TYPE,
"application/xml",
+ SpringRabbitMQConstants.MESSAGE_ID,
"0fe9c142-f9c1-426f-9237-f5a4c988a8ae",
+ SpringRabbitMQConstants.PRIORITY, 1));
AmqpTemplate template = new RabbitTemplate(cf);
Message out = template.receive("myqueue");
@@ -126,7 +133,11 @@ public class RabbitMQProducerIT extends RabbitMQITSupport {
String encoding = out.getMessageProperties().getContentEncoding();
Assertions.assertEquals(Charset.defaultCharset().name(), encoding);
Assertions.assertEquals("<price>123</price>", new
String(out.getBody(), encoding));
+ Assertions.assertEquals(MessageDeliveryMode.PERSISTENT,
out.getMessageProperties().getReceivedDeliveryMode());
+ Assertions.assertEquals("price", out.getMessageProperties().getType());
Assertions.assertEquals("application/xml",
out.getMessageProperties().getContentType());
+ Assertions.assertEquals("0fe9c142-f9c1-426f-9237-f5a4c988a8ae",
out.getMessageProperties().getMessageId());
+ Assertions.assertEquals(1, out.getMessageProperties().getPriority());
Assertions.assertEquals(0,
out.getMessageProperties().getHeaders().size());
}