This is an automated email from the ASF dual-hosted git repository.
cdeppisch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git
The following commit(s) were added to refs/heads/main by this push:
new 791443d1 fix: Avoid Class cast errors in TimestampRouter
791443d1 is described below
commit 791443d1fbf9a237d0c53e91da735f9790d8d4c0
Author: Christoph Deppisch <[email protected]>
AuthorDate: Thu Jun 13 15:15:06 2024 +0200
fix: Avoid Class cast errors in TimestampRouter
- Avoids String to Long Class cast errors in TimestampRouter utility
- Add test for timestamp-router-action Kamelet
---
.../utils/transform/MessageTimestampRouter.java | 22 ++++----
.../kamelets/utils/transform/TimestampRouter.java | 16 +++---
.../resources/kafka/timestamp-router-pipe.yaml | 60 ++++++++++++++++++++++
.../test/resources/kafka/timestamp-router.feature | 55 ++++++++++++++++++++
.../src/test/resources/kafka/yaks-config.yaml | 1 +
5 files changed, 135 insertions(+), 19 deletions(-)
diff --git
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/MessageTimestampRouter.java
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/MessageTimestampRouter.java
index 55c81829..aac0c7f0 100644
---
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/MessageTimestampRouter.java
+++
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/MessageTimestampRouter.java
@@ -16,14 +16,6 @@
*/
package org.apache.camel.kamelets.utils.transform;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeProperty;
-import org.apache.camel.component.kafka.KafkaConstants;
-import org.apache.camel.util.ObjectHelper;
-
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -36,6 +28,14 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.util.ObjectHelper;
+
public class MessageTimestampRouter {
public void process(@ExchangeProperty("topicFormat") String topicFormat,
@ExchangeProperty("timestampFormat") String timestampFormat,
@ExchangeProperty("timestampKeys") String timestampKeys,
@ExchangeProperty("timestampKeyFormat") String timestampKeyFormat, Exchange ex)
throws ParseException {
@@ -63,13 +63,13 @@ public class MessageTimestampRouter {
break;
}
}
- long timestamp;
+ Long timestamp = null;
if (ObjectHelper.isNotEmpty(timestampKeyFormat) &&
ObjectHelper.isNotEmpty(rawTimestamp) &&
!timestampKeyFormat.equalsIgnoreCase("timestamp")) {
final SimpleDateFormat timestampKeyFmt = new
SimpleDateFormat(timestampKeyFormat);
timestampKeyFmt.setTimeZone(TimeZone.getTimeZone("UTC"));
timestamp = timestampKeyFmt.parse((String) rawTimestamp).getTime();
- } else {
- timestamp = Long.valueOf((String) rawTimestamp);
+ } else if (ObjectHelper.isNotEmpty(rawTimestamp)) {
+ timestamp = Long.parseLong(rawTimestamp.toString());
}
if (ObjectHelper.isNotEmpty(timestamp)) {
final String formattedTimestamp = fmt.format(new Date(timestamp));
diff --git
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/TimestampRouter.java
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/TimestampRouter.java
index c0ac9b65..f102bc3d 100644
---
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/TimestampRouter.java
+++
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/TimestampRouter.java
@@ -16,11 +16,6 @@
*/
package org.apache.camel.kamelets.utils.transform;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeProperty;
-import org.apache.camel.component.kafka.KafkaConstants;
-import org.apache.camel.util.ObjectHelper;
-
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
@@ -28,6 +23,11 @@ import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.util.ObjectHelper;
+
public class TimestampRouter {
public void process(@ExchangeProperty("topicFormat") String topicFormat,
@ExchangeProperty("timestampFormat") String timestampFormat,
@ExchangeProperty("timestampHeaderName") String timestampHeaderName, Exchange
ex) {
@@ -38,15 +38,15 @@ public class TimestampRouter {
final SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat);
fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
- long timestamp;
+ Long timestamp = null;
String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC,
String.class);
Object rawTimestamp = ex.getMessage().getHeader(timestampHeaderName);
if (rawTimestamp instanceof Long) {
timestamp = (Long) rawTimestamp;
} else if (rawTimestamp instanceof Instant) {
timestamp = ((Instant) rawTimestamp).toEpochMilli();
- } else {
- timestamp = (Long) rawTimestamp;
+ } else if (ObjectHelper.isNotEmpty(rawTimestamp)) {
+ timestamp = Long.parseLong(rawTimestamp.toString());
}
if (ObjectHelper.isNotEmpty(timestamp)) {
final String formattedTimestamp = fmt.format(new Date(timestamp));
diff --git
a/tests/camel-kamelets-itest/src/test/resources/kafka/timestamp-router-pipe.yaml
b/tests/camel-kamelets-itest/src/test/resources/kafka/timestamp-router-pipe.yaml
new file mode 100644
index 00000000..c47dde10
--- /dev/null
+++
b/tests/camel-kamelets-itest/src/test/resources/kafka/timestamp-router-pipe.yaml
@@ -0,0 +1,60 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+apiVersion: camel.apache.org/v1
+kind: Pipe
+metadata:
+ name: timestamp-router-pipe
+spec:
+ source:
+ ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1
+ name: webhook-source
+ properties:
+ subpath: messages
+ steps:
+ - ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1
+ name: timestamp-router-action
+ properties:
+ topicFormat: $[topic]_$[timestamp]
+ timestampFormat: YYYY-MM-dd
+ - ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1
+ name: set-body-action
+ properties:
+ value: $simple{header[message]}
+ - ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1
+ name: log-action
+ properties:
+ showHeaders: true
+ sink:
+ ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1
+ name: kafka-sink
+ properties:
+ bootstrapServers: ${YAKS_TESTCONTAINERS_REDPANDA_LOCAL_BOOTSTRAP_SERVERS}
+ user: ${user}
+ password: ${password}
+ topic: dummy
+ securityProtocol: ${securityProtocol}
diff --git
a/tests/camel-kamelets-itest/src/test/resources/kafka/timestamp-router.feature
b/tests/camel-kamelets-itest/src/test/resources/kafka/timestamp-router.feature
new file mode 100644
index 00000000..f54c0c45
--- /dev/null
+++
b/tests/camel-kamelets-itest/src/test/resources/kafka/timestamp-router.feature
@@ -0,0 +1,55 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+Feature: Kafka Timestamp Router
+
+ Background:
+ Given variable user is ""
+ Given variable password is ""
+ Given variables
+ | securityProtocol | PLAINTEXT |
+ | topicName | my-topic |
+ | timestamp | yaks:unixTimestamp()000 |
+ | topic |
${topicName}_yaks:currentDate('YYYY-MM-dd') |
+ | message | Camel K rocks! |
+ Given Kafka topic: ${topic}
+ Given Kafka topic partition: 0
+
+ Scenario: Create infrastructure
+ Given start Redpanda container
+
+ Scenario: Create Pipe
+ When load Pipe timestamp-router-pipe.yaml
+ Then Camel K integration timestamp-router-pipe should be running
+ Then Camel K integration timestamp-router-pipe should print Routes startup
+
+ Scenario: Receive message on Kafka topic and verify sink output
+ Given new Kafka connection
+ | url |
${YAKS_TESTCONTAINERS_REDPANDA_LOCAL_BOOTSTRAP_SERVERS} |
+ | consumerGroup | consumer-1
|
+ Given URL: yaks:resolveURL('timestamp-router-pipe',8080)
+ Given HTTP request query parameter kafka.TOPIC="${topicName}"
+ Given HTTP request query parameter kafka.TIMESTAMP="${timestamp}"
+ Given HTTP request query parameter message="yaks:urlEncode(${message})"
+ Given HTTP request fork mode is enabled
+ When send GET /messages
+ Then receive Kafka message with body: ${message}
+ And receive HTTP 200 OK
+
+ Scenario: Remove resources
+ Given delete Pipe timestamp-router-pipe
+ And stop Redpanda container
diff --git
a/tests/camel-kamelets-itest/src/test/resources/kafka/yaks-config.yaml
b/tests/camel-kamelets-itest/src/test/resources/kafka/yaks-config.yaml
index 63ba6c1d..df55ae63 100644
--- a/tests/camel-kamelets-itest/src/test/resources/kafka/yaks-config.yaml
+++ b/tests/camel-kamelets-itest/src/test/resources/kafka/yaks-config.yaml
@@ -39,6 +39,7 @@ config:
resources:
- kafka-source-pipe.yaml
- kafka-sink-pipe.yaml
+ - timestamp-router-pipe.yaml
dump:
enabled: true
failedOnly: true