This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0d9d49e45ec36a79ff872f16b9a657528f8e792b Author: Claus Ibsen <[email protected]> AuthorDate: Wed Jun 9 10:50:23 2021 +0200 CAMEL-16627: camel-core - Add common header for source timestamp --- .../org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java | 4 ++++ .../camel/component/azure/storage/blob/BlobExchangeHeaders.java | 4 ++++ .../component/azure/storage/datalake/DataLakeExchangeHeaders.java | 4 ++++ .../camel/component/azure/storage/queue/QueueExchangeHeaders.java | 4 ++++ 4 files changed, 16 insertions(+) diff --git a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java index 0bb6c84..c9171e6 100644 --- a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java +++ b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java @@ -85,6 +85,10 @@ public class EventHubsConsumer extends DefaultConsumer { message.setHeader(EventHubsConstants.OFFSET, eventContext.getEventData().getOffset()); message.setHeader(EventHubsConstants.ENQUEUED_TIME, eventContext.getEventData().getEnqueuedTime()); message.setHeader(EventHubsConstants.SEQUENCE_NUMBER, eventContext.getEventData().getSequenceNumber()); + if (eventContext.getEventData().getEnqueuedTime() != null) { + long ts = eventContext.getEventData().getEnqueuedTime().getEpochSecond() * 1000; + message.setHeader(Exchange.MESSAGE_TIMESTAMP, ts); + } return exchange; } diff --git a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobExchangeHeaders.java b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobExchangeHeaders.java index 8faed08..35b73c4 100644 --- a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobExchangeHeaders.java +++ b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobExchangeHeaders.java @@ -393,6 +393,10 @@ public class BlobExchangeHeaders { public BlobExchangeHeaders lastModified(final OffsetDateTime offsetDateTime) { headers.put(BlobConstants.LAST_MODIFIED, offsetDateTime); + if (offsetDateTime != null) { + long ts = offsetDateTime.toEpochSecond() * 1000; + headers.put(Exchange.MESSAGE_TIMESTAMP, ts); + } return this; } diff --git a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeExchangeHeaders.java b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeExchangeHeaders.java index 32bbde3..591e346 100644 --- a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeExchangeHeaders.java +++ b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeExchangeHeaders.java @@ -368,6 +368,10 @@ public class DataLakeExchangeHeaders { public DataLakeExchangeHeaders lastModified(final OffsetDateTime lastModified) { headers.put(DataLakeConstants.LAST_MODIFIED, lastModified); + if (lastModified != null) { + long ts = lastModified.toEpochSecond() * 1000; + headers.put(Exchange.MESSAGE_TIMESTAMP, ts); + } return this; } diff --git a/components/camel-azure/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueExchangeHeaders.java b/components/camel-azure/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueExchangeHeaders.java index 3f5a36f..7e18104 100644 --- a/components/camel-azure/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueExchangeHeaders.java +++ b/components/camel-azure/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueExchangeHeaders.java @@ -110,6 +110,10 @@ public class QueueExchangeHeaders { public QueueExchangeHeaders insertionTime(final OffsetDateTime insertionTime) { headers.put(QueueConstants.INSERTION_TIME, insertionTime); + if (insertionTime != null) { + long ts = insertionTime.toEpochSecond() * 1000; + headers.put(Exchange.MESSAGE_TIMESTAMP, ts); + } return this; }
