This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git
The following commit(s) were added to refs/heads/main by this push:
new 1ae5547 Updated TimestampRouter bean to library
1ae5547 is described below
commit 1ae5547ca00994ff239ed19d56a0d595f1dff879
Author: Andrea Cosentino <[email protected]>
AuthorDate: Mon May 17 16:27:14 2021 +0200
Updated TimestampRouter bean to library
---
.../utils/transform/kafka/TimestampRouter.java | 27 ++++++++++++++++++----
1 file changed, 22 insertions(+), 5 deletions(-)
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/kafka/TimestampRouter.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/kafka/TimestampRouter.java
index 6ea2066..c7f6447 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/kafka/TimestampRouter.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/kafka/TimestampRouter.java
@@ -22,6 +22,7 @@ import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.util.ObjectHelper;
import java.text.SimpleDateFormat;
+import java.time.Instant;
import java.util.Date;
import java.util.TimeZone;
import java.util.regex.Matcher;
@@ -29,7 +30,7 @@ import java.util.regex.Pattern;
public class TimestampRouter {
- public void process(@ExchangeProperty("topicFormat") String topicFormat,
@ExchangeProperty("timestampFormat") String timestampFormat, Exchange ex) {
+ public void process(@ExchangeProperty("topicFormat") String topicFormat,
@ExchangeProperty("timestampFormat") String timestampFormat,
@ExchangeProperty("timestampHeaderName") String timestampHeaderName, Exchange
ex) {
final Pattern TOPIC = Pattern.compile("$[topic]", Pattern.LITERAL);
final Pattern TIMESTAMP = Pattern.compile("$[timestamp]",
Pattern.LITERAL);
@@ -37,12 +38,28 @@ public class TimestampRouter {
final SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat);
fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
+ long timestamp;
String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC,
String.class);
- long timestamp = ex.getMessage().getHeader(KafkaConstants.TIMESTAMP,
Long.class);
- if (ObjectHelper.isNotEmpty(topicName) &&
ObjectHelper.isNotEmpty(timestamp)) {
+ Object rawTimestamp = ex.getMessage().getHeader(timestampHeaderName);
+ if (rawTimestamp instanceof Long) {
+ timestamp = (Long) rawTimestamp;
+ } else if (rawTimestamp instanceof Instant) {
+ timestamp = ((Instant) rawTimestamp).toEpochMilli();
+ } else {
+ timestamp = (Long) rawTimestamp;
+ }
+ if (ObjectHelper.isNotEmpty(timestamp)) {
final String formattedTimestamp = fmt.format(new Date(timestamp));
- final String replace1 =
TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(topicName));
- final String updatedTopic =
TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
+ String replace1;
+ String updatedTopic;
+
+ if (ObjectHelper.isNotEmpty(topicName)) {
+ replace1 =
TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(topicName));
+ updatedTopic =
TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
+ } else {
+ replace1 =
TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(""));
+ updatedTopic =
TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
+ }
ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC,
updatedTopic);
}
}