This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 2ce75ebb34dcb74c9bf5573108a7435e41f9b306 Author: Claus Ibsen <[email protected]> AuthorDate: Tue Jun 8 12:06:12 2021 +0200 CAMEL-16627: camel-core - Add common header for source timestamp --- .../apache/camel/component/file/GenericFile.java | 1 + .../org/apache/camel/component/jms/JmsMessage.java | 5 ++ .../component/jms/JmsMessageTimestampTest.java | 63 ++++++++++++++++++++++ .../camel/component/kafka/KafkaConsumer.java | 1 + .../apache/camel/component/sjms/SjmsMessage.java | 5 ++ .../component/sjms/JmsMessageTimestampTest.java | 46 ++++++++++++++++ .../component/vertx/kafka/VertxKafkaConsumer.java | 1 + .../org/apache/camel/ExchangeConstantProvider.java | 3 +- .../src/main/java/org/apache/camel/Exchange.java | 1 + .../src/main/java/org/apache/camel/Message.java | 10 ++++ .../file/FileConsumerMessageTimestampTest.java | 48 +++++++++++++++++ .../org/apache/camel/impl/MessageSupportTest.java | 13 +++++ .../org/apache/camel/support/MessageSupport.java | 10 ++++ 13 files changed, 206 insertions(+), 1 deletion(-) diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java index 928f548..735e66f 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java @@ -187,6 +187,7 @@ public class GenericFile<T> implements WrappedFile<T> { } if (getLastModified() > 0) { message.setHeader(Exchange.FILE_LAST_MODIFIED, getLastModified()); + message.setHeader(Exchange.MESSAGE_TIMESTAMP, getLastModified()); } } } diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java index c815e52..71169bb 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java +++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java @@ -240,6 +240,11 @@ public class JmsMessage extends DefaultMessage { protected void populateInitialHeaders(Map<String, Object> map) { if (jmsMessage != null && map != null) { map.putAll(getBinding().extractHeadersFromJms(jmsMessage, getExchange())); + try { + map.put(Exchange.MESSAGE_TIMESTAMP, jmsMessage.getJMSTimestamp()); + } catch (JMSException e) { + // ignore + } } } diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsMessageTimestampTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsMessageTimestampTest.java new file mode 100644 index 0000000..6a7ef60 --- /dev/null +++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsMessageTimestampTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.jms; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.Test; + +import javax.jms.ConnectionFactory; + +import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; + +public class JmsMessageTimestampTest extends CamelTestSupport { + + protected String componentName = "activemq"; + + @Test + public void testMessageTimestamp() throws Exception { + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMessageCount(1); + result.message(0).header(Exchange.MESSAGE_TIMESTAMP).isGreaterThan(0); + + template.sendBody("activemq:queue:hello", "Hello World"); + + result.assertIsSatisfied(); + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + + ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory(); + camelContext.addComponent(componentName, jmsComponentAutoAcknowledge(connectionFactory)); + + return camelContext; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("activemq:queue:hello").to("mock:result"); + } + }; + } +} diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index ec1a8dc..c9217f4 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -660,6 +660,7 @@ public class KafkaConsumer extends DefaultConsumer { message.setHeader(KafkaConstants.OFFSET, record.offset()); 
message.setHeader(KafkaConstants.HEADERS, record.headers()); message.setHeader(KafkaConstants.TIMESTAMP, record.timestamp()); + message.setHeader(Exchange.MESSAGE_TIMESTAMP, record.timestamp()); if (record.key() != null) { message.setHeader(KafkaConstants.KEY, record.key()); } diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java index e683021..1f2bee4 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java @@ -258,6 +258,11 @@ public class SjmsMessage extends DefaultMessage { protected void populateInitialHeaders(Map<String, Object> map) { if (jmsMessage != null && map != null) { map.putAll(getBinding().extractHeadersFromJms(jmsMessage, getExchange())); + try { + map.put(Exchange.MESSAGE_TIMESTAMP, jmsMessage.getJMSTimestamp()); + } catch (JMSException e) { + // ignore + } } } diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsMessageTimestampTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsMessageTimestampTest.java new file mode 100644 index 0000000..a249ca5 --- /dev/null +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsMessageTimestampTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.sjms.support.JmsTestSupport; +import org.junit.jupiter.api.Test; + +public class JmsMessageTimestampTest extends JmsTestSupport { + + @Test + public void testMessageTimestamp() throws Exception { + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMessageCount(1); + result.message(0).header(Exchange.MESSAGE_TIMESTAMP).isGreaterThan(0); + + template.sendBody("sjms:queue:hello", "Hello World"); + + result.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("sjms:queue:hello").to("mock:result"); + } + }; + } +} diff --git a/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java b/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java index b1b79aa..45c292d 100644 --- a/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java +++ b/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java @@ -151,6 +151,7 @@ public class 
VertxKafkaConsumer extends DefaultConsumer implements Suspendable { message.setHeader(VertxKafkaConstants.OFFSET, record.offset()); message.setHeader(VertxKafkaConstants.HEADERS, record.headers()); message.setHeader(VertxKafkaConstants.TIMESTAMP, record.timestamp()); + message.setHeader(Exchange.MESSAGE_TIMESTAMP, record.timestamp()); message.setHeader(VertxKafkaConstants.MESSAGE_KEY, record.key()); // set headers for the manual offsets commit diff --git a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java index 0c541a8..6f9b37b 100644 --- a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java +++ b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java @@ -11,7 +11,7 @@ public class ExchangeConstantProvider { private static final Map<String, String> MAP; static { - Map<String, String> map = new HashMap<>(150); + Map<String, String> map = new HashMap<>(151); map.put("ACCEPT_CONTENT_TYPE", "CamelAcceptContentType"); map.put("AGGREGATED_COLLECTION_GUARD", "CamelAggregatedCollectionGuard"); map.put("AGGREGATED_COMPLETED_BY", "CamelAggregatedCompletedBy"); @@ -109,6 +109,7 @@ public class ExchangeConstantProvider { map.put("MESSAGE_HISTORY", "CamelMessageHistory"); map.put("MESSAGE_HISTORY_HEADER_FORMAT", "CamelMessageHistoryHeaderFormat"); map.put("MESSAGE_HISTORY_OUTPUT_FORMAT", "CamelMessageHistoryOutputFormat"); + map.put("MESSAGE_TIMESTAMP", "CamelMessageTimestamp"); map.put("MULTICAST_COMPLETE", "CamelMulticastComplete"); map.put("MULTICAST_INDEX", "CamelMulticastIndex"); map.put("NOTIFY_EVENT", "CamelNotifyEvent"); diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java b/core/camel-api/src/main/java/org/apache/camel/Exchange.java index a263a03..2b9c237 100644 --- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java +++ 
b/core/camel-api/src/main/java/org/apache/camel/Exchange.java @@ -184,6 +184,7 @@ public interface Exchange { String MESSAGE_HISTORY = "CamelMessageHistory"; String MESSAGE_HISTORY_HEADER_FORMAT = "CamelMessageHistoryHeaderFormat"; String MESSAGE_HISTORY_OUTPUT_FORMAT = "CamelMessageHistoryOutputFormat"; + String MESSAGE_TIMESTAMP = "CamelMessageTimestamp"; String MULTICAST_INDEX = "CamelMulticastIndex"; String MULTICAST_COMPLETE = "CamelMulticastComplete"; diff --git a/core/camel-api/src/main/java/org/apache/camel/Message.java b/core/camel-api/src/main/java/org/apache/camel/Message.java index a2db06f..53407e2 100644 --- a/core/camel-api/src/main/java/org/apache/camel/Message.java +++ b/core/camel-api/src/main/java/org/apache/camel/Message.java @@ -53,6 +53,16 @@ public interface Message { String getMessageId(); /** + * Returns the timestamp that this message originates from. + * <p/> + * Some systems like JMS, Kafka, AWS have a timestamp on the event/message, that Camel received. This method returns + * the timestamp, if a timestamp exists. + * + * @return the timestamp, or <tt>0</tt> if the message has no source timestamp. + */ + long getMessageTimestamp(); + + /** * Sets the id of the message * * @param messageId id of the message diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerMessageTimestampTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerMessageTimestampTest.java new file mode 100644 index 0000000..a7350b1 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerMessageTimestampTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.Test; + +public class FileConsumerMessageTimestampTest extends ContextTestSupport { + + @Test + public void testMessageTimestamp() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + mock.message(0).header(Exchange.MESSAGE_TIMESTAMP).isGreaterThan(0L); + + template.sendBodyAndHeader(fileUri(), "Hello World", Exchange.FILE_NAME, "hello.txt"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from(fileUri("?noop=true&initialDelay=0&delay=10")).convertBodyTo(String.class) + .to("mock:result"); + } + }; + } +} diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/MessageSupportTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/MessageSupportTest.java index 6a20553..273fa82 100644 --- a/core/camel-core/src/test/java/org/apache/camel/impl/MessageSupportTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/impl/MessageSupportTest.java @@ -117,4 +117,17 @@ public class MessageSupportTest extends 
ContextTestSupport { assertSame(exchange, three.getExchange()); } + @Test + public void testNoMessageTimestamp() throws Exception { + Exchange exchange = new DefaultExchange(context); + assertEquals(0L, exchange.getMessage().getMessageTimestamp()); + } + + @Test + public void testMessageTimestamp() throws Exception { + Exchange exchange = new DefaultExchange(context); + exchange.getMessage().setHeader(Exchange.MESSAGE_TIMESTAMP, 1234L); + assertEquals(1234L, exchange.getMessage().getMessageTimestamp()); + } + } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java b/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java index d07a3f7..2148c5a 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java @@ -39,6 +39,7 @@ public abstract class MessageSupport implements Message, CamelContextAware, Data private Exchange exchange; private Object body; private String messageId; + private long messageTimestamp; private DataType dataType; @Override @@ -279,6 +280,15 @@ public abstract class MessageSupport implements Message, CamelContextAware, Data } @Override + public long getMessageTimestamp() { + if (messageTimestamp == 0) { + // use -1 to indicate no timestamp exists + messageTimestamp = getHeader(Exchange.MESSAGE_TIMESTAMP, -1L, long.class); + } + return messageTimestamp <= 0 ? 0 : messageTimestamp; + } + + @Override public void setMessageId(String messageId) { this.messageId = messageId; }
