This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git


The following commit(s) were added to refs/heads/main by this push:
     new 1ae5547  Updated TimestampRouter bean to library
1ae5547 is described below

commit 1ae5547ca00994ff239ed19d56a0d595f1dff879
Author: Andrea Cosentino <[email protected]>
AuthorDate: Mon May 17 16:27:14 2021 +0200

    Updated TimestampRouter bean to library
---
 .../utils/transform/kafka/TimestampRouter.java     | 27 ++++++++++++++++++----
 1 file changed, 22 insertions(+), 5 deletions(-)

diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/kafka/TimestampRouter.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/kafka/TimestampRouter.java
index 6ea2066..c7f6447 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/kafka/TimestampRouter.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/kafka/TimestampRouter.java
@@ -22,6 +22,7 @@ import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.util.ObjectHelper;
 
 import java.text.SimpleDateFormat;
+import java.time.Instant;
 import java.util.Date;
 import java.util.TimeZone;
 import java.util.regex.Matcher;
@@ -29,7 +30,7 @@ import java.util.regex.Pattern;
 
 public class TimestampRouter {
 
-    public void process(@ExchangeProperty("topicFormat") String topicFormat, @ExchangeProperty("timestampFormat") String timestampFormat, Exchange ex) {
+    public void process(@ExchangeProperty("topicFormat") String topicFormat, @ExchangeProperty("timestampFormat") String timestampFormat, @ExchangeProperty("timestampHeaderName") String timestampHeaderName, Exchange ex) {
         final Pattern TOPIC = Pattern.compile("$[topic]", Pattern.LITERAL);
 
         final Pattern TIMESTAMP = Pattern.compile("$[timestamp]", Pattern.LITERAL);
@@ -37,12 +38,28 @@ public class TimestampRouter {
         final SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat);
         fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
 
+        long timestamp;
         String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, String.class);
-        long timestamp = ex.getMessage().getHeader(KafkaConstants.TIMESTAMP, Long.class);
-        if (ObjectHelper.isNotEmpty(topicName) && ObjectHelper.isNotEmpty(timestamp)) {
+        Object rawTimestamp = ex.getMessage().getHeader(timestampHeaderName);
+        if (rawTimestamp instanceof Long) {
+            timestamp = (Long) rawTimestamp;
+        } else if (rawTimestamp instanceof Instant) {
+            timestamp = ((Instant) rawTimestamp).toEpochMilli();
+        } else {
+            timestamp = (Long) rawTimestamp;
+        }
+        if (ObjectHelper.isNotEmpty(timestamp)) {
             final String formattedTimestamp = fmt.format(new Date(timestamp));
-            final String replace1 = TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(topicName));
-            final String updatedTopic = TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
+            String replace1;
+            String updatedTopic;
+
+            if (ObjectHelper.isNotEmpty(topicName)) {
+                replace1 = TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(topicName));
+                updatedTopic = TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
+            } else {
+                replace1 = TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(""));
+                updatedTopic = TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
+            }
             ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC, updatedTopic);
         }
     }

Reply via email to