This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2ce75ebb34dcb74c9bf5573108a7435e41f9b306
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Jun 8 12:06:12 2021 +0200

    CAMEL-16627: camel-core - Add common header for source timestamp
---
 .../apache/camel/component/file/GenericFile.java   |  1 +
 .../org/apache/camel/component/jms/JmsMessage.java |  5 ++
 .../component/jms/JmsMessageTimestampTest.java     | 63 ++++++++++++++++++++++
 .../camel/component/kafka/KafkaConsumer.java       |  1 +
 .../apache/camel/component/sjms/SjmsMessage.java   |  5 ++
 .../component/sjms/JmsMessageTimestampTest.java    | 46 ++++++++++++++++
 .../component/vertx/kafka/VertxKafkaConsumer.java  |  1 +
 .../org/apache/camel/ExchangeConstantProvider.java |  3 +-
 .../src/main/java/org/apache/camel/Exchange.java   |  1 +
 .../src/main/java/org/apache/camel/Message.java    | 10 ++++
 .../file/FileConsumerMessageTimestampTest.java     | 48 +++++++++++++++++
 .../org/apache/camel/impl/MessageSupportTest.java  | 13 +++++
 .../org/apache/camel/support/MessageSupport.java   | 10 ++++
 13 files changed, 206 insertions(+), 1 deletion(-)

diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
index 928f548..735e66f 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
@@ -187,6 +187,7 @@ public class GenericFile<T> implements WrappedFile<T> {
             }
             if (getLastModified() > 0) {
                 message.setHeader(Exchange.FILE_LAST_MODIFIED, 
getLastModified());
+                message.setHeader(Exchange.MESSAGE_TIMESTAMP, 
getLastModified());
             }
         }
     }
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
index c815e52..71169bb 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
@@ -240,6 +240,11 @@ public class JmsMessage extends DefaultMessage {
     protected void populateInitialHeaders(Map<String, Object> map) {
         if (jmsMessage != null && map != null) {
             map.putAll(getBinding().extractHeadersFromJms(jmsMessage, 
getExchange()));
+            try {
+                map.put(Exchange.MESSAGE_TIMESTAMP, 
jmsMessage.getJMSTimestamp());
+            } catch (JMSException e) {
+                // ignore
+            }
         }
     }
 
diff --git 
a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsMessageTimestampTest.java
 
b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsMessageTimestampTest.java
new file mode 100644
index 0000000..6a7ef60
--- /dev/null
+++ 
b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsMessageTimestampTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+
+import javax.jms.ConnectionFactory;
+
+import static 
org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+public class JmsMessageTimestampTest extends CamelTestSupport {
+
+    protected String componentName = "activemq";
+
+    @Test
+    public void testMessageTimestamp() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+        result.message(0).header(Exchange.MESSAGE_TIMESTAMP).isGreaterThan(0);
+
+        template.sendBody("activemq:queue:hello", "Hello World");
+
+        result.assertIsSatisfied();
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+
+        ConnectionFactory connectionFactory = 
CamelJmsTestHelper.createConnectionFactory();
+        camelContext.addComponent(componentName, 
jmsComponentAutoAcknowledge(connectionFactory));
+
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("activemq:queue:hello").to("mock:result");
+            }
+        };
+    }
+}
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index ec1a8dc..c9217f4 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -660,6 +660,7 @@ public class KafkaConsumer extends DefaultConsumer {
         message.setHeader(KafkaConstants.OFFSET, record.offset());
         message.setHeader(KafkaConstants.HEADERS, record.headers());
         message.setHeader(KafkaConstants.TIMESTAMP, record.timestamp());
+        message.setHeader(Exchange.MESSAGE_TIMESTAMP, record.timestamp());
         if (record.key() != null) {
             message.setHeader(KafkaConstants.KEY, record.key());
         }
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java
index e683021..1f2bee4 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java
@@ -258,6 +258,11 @@ public class SjmsMessage extends DefaultMessage {
     protected void populateInitialHeaders(Map<String, Object> map) {
         if (jmsMessage != null && map != null) {
             map.putAll(getBinding().extractHeadersFromJms(jmsMessage, 
getExchange()));
+            try {
+                map.put(Exchange.MESSAGE_TIMESTAMP, 
jmsMessage.getJMSTimestamp());
+            } catch (JMSException e) {
+                // ignore
+            }
         }
     }
 
diff --git 
a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsMessageTimestampTest.java
 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsMessageTimestampTest.java
new file mode 100644
index 0000000..a249ca5
--- /dev/null
+++ 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/JmsMessageTimestampTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sjms;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.junit.jupiter.api.Test;
+
+public class JmsMessageTimestampTest extends JmsTestSupport {
+
+    @Test
+    public void testMessageTimestamp() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+        result.message(0).header(Exchange.MESSAGE_TIMESTAMP).isGreaterThan(0);
+
+        template.sendBody("sjms:queue:hello", "Hello World");
+
+        result.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("sjms:queue:hello").to("mock:result");
+            }
+        };
+    }
+}
diff --git 
a/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java
 
b/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java
index b1b79aa..45c292d 100644
--- 
a/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java
+++ 
b/components/camel-vertx/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java
@@ -151,6 +151,7 @@ public class VertxKafkaConsumer extends DefaultConsumer 
implements Suspendable {
         message.setHeader(VertxKafkaConstants.OFFSET, record.offset());
         message.setHeader(VertxKafkaConstants.HEADERS, record.headers());
         message.setHeader(VertxKafkaConstants.TIMESTAMP, record.timestamp());
+        message.setHeader(Exchange.MESSAGE_TIMESTAMP, record.timestamp());
         message.setHeader(VertxKafkaConstants.MESSAGE_KEY, record.key());
 
         // set headers for the manual offsets commit
diff --git 
a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
 
b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
index 0c541a8..6f9b37b 100644
--- 
a/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
+++ 
b/core/camel-api/src/generated/java/org/apache/camel/ExchangeConstantProvider.java
@@ -11,7 +11,7 @@ public class ExchangeConstantProvider {
 
     private static final Map<String, String> MAP;
     static {
-        Map<String, String> map = new HashMap<>(150);
+        Map<String, String> map = new HashMap<>(151);
         map.put("ACCEPT_CONTENT_TYPE", "CamelAcceptContentType");
         map.put("AGGREGATED_COLLECTION_GUARD", 
"CamelAggregatedCollectionGuard");
         map.put("AGGREGATED_COMPLETED_BY", "CamelAggregatedCompletedBy");
@@ -109,6 +109,7 @@ public class ExchangeConstantProvider {
         map.put("MESSAGE_HISTORY", "CamelMessageHistory");
         map.put("MESSAGE_HISTORY_HEADER_FORMAT", 
"CamelMessageHistoryHeaderFormat");
         map.put("MESSAGE_HISTORY_OUTPUT_FORMAT", 
"CamelMessageHistoryOutputFormat");
+        map.put("MESSAGE_TIMESTAMP", "CamelMessageTimestamp");
         map.put("MULTICAST_COMPLETE", "CamelMulticastComplete");
         map.put("MULTICAST_INDEX", "CamelMulticastIndex");
         map.put("NOTIFY_EVENT", "CamelNotifyEvent");
diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java 
b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
index a263a03..2b9c237 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
@@ -184,6 +184,7 @@ public interface Exchange {
     String MESSAGE_HISTORY = "CamelMessageHistory";
     String MESSAGE_HISTORY_HEADER_FORMAT = "CamelMessageHistoryHeaderFormat";
     String MESSAGE_HISTORY_OUTPUT_FORMAT = "CamelMessageHistoryOutputFormat";
+    String MESSAGE_TIMESTAMP = "CamelMessageTimestamp";
     String MULTICAST_INDEX = "CamelMulticastIndex";
     String MULTICAST_COMPLETE = "CamelMulticastComplete";
 
diff --git a/core/camel-api/src/main/java/org/apache/camel/Message.java 
b/core/camel-api/src/main/java/org/apache/camel/Message.java
index a2db06f..53407e2 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Message.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Message.java
@@ -53,6 +53,16 @@ public interface Message {
     String getMessageId();
 
     /**
+     * Returns the timestamp that this messages originates from.
+     * <p/>
+     * Some systems like JMS, Kafka, AWS have a timestamp on the 
event/message, that Camel received. This method returns
+     * the timestamp, if a timestamp exists.
+     *
+     * @return the timestamp, or <tt>0</tt> if the message has no source 
timestamp.
+     */
+    long getMessageTimestamp();
+
+    /**
      * Sets the id of the message
      *
      * @param messageId id of the message
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerMessageTimestampTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerMessageTimestampTest.java
new file mode 100644
index 0000000..a7350b1
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerMessageTimestampTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+
+public class FileConsumerMessageTimestampTest extends ContextTestSupport {
+
+    @Test
+    public void testMessageTimestamp() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.message(0).header(Exchange.MESSAGE_TIMESTAMP).isGreaterThan(0L);
+
+        template.sendBodyAndHeader(fileUri(), "Hello World", 
Exchange.FILE_NAME, "hello.txt");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from(fileUri("?noop=true&initialDelay=0&delay=10")).convertBodyTo(String.class)
+                        .to("mock:result");
+            }
+        };
+    }
+}
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/impl/MessageSupportTest.java 
b/core/camel-core/src/test/java/org/apache/camel/impl/MessageSupportTest.java
index 6a20553..273fa82 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/impl/MessageSupportTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/impl/MessageSupportTest.java
@@ -117,4 +117,17 @@ public class MessageSupportTest extends ContextTestSupport 
{
         assertSame(exchange, three.getExchange());
     }
 
+    @Test
+    public void testNoMessageTimestamp() throws Exception {
+        Exchange exchange = new DefaultExchange(context);
+        assertEquals(0L, exchange.getMessage().getMessageTimestamp());
+    }
+
+    @Test
+    public void testMessageTimestamp() throws Exception {
+        Exchange exchange = new DefaultExchange(context);
+        exchange.getMessage().setHeader(Exchange.MESSAGE_TIMESTAMP, 1234L);
+        assertEquals(1234L, exchange.getMessage().getMessageTimestamp());
+    }
+
 }
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java 
b/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java
index d07a3f7..2148c5a 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java
@@ -39,6 +39,7 @@ public abstract class MessageSupport implements Message, 
CamelContextAware, Data
     private Exchange exchange;
     private Object body;
     private String messageId;
+    private long messageTimestamp;
     private DataType dataType;
 
     @Override
@@ -279,6 +280,15 @@ public abstract class MessageSupport implements Message, 
CamelContextAware, Data
     }
 
     @Override
+    public long getMessageTimestamp() {
+        if (messageTimestamp == 0) {
+            // use -1 to indicate no timestamp exists
+            messageTimestamp = getHeader(Exchange.MESSAGE_TIMESTAMP, -1L, 
long.class);
+        }
+        return messageTimestamp <= 0 ? 0 : messageTimestamp;
+    }
+
+    @Override
     public void setMessageId(String messageId) {
         this.messageId = messageId;
     }

Reply via email to