This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new fb551b39d5a CAMEL-21249 - Camel-Kamelets: Move Kamelets utils in Camel
Kamelets component (#15913)
fb551b39d5a is described below
commit fb551b39d5a9e919572c657d3cb9a88a68dade54
Author: Andrea Cosentino <[email protected]>
AuthorDate: Thu Oct 10 18:34:12 2024 +0200
CAMEL-21249 - Camel-Kamelets: Move Kamelets utils in Camel Kamelets
component (#15913)
Signed-off-by: Andrea Cosentino <[email protected]>
---
.../apache/camel/catalog/transformers.properties | 1 +
.../aws2-ddb-application-x-struct.json | 14 +
components/camel-kamelet/pom.xml | 48 ++++
.../org/apache/camel/transformer.properties | 7 +
.../transformer/aws2-ddb-application-x-struct | 2 +
.../transformer/aws2-ddb-application-x-struct.json | 14 +
.../component/kamelet/utils/djl/ImageNetUtil.java | 57 ++++
.../component/kamelet/utils/format/MimeType.java | 55 ++++
.../format/schema/DelegatingSchemaResolver.java | 121 +++++++++
.../kamelet/utils/mongodb/SslAwareMongoClient.java | 291 +++++++++++++++++++++
.../gson/JavaTimeInstantTypeAdapter.java | 41 +++
.../kafka/KafkaHeaderDeserializer.java | 98 +++++++
.../kamelet/utils/transform/DropField.java | 60 +++++
.../kamelet/utils/transform/ExtractField.java | 112 ++++++++
.../kamelet/utils/transform/HoistField.java | 38 +++
.../kamelet/utils/transform/InsertField.java | 85 ++++++
.../kamelet/utils/transform/MaskField.java | 125 +++++++++
.../utils/transform/MessageTimestampRouter.java | 90 +++++++
.../kamelet/utils/transform/RegexRouter.java | 50 ++++
.../kamelet/utils/transform/ReplaceField.java | 90 +++++++
.../kamelet/utils/transform/TimestampRouter.java | 69 +++++
.../ddb/Ddb2JsonStructDataTypeTransformer.java | 46 ++++
.../utils/transform/kafka/BatchManualCommit.java | 42 +++
.../utils/transform/kafka/ManualCommit.java | 33 +++
.../kamelet/utils/transform/kafka/ValueToKey.java | 57 ++++
.../utils/kafka/KafkaHeaderDeserializerTest.java | 78 ++++++
.../kamelet/utils/transform/ExtractFieldTest.java | 133 ++++++++++
.../kamelet/utils/transform/HoistFieldTest.java | 58 ++++
.../kamelet/utils/transform/InsertFieldTest.java | 73 ++++++
.../kamelet/utils/transform/MaskFieldTest.java | 86 ++++++
.../kamelet/utils/transform/RegexRouterTest.java | 51 ++++
.../kamelet/utils/transform/ReplaceFieldTest.java | 115 ++++++++
32 files changed, 2240 insertions(+)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers.properties
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers.properties
index 4efe6eae08f..b740fa67e2c 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers.properties
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers.properties
@@ -8,6 +8,7 @@ avro-x-java-object
avro-x-struct
aws-cloudtrail-application-cloudevents
aws2-ddb-application-json
+aws2-ddb-application-x-struct
aws2-ddbstream-application-cloudevents
aws2-kinesis-application-cloudevents
aws2-s3-application-cloudevents
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers/aws2-ddb-application-x-struct.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers/aws2-ddb-application-x-struct.json
new file mode 100644
index 00000000000..d826347d326
--- /dev/null
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/transformers/aws2-ddb-application-x-struct.json
@@ -0,0 +1,14 @@
+{
+ "transformer": {
+ "kind": "transformer",
+ "name": "aws2-ddb:application-x-struct",
+ "title": "Aws2 Ddb (Application X Struct)",
+ "description": "Transforms DynamoDB record into a Json node",
+ "deprecated": false,
+ "javaType":
"org.apache.camel.component.kamelet.utils.transform.aws2.ddb.Ddb2JsonStructDataTypeTransformer",
+ "groupId": "org.apache.camel",
+ "artifactId": "camel-kamelet",
+ "version": "4.9.0-SNAPSHOT"
+ }
+}
+
diff --git a/components/camel-kamelet/pom.xml b/components/camel-kamelet/pom.xml
index e7635a793ba..7473eabff03 100644
--- a/components/camel-kamelet/pom.xml
+++ b/components/camel-kamelet/pom.xml
@@ -43,6 +43,54 @@
<groupId>org.apache.camel</groupId>
<artifactId>camel-core-model</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-djl</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>net.sf.extjwnl</groupId>
+ <artifactId>extjwnl</artifactId>
+ <version>2.0.5</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>net.sf.extjwnl</groupId>
+ <artifactId>extjwnl-data-wn31</artifactId>
+ <version>1.2</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-kafka</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-jackson</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-jackson-avro</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-jackson-protobuf</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-mongodb</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-gson</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
<!-- TESTS -->
<dependency>
diff --git
a/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer.properties
b/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer.properties
new file mode 100644
index 00000000000..3d2567b0cb8
--- /dev/null
+++
b/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer.properties
@@ -0,0 +1,7 @@
+# Generated by camel build tools - do NOT edit this file!
+transformers=aws2-ddb:application-x-struct
+groupId=org.apache.camel
+artifactId=camel-kamelet
+version=4.9.0-SNAPSHOT
+projectName=Camel :: Kamelet
+projectDescription=To call Kamelets
diff --git
a/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct
b/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct
new file mode 100644
index 00000000000..4db224fcbbe
--- /dev/null
+++
b/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.kamelet.utils.transform.aws2.ddb.Ddb2JsonStructDataTypeTransformer
diff --git
a/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct.json
b/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct.json
new file mode 100644
index 00000000000..d826347d326
--- /dev/null
+++
b/components/camel-kamelet/src/generated/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct.json
@@ -0,0 +1,14 @@
+{
+ "transformer": {
+ "kind": "transformer",
+ "name": "aws2-ddb:application-x-struct",
+ "title": "Aws2 Ddb (Application X Struct)",
+ "description": "Transforms DynamoDB record into a Json node",
+ "deprecated": false,
+ "javaType":
"org.apache.camel.component.kamelet.utils.transform.aws2.ddb.Ddb2JsonStructDataTypeTransformer",
+ "groupId": "org.apache.camel",
+ "artifactId": "camel-kamelet",
+ "version": "4.9.0-SNAPSHOT"
+ }
+}
+
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/djl/ImageNetUtil.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/djl/ImageNetUtil.java
new file mode 100644
index 00000000000..7f4416deef9
--- /dev/null
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/djl/ImageNetUtil.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.djl;
+
+import java.util.List;
+
+import ai.djl.modality.Classifications;
+import net.sf.extjwnl.data.IndexWord;
+import net.sf.extjwnl.data.POS;
+import net.sf.extjwnl.data.PointerUtils;
+import net.sf.extjwnl.data.list.PointerTargetNodeList;
+import net.sf.extjwnl.dictionary.Dictionary;
+import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeCamelException;
+
+/**
+ * A utility bean class for handling ImageNet (https://image-net.org/)
classifications.
+ */
+public class ImageNetUtil {
+
+ public ImageNetUtil() {
+ }
+
+ public void extractClassName(Exchange exchange) {
+ Classifications body =
exchange.getMessage().getBody(Classifications.class);
+ String className = body.best().getClassName().split(",")[0].split(" ",
2)[1];
+ exchange.getMessage().setBody(className);
+ }
+
+ public void addHypernym(Exchange exchange) throws Exception {
+ String className = exchange.getMessage().getBody(String.class);
+ Dictionary dic = Dictionary.getDefaultResourceInstance();
+ IndexWord word = dic.getIndexWord(POS.NOUN, className);
+ if (word == null) {
+ throw new RuntimeCamelException("Word not found: " + className);
+ }
+ PointerTargetNodeList hypernyms =
PointerUtils.getDirectHypernyms(word.getSenses().get(0));
+ String hypernym = hypernyms.stream()
+ .map(h -> h.getSynset().getWords().get(0).getLemma())
+ .findFirst().orElse(className);
+ exchange.getMessage().setBody(List.of(className, hypernym));
+ }
+}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/format/MimeType.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/format/MimeType.java
new file mode 100644
index 00000000000..aa16d5300c0
--- /dev/null
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/format/MimeType.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kamelet.utils.format;
+
+import java.util.Objects;
+
+public enum MimeType {
+ JSON("application/json"),
+ PROTOBUF("application/protobuf"),
+ PROTOBUF_BINARY("protobuf/binary"),
+ PROTOBUF_STRUCT("protobuf/x-struct"),
+ AVRO("application/avro"),
+ AVRO_BINARY("avro/binary"),
+ AVRO_STRUCT("avro/x-struct"),
+ BINARY("application/octet-stream"),
+ TEXT("text/plain"),
+ JAVA_OBJECT("application/x-java-object"),
+ STRUCT("application/x-struct");
+
+ private static final MimeType[] VALUES = values();
+ private final String type;
+
+ MimeType(String type) {
+ this.type = type;
+ }
+
+ public String type() {
+ return type;
+ }
+
+ public static MimeType of(String type) {
+ for (MimeType mt : VALUES) {
+ if (Objects.equals(type, mt.type)) {
+ return mt;
+ }
+ }
+
+ throw new IllegalArgumentException("Unsupported type: " + type);
+ }
+}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/format/schema/DelegatingSchemaResolver.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/format/schema/DelegatingSchemaResolver.java
new file mode 100644
index 00000000000..aa317448fbb
--- /dev/null
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/format/schema/DelegatingSchemaResolver.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kamelet.utils.format.schema;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.jackson.avro.transform.AvroSchemaResolver;
+import
org.apache.camel.component.jackson.protobuf.transform.ProtobufSchemaResolver;
+import org.apache.camel.component.jackson.transform.JsonSchemaResolver;
+import org.apache.camel.component.kamelet.utils.format.MimeType;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Schema resolver processor delegates to either Avro or Json schema resolver
based on the given mimetype property. When
+ * mimetype is of type application/x-java-object uses additional target
mimetype (usually the produces mimetype) to
+ * determine the schema resolver (Avro or Json). Delegates to schema resolver
and sets proper content class and schema
+ * properties on the delegate.
+ */
+public class DelegatingSchemaResolver implements Processor {
+ private String mimeType;
+ private String targetMimeType;
+
+ private String schema;
+ private String contentClass;
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ if (ObjectHelper.isEmpty(mimeType)) {
+ return;
+ }
+
+ MimeType mimeType = MimeType.of(this.mimeType);
+ Processor resolver;
+ if (mimeType.equals(MimeType.JAVA_OBJECT)) {
+ if (ObjectHelper.isEmpty(targetMimeType)) {
+ return;
+ }
+ resolver = fromMimeType(MimeType.of(targetMimeType));
+ } else {
+ resolver = fromMimeType(mimeType);
+ }
+
+ if (resolver != null) {
+ resolver.process(exchange);
+ }
+ }
+
+ private Processor fromMimeType(MimeType mimeType) {
+ switch (mimeType) {
+ case PROTOBUF:
+ case PROTOBUF_BINARY:
+ case PROTOBUF_STRUCT:
+ ProtobufSchemaResolver protobufSchemaResolver = new
ProtobufSchemaResolver();
+ protobufSchemaResolver.setSchema(this.schema);
+ protobufSchemaResolver.setContentClass(this.contentClass);
+ return protobufSchemaResolver;
+ case AVRO:
+ case AVRO_BINARY:
+ case AVRO_STRUCT:
+ AvroSchemaResolver avroSchemaResolver = new
AvroSchemaResolver();
+ avroSchemaResolver.setSchema(this.schema);
+ avroSchemaResolver.setContentClass(this.contentClass);
+ return avroSchemaResolver;
+ case JSON:
+ case STRUCT:
+ JsonSchemaResolver jsonSchemaResolver = new
JsonSchemaResolver();
+ jsonSchemaResolver.setSchema(this.schema);
+ jsonSchemaResolver.setContentClass(this.contentClass);
+ return jsonSchemaResolver;
+ default:
+ return null;
+ }
+ }
+
+ public String getMimeType() {
+ return mimeType;
+ }
+
+ public void setMimeType(String mimeType) {
+ this.mimeType = mimeType;
+ }
+
+ public String getSchema() {
+ return schema;
+ }
+
+ public void setSchema(String schema) {
+ this.schema = schema;
+ }
+
+ public String getContentClass() {
+ return contentClass;
+ }
+
+ public void setContentClass(String contentClass) {
+ this.contentClass = contentClass;
+ }
+
+ public String getTargetMimeType() {
+ return targetMimeType;
+ }
+
+ public void setTargetMimeType(String targetMimeType) {
+ this.targetMimeType = targetMimeType;
+ }
+}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/mongodb/SslAwareMongoClient.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/mongodb/SslAwareMongoClient.java
new file mode 100644
index 00000000000..d7b3819fb1d
--- /dev/null
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/mongodb/SslAwareMongoClient.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.mongodb;
+
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+import com.mongodb.*;
+import com.mongodb.client.*;
+import com.mongodb.connection.ClusterDescription;
+import org.apache.camel.util.function.Suppliers;
+import org.bson.Document;
+import org.bson.codecs.configuration.CodecRegistry;
+import org.bson.conversions.Bson;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SslAwareMongoClient implements MongoClient {
+ private static final Logger LOG =
LoggerFactory.getLogger(SslAwareMongoClient.class);
+ private static final TrustManager[] trustAllCerts = new TrustManager[] {
+ new X509TrustManager() {
+ public X509Certificate[] getAcceptedIssuers() {
+ return null;
+ }
+
+ @Override
+ public void checkClientTrusted(X509Certificate[] arg0, String
arg1)
+ throws CertificateException {
+ }
+
+ @Override
+ public void checkServerTrusted(X509Certificate[] arg0, String
arg1)
+ throws CertificateException {
+ }
+ }
+ };
+ private final Supplier<MongoClient> wrappedMongoClient =
Suppliers.memorize(new Supplier<MongoClient>() {
+ @Override
+ public MongoClient get() {
+ String credentials = username == null ? "" : username;
+
+ if (!credentials.isEmpty()) {
+ credentials += password == null ? "@" : ":" + password + "@";
+ }
+
+ MongoClientSettings settings = MongoClientSettings.builder()
+ .applyToSslSettings(builder -> {
+ builder.enabled(ssl);
+ if (!sslValidationEnabled) {
+ builder.invalidHostNameAllowed(true);
+ SSLContext sc = null;
+ try {
+ sc = SSLContext.getInstance("TLSv1.2");
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException("Error
instantiating trust all SSL context.", e);
+ }
+ try {
+ sc.init(null, trustAllCerts, new
java.security.SecureRandom());
+ } catch (KeyManagementException e) {
+ throw new RuntimeException("Error
instantiating trust all SSL context.", e);
+ }
+ builder.context(sc);
+ }
+ })
+ .applyConnectionString(new
ConnectionString(String.format("mongodb://%s%s", credentials, hosts)))
+ .build();
+ LOG.info("Connection created using provided credentials");
+ return MongoClients.create(settings);
+ }
+ });
+ private String hosts = null;
+ private String username = null;
+ private String password = null;
+ private boolean ssl = true;
+
+ private boolean sslValidationEnabled = true;
+
+ public MongoClient getWrappedMongoClient() {
+ return wrappedMongoClient.get();
+ }
+
+ @Override
+ public MongoDatabase getDatabase(String s) {
+ return getWrappedMongoClient().getDatabase(s);
+ }
+
+ @Override
+ public ClientSession startSession() {
+ return getWrappedMongoClient().startSession();
+ }
+
+ @Override
+ public ClientSession startSession(ClientSessionOptions
clientSessionOptions) {
+ return getWrappedMongoClient().startSession(clientSessionOptions);
+ }
+
+ @Override
+ public void close() {
+ getWrappedMongoClient().close();
+ }
+
+ @Override
+ public MongoIterable<String> listDatabaseNames() {
+ return getWrappedMongoClient().listDatabaseNames();
+ }
+
+ @Override
+ public MongoIterable<String> listDatabaseNames(ClientSession
clientSession) {
+ return getWrappedMongoClient().listDatabaseNames(clientSession);
+ }
+
+ @Override
+ public ListDatabasesIterable<Document> listDatabases() {
+ return getWrappedMongoClient().listDatabases();
+ }
+
+ @Override
+ public ListDatabasesIterable<Document> listDatabases(ClientSession
clientSession) {
+ return getWrappedMongoClient().listDatabases(clientSession);
+ }
+
+ @Override
+ public <TResult> ListDatabasesIterable<TResult>
listDatabases(Class<TResult> aClass) {
+ return getWrappedMongoClient().listDatabases(aClass);
+ }
+
+ @Override
+ public <TResult> ListDatabasesIterable<TResult>
listDatabases(ClientSession clientSession, Class<TResult> aClass) {
+ return getWrappedMongoClient().listDatabases(clientSession, aClass);
+ }
+
+ @Override
+ public ChangeStreamIterable<Document> watch() {
+ return getWrappedMongoClient().watch();
+ }
+
+ @Override
+ public <TResult> ChangeStreamIterable<TResult> watch(Class<TResult>
aClass) {
+ return getWrappedMongoClient().watch(aClass);
+ }
+
+ @Override
+ public ChangeStreamIterable<Document> watch(List<? extends Bson> list) {
+ return getWrappedMongoClient().watch(list);
+ }
+
+ @Override
+ public <TResult> ChangeStreamIterable<TResult> watch(List<? extends Bson>
list, Class<TResult> aClass) {
+ return getWrappedMongoClient().watch(list, aClass);
+ }
+
+ @Override
+ public ChangeStreamIterable<Document> watch(ClientSession clientSession) {
+ return getWrappedMongoClient().watch(clientSession);
+ }
+
+ @Override
+ public <TResult> ChangeStreamIterable<TResult> watch(ClientSession
clientSession, Class<TResult> aClass) {
+ return getWrappedMongoClient().watch(clientSession, aClass);
+ }
+
+ @Override
+ public ChangeStreamIterable<Document> watch(ClientSession clientSession,
List<? extends Bson> list) {
+ return getWrappedMongoClient().watch(clientSession, list);
+ }
+
+ @Override
+ public <TResult> ChangeStreamIterable<TResult> watch(
+ ClientSession clientSession, List<? extends Bson> list,
+ Class<TResult> aClass) {
+ return getWrappedMongoClient().watch(clientSession, list, aClass);
+ }
+
+ @Override
+ public ClusterDescription getClusterDescription() {
+ return getWrappedMongoClient().getClusterDescription();
+ }
+
+ @Override
+ public CodecRegistry getCodecRegistry() {
+ return getWrappedMongoClient().getCodecRegistry();
+ }
+
+ @Override
+ public ReadPreference getReadPreference() {
+ return getWrappedMongoClient().getReadPreference();
+ }
+
+ @Override
+ public WriteConcern getWriteConcern() {
+ return getWrappedMongoClient().getWriteConcern();
+ }
+
+ @Override
+ public ReadConcern getReadConcern() {
+ return getWrappedMongoClient().getReadConcern();
+ }
+
+ @Override
+ public Long getTimeout(TimeUnit timeUnit) {
+ return getWrappedMongoClient().getTimeout(timeUnit);
+ }
+
+ @Override
+ public MongoCluster withCodecRegistry(CodecRegistry codecRegistry) {
+ return getWrappedMongoClient().withCodecRegistry(codecRegistry);
+ }
+
+ @Override
+ public MongoCluster withReadPreference(ReadPreference readPreference) {
+ return getWrappedMongoClient().withReadPreference(readPreference);
+ }
+
+ @Override
+ public MongoCluster withWriteConcern(WriteConcern writeConcern) {
+ return getWrappedMongoClient().withWriteConcern(writeConcern);
+ }
+
+ @Override
+ public MongoCluster withReadConcern(ReadConcern readConcern) {
+ return getWrappedMongoClient().withReadConcern(readConcern);
+ }
+
+ @Override
+ public MongoCluster withTimeout(long l, TimeUnit timeUnit) {
+ return getWrappedMongoClient().withTimeout(l, timeUnit);
+ }
+
+ public String getHosts() {
+ return hosts;
+ }
+
+ public void setHosts(String hosts) {
+ this.hosts = hosts;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public boolean isSsl() {
+ return ssl;
+ }
+
+ public void setSsl(boolean ssl) {
+ this.ssl = ssl;
+ }
+
+ public boolean isSslValidationEnabled() {
+ return sslValidationEnabled;
+ }
+
+ public void setSslValidationEnabled(boolean sslValidationEnabled) {
+ this.sslValidationEnabled = sslValidationEnabled;
+ }
+}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/gson/JavaTimeInstantTypeAdapter.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/gson/JavaTimeInstantTypeAdapter.java
new file mode 100644
index 00000000000..6e062fc4c27
--- /dev/null
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/gson/JavaTimeInstantTypeAdapter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kamelet.utils.serialization.gson;
+
+import java.lang.reflect.Type;
+import java.time.Instant;
+
+import com.google.gson.*;
+
+public class JavaTimeInstantTypeAdapter implements JsonSerializer<Instant>,
JsonDeserializer<Instant> {
+
+ @Override
+ public JsonElement serialize(
+ final Instant time, final Type typeOfSrc,
+ final JsonSerializationContext context) {
+ return new JsonPrimitive(time.getEpochSecond() * 1000);
+ }
+
+ @Override
+ public Instant deserialize(
+ final JsonElement json, final Type typeOfT,
+ final JsonDeserializationContext context)
+ throws JsonParseException {
+ return Instant.ofEpochMilli(json.getAsLong());
+ }
+}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/kafka/KafkaHeaderDeserializer.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/kafka/KafkaHeaderDeserializer.java
new file mode 100644
index 00000000000..6cb8f2f1b02
--- /dev/null
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/serialization/kafka/KafkaHeaderDeserializer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kamelet.utils.serialization.kafka;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.TypeConverter;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.support.SimpleTypeConverter;
+
+/**
+ * Header deserializer used in Kafka source Kamelet. Automatically converts
all message headers to String. Uses given
+ * type converter implementation set on the Camel context to convert values.
If no type converter is set the
+ * deserializer uses its own fallback conversion implementation.
+ */
+public class KafkaHeaderDeserializer implements Processor {
+
+ public boolean enabled = false;
+
+ private final SimpleTypeConverter defaultTypeConverter = new
SimpleTypeConverter(true, KafkaHeaderDeserializer::convert);
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ if (!enabled) {
+ return;
+ }
+
+ Map<String, Object> headers = exchange.getMessage().getHeaders();
+
+ TypeConverter typeConverter = exchange.getContext().getTypeConverter();
+ if (typeConverter == null) {
+ typeConverter = defaultTypeConverter;
+ }
+
+ for (Map.Entry<String, Object> header : headers.entrySet()) {
+ if (shouldDeserialize(header)) {
+ header.setValue(typeConverter.convertTo(String.class,
header.getValue()));
+ }
+ }
+ }
+
+ /**
+ * Fallback conversion strategy supporting null values, String and byte[].
Converts headers to respective String
+ * representation or null.
+ *
+ * @param type target type, always String in this case.
+ * @param exchange the exchange containing all headers to convert.
+ * @param value the current value to convert.
+ * @return String representation of given value or null if value
itself is null.
+ */
+ private static Object convert(Class<?> type, Exchange exchange, Object
value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof String) {
+ return value;
+ }
+
+ if (value instanceof byte[]) {
+ return new String((byte[]) value, StandardCharsets.UTF_8);
+ }
+
+ return value.toString();
+ }
+
+ /**
+ * Exclude special Kafka headers from auto deserialization.
+ *
+ * @param entry
+ * @return
+ */
+ private boolean shouldDeserialize(Map.Entry<String, Object> entry) {
+ return !entry.getKey().equals(KafkaConstants.HEADERS) &&
!entry.getKey().equals(KafkaConstants.MANUAL_COMMIT);
+ }
+
+ public void setEnabled(String enabled) {
+ this.enabled = Boolean.parseBoolean(enabled);
+ }
+}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/DropField.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/DropField.java
new file mode 100644
index 00000000000..7dec28eb05b
--- /dev/null
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/DropField.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeType;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Processor;
+
+public class DropField implements Processor {
+
+ String field;
+
+ /**
+ * Default constructor.
+ */
+ public DropField() {
+ }
+
+ /**
+ * Constructor using fields.
+ *
+ * @param field the field name to drop.
+ */
+ public DropField(String field, String value) {
+ this.field = field;
+ }
+
+ public void process(Exchange ex) throws InvalidPayloadException {
+ JsonNode body = ex.getMessage().getBody(JsonNode.class);
+ if (body == null) {
+ throw new InvalidPayloadException(ex, JsonNode.class);
+ }
+
+ if (body.getNodeType().equals(JsonNodeType.OBJECT)) {
+ ((ObjectNode) body).remove(field);
+ ex.getMessage().setBody(body);
+ }
+ }
+
+ public void setField(String field) {
+ this.field = field;
+ }
+}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/ExtractField.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/ExtractField.java
new file mode 100644
index 00000000000..eab12202931
--- /dev/null
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/ExtractField.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import java.util.Map;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Processor;
+
+public class ExtractField implements Processor {
+
+ String field;
+ String headerOutputName;
+ boolean headerOutput;
+ boolean strictHeaderCheck;
+ boolean trimField;
+
+ static final String EXTRACTED_FIELD_HEADER =
"CamelKameletsExtractFieldName";
+
+ /**
+ * Default constructor
+ */
+ public ExtractField() {
+ }
+
+ /**
+ * Constructor using field member.
+ *
+ * @param field the field name to extract.
+ */
+ public ExtractField(String field) {
+ this.field = field;
+ }
+
+ @Override
+ public void process(Exchange ex) throws InvalidPayloadException {
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode jsonNodeBody = ex.getMessage().getBody(JsonNode.class);
+
+ if (jsonNodeBody == null) {
+ throw new InvalidPayloadException(ex, JsonNode.class);
+
+ }
+
+ Map<Object, Object> body = mapper.convertValue(jsonNodeBody, new
TypeReference<Map<Object, Object>>() {
+ });
+ if (!headerOutput || (strictHeaderCheck && checkHeaderExistence(ex))) {
+ ex.getMessage().setBody(body.get(field));
+ } else {
+ extractToHeader(ex, body);
+ }
+ if (trimField) {
+ ex.setProperty("trimField", "true");
+ } else {
+ ex.setProperty("trimField", "false");
+ }
+ }
+
+ private void extractToHeader(Exchange ex, Map<Object, Object> body) {
+ if (headerOutputName == null || headerOutputName.isEmpty() ||
"none".equalsIgnoreCase(headerOutputName)) {
+ ex.getMessage().setHeader(EXTRACTED_FIELD_HEADER, body.get(field));
+ } else {
+ ex.getMessage().setHeader(headerOutputName, body.get(field));
+ }
+ }
+
+ private boolean checkHeaderExistence(Exchange exchange) {
+ if (headerOutputName == null || headerOutputName.isEmpty() ||
"none".equalsIgnoreCase(headerOutputName)) {
+ return
exchange.getMessage().getHeaders().containsKey(EXTRACTED_FIELD_HEADER);
+ } else {
+ return
exchange.getMessage().getHeaders().containsKey(headerOutputName);
+ }
+ }
+
+ public void setField(String field) {
+ this.field = field;
+ }
+
+ public void setHeaderOutput(boolean headerOutput) {
+ this.headerOutput = headerOutput;
+ }
+
+ public void setHeaderOutputName(String headerOutputName) {
+ this.headerOutputName = headerOutputName;
+ }
+
+ public void setStrictHeaderCheck(boolean strictHeaderCheck) {
+ this.strictHeaderCheck = strictHeaderCheck;
+ }
+
+ public void setTrimField(boolean trimField) {
+ this.trimField = trimField;
+ }
+}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/HoistField.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/HoistField.java
new file mode 100644
index 00000000000..43fee2e79f0
--- /dev/null
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/HoistField.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
+import org.apache.camel.InvalidPayloadException;
+
+public class HoistField {
+
+ public JsonNode process(@ExchangeProperty("field") String field, Exchange
ex) throws InvalidPayloadException {
+ ObjectMapper mapper = new ObjectMapper();
+ Object body = ex.getMessage().getBody();
+ Map<Object, Object> updatedBody = new HashMap<>();
+ updatedBody.put(field, body);
+ return mapper.valueToTree(updatedBody);
+ }
+
+}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/InsertField.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/InsertField.java
new file mode 100644
index 00000000000..0ccf0a2b6fe
--- /dev/null
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/InsertField.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Processor;
+import org.apache.camel.support.LanguageSupport;
+
+public class InsertField implements Processor {
+
+ String field;
+ String value;
+
+ /**
+ * Default constructor.
+ */
+ public InsertField() {
+ }
+
+ /**
+ * Constructor using fields.
+ *
+ * @param field the field name to insert.
+ * @param value the value of the new field.
+ */
+ public InsertField(String field, String value) {
+ this.field = field;
+ this.value = value;
+ }
+
+ public void process(Exchange ex) throws InvalidPayloadException {
+ JsonNode body = ex.getMessage().getBody(JsonNode.class);
+
+ if (body == null) {
+ throw new InvalidPayloadException(ex, JsonNode.class);
+ }
+
+ String resolvedValue;
+ if (LanguageSupport.hasSimpleFunction(value)) {
+ resolvedValue =
ex.getContext().resolveLanguage("simple").createExpression(value).evaluate(ex,
String.class);
+ } else {
+ resolvedValue = value;
+ }
+
+ switch (body.getNodeType()) {
+ case ARRAY:
+ ((ArrayNode) body).add(resolvedValue);
+ break;
+ case OBJECT:
+ ((ObjectNode) body).put(field, resolvedValue);
+ break;
+ default:
+ ((ObjectNode) body).put(field, resolvedValue);
+ break;
+ }
+
+ ex.getMessage().setBody(body);
+ }
+
+ public void setField(String field) {
+ this.field = field;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MaskField.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MaskField.java
new file mode 100644
index 00000000000..ed83fbea232
--- /dev/null
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MaskField.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.util.ObjectHelper;
+
+public class MaskField {
+
+ private static final Map<Class<?>, Function<String, ?>> MAPPING_FUNC = new
HashMap<>();
+ private static final Map<Class<?>, Object> BASIC_MAPPING = new HashMap<>();
+
+ static {
+ BASIC_MAPPING.put(Boolean.class, Boolean.FALSE);
+ BASIC_MAPPING.put(Byte.class, (byte) 0);
+ BASIC_MAPPING.put(Short.class, (short) 0);
+ BASIC_MAPPING.put(Integer.class, 0);
+ BASIC_MAPPING.put(Long.class, 0L);
+ BASIC_MAPPING.put(Float.class, 0f);
+ BASIC_MAPPING.put(Double.class, 0d);
+ BASIC_MAPPING.put(BigInteger.class, BigInteger.ZERO);
+ BASIC_MAPPING.put(BigDecimal.class, BigDecimal.ZERO);
+ BASIC_MAPPING.put(Date.class, new Date(0));
+ BASIC_MAPPING.put(String.class, "");
+
+ MAPPING_FUNC.put(Byte.class, Byte::parseByte);
+ MAPPING_FUNC.put(Short.class, Short::parseShort);
+ MAPPING_FUNC.put(Integer.class, Integer::parseInt);
+ MAPPING_FUNC.put(Long.class, Long::parseLong);
+ MAPPING_FUNC.put(Float.class, Float::parseFloat);
+ MAPPING_FUNC.put(Double.class, Double::parseDouble);
+ MAPPING_FUNC.put(String.class, Function.identity());
+ MAPPING_FUNC.put(BigDecimal.class, BigDecimal::new);
+ MAPPING_FUNC.put(BigInteger.class, BigInteger::new);
+ }
+
+ public JsonNode process(
+ @ExchangeProperty("fields") String fields,
@ExchangeProperty("replacement") String replacement, Exchange ex)
+ throws InvalidPayloadException {
+ ObjectMapper mapper = new ObjectMapper();
+ List<String> splittedFields = new ArrayList<>();
+ JsonNode jsonNodeBody = ex.getMessage().getBody(JsonNode.class);
+ Map<Object, Object> body = mapper.convertValue(jsonNodeBody, new
TypeReference<Map<Object, Object>>() {
+ });
+ if (ObjectHelper.isNotEmpty(fields)) {
+ splittedFields =
Arrays.stream(fields.split(",")).collect(Collectors.toList());
+ }
+
+ Map<Object, Object> updatedBody = new HashMap<>();
+ for (Map.Entry<Object, Object> entry : body.entrySet()) {
+ final String fieldName = (String) entry.getKey();
+ final Object origFieldValue = entry.getValue();
+ updatedBody.put(fieldName,
+ filterNames(fieldName, splittedFields) ?
masked(origFieldValue, replacement) : origFieldValue);
+ }
+ if (!updatedBody.isEmpty()) {
+ return mapper.valueToTree(updatedBody);
+ } else {
+ return mapper.valueToTree(body);
+ }
+ }
+
+ boolean filterNames(String fieldName, List<String> splittedFields) {
+ return splittedFields.contains(fieldName);
+ }
+
+ private Object masked(Object value, String replacement) {
+ if (value == null) {
+ return null;
+ }
+ return replacement == null ? maskWithNullValue(value) :
maskWithCustomReplacement(value, replacement);
+ }
+
+ private static Object maskWithCustomReplacement(Object value, String
replacement) {
+ Function<String, ?> replacementMapper =
MAPPING_FUNC.get(value.getClass());
+ if (replacementMapper == null) {
+ throw new IllegalArgumentException(
+ "Unable to mask value of type " + value.getClass() + "
with custom replacement.");
+ }
+ try {
+ return replacementMapper.apply(replacement);
+ } catch (NumberFormatException ex) {
+ throw new IllegalArgumentException(
+ "Unable to convert " + replacement + " (" +
replacement.getClass() + ") to number", ex);
+ }
+ }
+
+ private static Object maskWithNullValue(Object value) {
+ Object maskedValue = BASIC_MAPPING.get(value.getClass());
+ if (maskedValue == null) {
+ if (value instanceof List)
+ maskedValue = Collections.emptyList();
+ else if (value instanceof Map)
+ maskedValue = Collections.emptyMap();
+ else
+ throw new IllegalArgumentException("Unable to mask value of
type: " + value.getClass());
+ }
+ return maskedValue;
+ }
+}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MessageTimestampRouter.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MessageTimestampRouter.java
new file mode 100644
index 00000000000..3b1ef2eb3dc
--- /dev/null
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/MessageTimestampRouter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.util.ObjectHelper;
+
+public class MessageTimestampRouter {
+
+ public void process(
+ @ExchangeProperty("topicFormat") String topicFormat,
@ExchangeProperty("timestampFormat") String timestampFormat,
+ @ExchangeProperty("timestampKeys") String timestampKeys,
+ @ExchangeProperty("timestampKeyFormat") String timestampKeyFormat,
Exchange ex)
+ throws ParseException {
+ final Pattern TOPIC = Pattern.compile("$[topic]", Pattern.LITERAL);
+
+ final Pattern TIMESTAMP = Pattern.compile("$[timestamp]",
Pattern.LITERAL);
+
+ final SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat);
+ fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ ObjectMapper mapper = new ObjectMapper();
+ List<String> splittedKeys = new ArrayList<>();
+ JsonNode jsonNodeBody = ex.getMessage().getBody(JsonNode.class);
+ Map<Object, Object> body = mapper.convertValue(jsonNodeBody, new
TypeReference<Map<Object, Object>>() {
+ });
+ if (ObjectHelper.isNotEmpty(timestampKeys)) {
+ splittedKeys =
Arrays.stream(timestampKeys.split(",")).collect(Collectors.toList());
+ }
+
+ Object rawTimestamp = null;
+ String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC,
String.class);
+ for (String key : splittedKeys) {
+ if (ObjectHelper.isNotEmpty(key)) {
+ rawTimestamp = body.get(key);
+ break;
+ }
+ }
+ Long timestamp = null;
+ if (ObjectHelper.isNotEmpty(timestampKeyFormat) &&
ObjectHelper.isNotEmpty(rawTimestamp)
+ && !timestampKeyFormat.equalsIgnoreCase("timestamp")) {
+ final SimpleDateFormat timestampKeyFmt = new
SimpleDateFormat(timestampKeyFormat);
+ timestampKeyFmt.setTimeZone(TimeZone.getTimeZone("UTC"));
+ timestamp = timestampKeyFmt.parse((String) rawTimestamp).getTime();
+ } else if (ObjectHelper.isNotEmpty(rawTimestamp)) {
+ timestamp = Long.parseLong(rawTimestamp.toString());
+ }
+ if (ObjectHelper.isNotEmpty(timestamp)) {
+ final String formattedTimestamp = fmt.format(new Date(timestamp));
+ String replace1;
+ String updatedTopic;
+
+ if (ObjectHelper.isNotEmpty(topicName)) {
+ replace1 =
TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(topicName));
+ updatedTopic =
TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
+ } else {
+ replace1 =
TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(""));
+ updatedTopic =
TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
+ }
+ ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC,
updatedTopic);
+ }
+ }
+
+}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/RegexRouter.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/RegexRouter.java
new file mode 100644
index 00000000000..517f1d0b787
--- /dev/null
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/RegexRouter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.util.ObjectHelper;
+
+public class RegexRouter {
+
+ public void process(
+ @ExchangeProperty("regex") String regex,
@ExchangeProperty("replacement") String replacement, Exchange ex) {
+ Pattern regexPattern = Pattern.compile(regex);
+ String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC,
String.class);
+ if (ObjectHelper.isNotEmpty(topicName)) {
+ final Matcher matcher = regexPattern.matcher(topicName);
+ if (matcher.matches()) {
+ final String topicUpdated = matcher.replaceFirst(replacement);
+ ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC,
topicUpdated);
+ }
+ }
+ String ceType = ex.getMessage().getHeader("ce-type", String.class);
+ if (ObjectHelper.isNotEmpty(ceType)) {
+ final Matcher matcher = regexPattern.matcher(ceType);
+ if (matcher.matches()) {
+ final String ceTypeUpdated = matcher.replaceFirst(replacement);
+ ex.getMessage().setHeader("ce-type", ceTypeUpdated);
+ }
+ }
+ }
+
+}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/ReplaceField.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/ReplaceField.java
new file mode 100644
index 00000000000..79048041f63
--- /dev/null
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/ReplaceField.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.util.ObjectHelper;
+
+public class ReplaceField {
+
+ public JsonNode process(
+ @ExchangeProperty("enabled") String enabled,
@ExchangeProperty("disabled") String disabled,
+ @ExchangeProperty("renames") String renames, Exchange ex)
+ throws InvalidPayloadException {
+ ObjectMapper mapper = new ObjectMapper();
+ List<String> enabledFields = new ArrayList<>();
+ List<String> disabledFields = new ArrayList<>();
+ List<String> renameFields = new ArrayList<>();
+ JsonNode jsonNodeBody = ex.getMessage().getBody(JsonNode.class);
+ Map<Object, Object> body = mapper.convertValue(jsonNodeBody, new
TypeReference<Map<Object, Object>>() {
+ });
+ if (ObjectHelper.isNotEmpty(enabled) &&
!enabled.equalsIgnoreCase("all")) {
+ enabledFields =
Arrays.stream(enabled.split(",")).collect(Collectors.toList());
+ }
+ if (ObjectHelper.isNotEmpty(disabled) &&
!disabled.equalsIgnoreCase("none")) {
+ disabledFields =
Arrays.stream(disabled.split(",")).collect(Collectors.toList());
+ }
+ if (ObjectHelper.isNotEmpty(disabled)) {
+ renameFields =
Arrays.stream(renames.split(",")).collect(Collectors.toList());
+ }
+ Map<Object, Object> updatedBody = new HashMap<>();
+
+ if (ObjectHelper.isNotEmpty(renameFields)) {
+ Map<String, String> renamingMap = parseNames(renameFields);
+
+ for (Map.Entry<Object, Object> entry : body.entrySet()) {
+ final String fieldName = (String) entry.getKey();
+ if (filterNames(fieldName, enabledFields, disabledFields)) {
+ final Object fieldValue = entry.getValue();
+ updatedBody.put(renameOptional(fieldName, renamingMap),
fieldValue);
+ }
+ }
+ }
+ if (!updatedBody.isEmpty()) {
+ return mapper.valueToTree(updatedBody);
+ } else {
+ return mapper.valueToTree(body);
+ }
+ }
+
+ boolean filterNames(String fieldName, List<String> enabledFields,
List<String> disabledFields) {
+ return !disabledFields.contains(fieldName) && (enabledFields.isEmpty()
|| enabledFields.contains(fieldName));
+ }
+
+ static Map<String, String> parseNames(List<String> mappings) {
+ final Map<String, String> m = new HashMap<>();
+ for (String mapping : mappings) {
+ final String[] parts = mapping.split(":");
+ m.put(parts[0], parts[1]);
+ }
+ return m;
+ }
+
+ String renameOptional(String fieldName, Map<String, String> renames) {
+ final String mapping = renames.get(fieldName);
+ return mapping == null ? fieldName : mapping;
+ }
+
+}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/TimestampRouter.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/TimestampRouter.java
new file mode 100644
index 00000000000..9c660497f0e
--- /dev/null
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/TimestampRouter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.util.Date;
+import java.util.TimeZone;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.util.ObjectHelper;
+
+public class TimestampRouter {
+
+ public void process(
+ @ExchangeProperty("topicFormat") String topicFormat,
@ExchangeProperty("timestampFormat") String timestampFormat,
+ @ExchangeProperty("timestampHeaderName") String
timestampHeaderName, Exchange ex) {
+ final Pattern TOPIC = Pattern.compile("$[topic]", Pattern.LITERAL);
+
+ final Pattern TIMESTAMP = Pattern.compile("$[timestamp]",
Pattern.LITERAL);
+
+ final SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat);
+ fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ Long timestamp = null;
+ String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC,
String.class);
+ Object rawTimestamp = ex.getMessage().getHeader(timestampHeaderName);
+ if (rawTimestamp instanceof Long) {
+ timestamp = (Long) rawTimestamp;
+ } else if (rawTimestamp instanceof Instant) {
+ timestamp = ((Instant) rawTimestamp).toEpochMilli();
+ } else if (ObjectHelper.isNotEmpty(rawTimestamp)) {
+ timestamp = Long.parseLong(rawTimestamp.toString());
+ }
+ if (ObjectHelper.isNotEmpty(timestamp)) {
+ final String formattedTimestamp = fmt.format(new Date(timestamp));
+ String replace1;
+ String updatedTopic;
+
+ if (ObjectHelper.isNotEmpty(topicName)) {
+ replace1 =
TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(topicName));
+ updatedTopic =
TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
+ } else {
+ replace1 =
TOPIC.matcher(topicFormat).replaceAll(Matcher.quoteReplacement(""));
+ updatedTopic =
TIMESTAMP.matcher(replace1).replaceAll(Matcher.quoteReplacement(formattedTimestamp));
+ }
+ ex.getMessage().setHeader(KafkaConstants.OVERRIDE_TOPIC,
updatedTopic);
+ }
+ }
+
+}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/aws2/ddb/Ddb2JsonStructDataTypeTransformer.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/aws2/ddb/Ddb2JsonStructDataTypeTransformer.java
new file mode 100644
index 00000000000..629f4ebadb9
--- /dev/null
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/aws2/ddb/Ddb2JsonStructDataTypeTransformer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kamelet.utils.transform.aws2.ddb;
+
+import java.time.Instant;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.camel.Message;
+import
org.apache.camel.component.kamelet.utils.serialization.gson.JavaTimeInstantTypeAdapter;
+import org.apache.camel.spi.DataType;
+import org.apache.camel.spi.DataTypeTransformer;
+import org.apache.camel.spi.Transformer;
+
+@DataTypeTransformer(name = "aws2-ddb:application-x-struct",
+ description = "Transforms DynamoDB record into a Json
node")
+public class Ddb2JsonStructDataTypeTransformer extends Transformer {
+
+ private final Gson gson = new GsonBuilder()
+ .registerTypeAdapter(Instant.class, new
JavaTimeInstantTypeAdapter())
+ .create();
+
+ @Override
+ public void transform(Message message, DataType fromType, DataType toType)
{
+ if (message.getBody() instanceof String) {
+ return;
+ }
+
+ message.setBody(gson.toJson(message.getBody()));
+ }
+}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java
new file mode 100644
index 00000000000..8b97d3b9ff1
--- /dev/null
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/BatchManualCommit.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform.kafka;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+
+public class BatchManualCommit implements Processor {
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ List<?> exchanges = exchange.getMessage().getBody(List.class);
+ if (exchanges.size() > 0) {
+ final Object tmp = exchanges.get(exchanges.size() - 1);
+ if (tmp instanceof Exchange element) {
+ KafkaManualCommit manual
+ =
element.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT,
KafkaManualCommit.class);
+ if (manual != null) {
+ manual.commit();
+ }
+ }
+ }
+ }
+}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java
new file mode 100644
index 00000000000..db92916f2b9
--- /dev/null
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ManualCommit.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform.kafka;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+
+public class ManualCommit implements Processor {
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ KafkaManualCommit manual =
exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT,
KafkaManualCommit.class);
+ if (manual != null) {
+ manual.commit();
+ }
+ }
+}
diff --git
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ValueToKey.java
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ValueToKey.java
new file mode 100644
index 00000000000..8333ab25cf6
--- /dev/null
+++
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/utils/transform/kafka/ValueToKey.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform.kafka;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeProperty;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.util.ObjectHelper;
+
+public class ValueToKey {
+
+ public void process(@ExchangeProperty("fields") String fields, Exchange
ex) throws InvalidPayloadException {
+ List<String> splittedFields = new ArrayList<>();
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode jsonNodeBody = ex.getMessage().getBody(JsonNode.class);
+ Map<Object, Object> body = mapper.convertValue(jsonNodeBody, new
TypeReference<Map<Object, Object>>() {
+ });
+ if (ObjectHelper.isNotEmpty(fields)) {
+ splittedFields =
Arrays.stream(fields.split(",")).collect(Collectors.toList());
+ }
+ Map<Object, Object> key = new HashMap<>();
+ for (Map.Entry<Object, Object> entry : body.entrySet()) {
+ final String fieldName = (String) entry.getKey();
+ if (filterNames(fieldName, splittedFields)) {
+ final Object fieldValue = entry.getValue();
+ key.put(entry.getKey(), fieldValue);
+ }
+ }
+
+ ex.getMessage().setHeader(KafkaConstants.KEY, key);
+ }
+
+ boolean filterNames(String fieldName, List<String> splittedFields) {
+ return splittedFields.contains(fieldName);
+ }
+}
diff --git
a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/kafka/KafkaHeaderDeserializerTest.java
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/kafka/KafkaHeaderDeserializerTest.java
new file mode 100644
index 00000000000..9332887679b
--- /dev/null
+++
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/kafka/KafkaHeaderDeserializerTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kamelet.utils.kafka;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import org.apache.camel.Exchange;
+import
org.apache.camel.component.kamelet.utils.serialization.kafka.KafkaHeaderDeserializer;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class KafkaHeaderDeserializerTest {
+
+ private DefaultCamelContext camelContext;
+
+ private final KafkaHeaderDeserializer processor = new
KafkaHeaderDeserializer();
+
+ @BeforeEach
+ void setup() {
+ this.camelContext = new DefaultCamelContext();
+ }
+
+ @Test
+ void shouldDeserializeHeaders() throws Exception {
+ Exchange exchange = new DefaultExchange(camelContext);
+
+ exchange.getMessage().setHeader("foo", "bar");
+ exchange.getMessage().setHeader("fooBytes",
"barBytes".getBytes(StandardCharsets.UTF_8));
+ exchange.getMessage().setHeader("fooNull", null);
+ exchange.getMessage().setHeader("number", 1L);
+
+ processor.enabled = true;
+ processor.process(exchange);
+
+ Assertions.assertTrue(exchange.getMessage().hasHeaders());
+ Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
+ Assertions.assertEquals("barBytes",
exchange.getMessage().getHeader("fooBytes"));
+
Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("fooNull"));
+ Assertions.assertNull(exchange.getMessage().getHeader("fooNull"));
+ Assertions.assertEquals("1",
exchange.getMessage().getHeader("number"));
+ }
+
+ @Test
+ void shouldNotDeserializeHeadersWhenDisabled() throws Exception {
+ Exchange exchange = new DefaultExchange(camelContext);
+
+ exchange.getMessage().setHeader("foo", "bar");
+ exchange.getMessage().setHeader("fooBytes",
"barBytes".getBytes(StandardCharsets.UTF_8));
+
+ processor.enabled = false;
+ processor.process(exchange);
+
+ Assertions.assertTrue(exchange.getMessage().hasHeaders());
+ Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
+ Assertions.assertTrue(exchange.getMessage().getHeader("fooBytes")
instanceof byte[]);
+
Assertions.assertEquals(Arrays.toString("barBytes".getBytes(StandardCharsets.UTF_8)),
+ Arrays.toString((byte[])
exchange.getMessage().getHeader("fooBytes")));
+ }
+}
diff --git
a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/ExtractFieldTest.java
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/ExtractFieldTest.java
new file mode 100644
index 00000000000..94608d554f1
--- /dev/null
+++
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/ExtractFieldTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class ExtractFieldTest {
+
+ private DefaultCamelContext camelContext;
+
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ private ExtractField processor;
+
+ private final String baseJson = "{" + "\n" +
+ " \"name\" : \"Rajesh Koothrappali\"" +
"\n" +
+ "}";
+
+ @BeforeEach
+ void setup() {
+ camelContext = new DefaultCamelContext();
+ processor = new ExtractField();
+ }
+
+ @Test
+ void shouldExtractFieldFromJsonNode() throws Exception {
+ Exchange exchange = new DefaultExchange(camelContext);
+
+ exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+ processor.setField("name");
+ processor.process(exchange);
+
+ Assertions.assertEquals("Rajesh Koothrappali",
exchange.getMessage().getBody(String.class));
+ }
+
+ @Test
+ void shouldExtractFieldToHeader() throws Exception {
+ Exchange exchange = new DefaultExchange(camelContext);
+
+ exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+ processor.setField("name");
+ processor.setHeaderOutput(true);
+ processor.setHeaderOutputName("name");
+ processor.process(exchange);
+
+ Assertions.assertEquals(baseJson,
exchange.getMessage().getBody(String.class));
+ Assertions.assertEquals("Rajesh Koothrappali",
exchange.getMessage().getHeader("name"));
+ }
+
+ @Test
+ void shouldExtractFieldToHeaderWithStrictHeaderCheck() throws Exception {
+ Exchange exchange = new DefaultExchange(camelContext);
+
+ exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+ processor.setField("name");
+ processor.setHeaderOutput(true);
+ processor.setHeaderOutputName("name");
+ processor.setStrictHeaderCheck(true);
+ processor.process(exchange);
+
+ Assertions.assertEquals(baseJson,
exchange.getMessage().getBody(String.class));
+ Assertions.assertEquals("Rajesh Koothrappali",
exchange.getMessage().getHeader("name"));
+
+ exchange.getMessage().setHeader("name", "somethingElse");
+
+ processor.process(exchange);
+
+ Assertions.assertEquals("Rajesh Koothrappali",
exchange.getMessage().getBody(String.class));
+ Assertions.assertEquals("somethingElse",
exchange.getMessage().getHeader("name"));
+ }
+
+ @Test
+ void shouldExtractFieldToDefaultHeader() throws Exception {
+ Exchange exchange = new DefaultExchange(camelContext);
+
+ exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+ processor.setField("name");
+ processor.setHeaderOutput(true);
+ processor.process(exchange);
+
+ Assertions.assertEquals(baseJson,
exchange.getMessage().getBody(String.class));
+ Assertions.assertEquals("Rajesh Koothrappali",
exchange.getMessage().getHeader(ExtractField.EXTRACTED_FIELD_HEADER));
+
+ exchange = new DefaultExchange(camelContext);
+
+ exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+ processor.setHeaderOutputName("none");
+ processor.process(exchange);
+
+ Assertions.assertEquals(baseJson,
exchange.getMessage().getBody(String.class));
+ Assertions.assertEquals("Rajesh Koothrappali",
exchange.getMessage().getHeader(ExtractField.EXTRACTED_FIELD_HEADER));
+ }
+
+ @Test
+ void shouldExtractFieldWithT() throws Exception {
+ final String baseJson =
"{\"id\":\"1\",\"message\":\"Camel\\\\tRocks\"}";
+ Exchange exchange = new DefaultExchange(camelContext);
+
+ exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+ processor.setField("message");
+ processor.setTrimField(true);
+ processor.process(exchange);
+
+ Assertions.assertEquals("Camel\\tRocks",
exchange.getMessage().getBody());
+ }
+
+}
diff --git
a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/HoistFieldTest.java
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/HoistFieldTest.java
new file mode 100644
index 00000000000..226328d3367
--- /dev/null
+++
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/HoistFieldTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class HoistFieldTest {
+
+ private DefaultCamelContext camelContext;
+
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ private HoistField processor;
+
+ private final String baseJson = "{" + "\n" +
+ " \"name\" : \"Rajesh Koothrappali\"" +
"\n" +
+ "}";
+
+ @BeforeEach
+ void setup() {
+ camelContext = new DefaultCamelContext();
+ processor = new HoistField();
+ }
+
+ @Test
+ void shouldHoistField() throws Exception {
+ Exchange exchange = new DefaultExchange(camelContext);
+
+ exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+ JsonNode s = processor.process("element", exchange);
+ Assertions.assertEquals("{" + "\"element\"" + ":" + "{" +
+ "\"name\":\"Rajesh Koothrappali\"" +
+ "}" + "}",
+ s.toString());
+ }
+}
diff --git
a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/InsertFieldTest.java
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/InsertFieldTest.java
new file mode 100644
index 00000000000..c1da17929cc
--- /dev/null
+++
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/InsertFieldTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class InsertFieldTest {
+
+ private DefaultCamelContext camelContext;
+
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ private InsertField processor;
+
+ private final String baseJson = "{" +
+ "\"name\":\"Rajesh Koothrappali\"" +
+ "}";
+
+ @BeforeEach
+ void setup() {
+ camelContext = new DefaultCamelContext();
+ processor = new InsertField();
+ }
+
+ @Test
+ void shouldAddFieldToPlainJson() throws Exception {
+ Exchange exchange = new DefaultExchange(camelContext);
+
+ exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+ processor = new InsertField("age", "29");
+ processor.process(exchange);
+
+ Assertions.assertEquals(exchange.getMessage().getBody(String.class),
"{" + "\n" +
+ "
\"name\" : \"Rajesh Koothrappali\"," + "\n" +
+ "
\"age\" : \"29\"" + "\n" +
+
"}");
+ }
+
+ @Test
+ void shouldAddFieldToArrayJson() throws Exception {
+ Exchange exchange = new DefaultExchange(camelContext);
+
+ String arrayJson = "[\"batman\",\"spiderman\",\"wonderwoman\"]";
+ exchange.getMessage().setBody(mapper.readTree(arrayJson));
+
+ processor.setValue("green lantern");
+ processor.process(exchange);
+
+ Assertions.assertEquals(exchange.getMessage().getBody(String.class),
+ "[ \"batman\", \"spiderman\", \"wonderwoman\", \"green
lantern\" ]");
+ }
+}
diff --git
a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/MaskFieldTest.java
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/MaskFieldTest.java
new file mode 100644
index 00000000000..18972e635ea
--- /dev/null
+++
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/MaskFieldTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class MaskFieldTest {
+
+ private DefaultCamelContext camelContext;
+
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ private MaskField processor;
+
+ private final String baseJson = "{" + "\n" +
+ " \"name\" : \"Rajesh Koothrappali\"" +
"\n" +
+ "}";
+
+ @BeforeEach
+ void setup() {
+ camelContext = new DefaultCamelContext();
+ processor = new MaskField();
+ }
+
+ @Test
+ void shouldMaskField() throws Exception {
+ Exchange exchange = new DefaultExchange(camelContext);
+
+ exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+ JsonNode s = processor.process("name", "xxxx", exchange);
+ Assertions.assertEquals("\"xxxx\"", s.get("name").toString());
+ }
+
+ @Test
+ void shouldMaskFieldWithNull() throws Exception {
+ Exchange exchange = new DefaultExchange(camelContext);
+
+ exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+ JsonNode s = processor.process("name", null, exchange);
+ Assertions.assertEquals("\"\"", s.get("name").toString());
+ }
+
+ @Test
+ void shouldMaskFieldList() throws Exception {
+ Map<String, List<String>> names = new HashMap<>();
+ Exchange exchange = new DefaultExchange(camelContext);
+ List<String> els = new ArrayList<>();
+ els.add("Sheldon");
+ els.add("Rajesh");
+ els.add("Leonard");
+ names.put("names", els);
+
+ exchange.getMessage().setBody(mapper.writeValueAsString(names));
+
+ JsonNode s = processor.process("names", null, exchange);
+ Assertions.assertEquals("[]", s.get("names").toString());
+ }
+}
diff --git
a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/RegexRouterTest.java
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/RegexRouterTest.java
new file mode 100644
index 00000000000..c1837a86718
--- /dev/null
+++
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/RegexRouterTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class RegexRouterTest {
+
+ private DefaultCamelContext camelContext;
+
+ private RegexRouter processor;
+
+ private final String topic = "hello";
+
+ @BeforeEach
+ void setup() {
+ camelContext = new DefaultCamelContext();
+ processor = new RegexRouter();
+ }
+
+ @Test
+ void shouldReplaceFieldToPlainJson() throws Exception {
+ Exchange exchange = new DefaultExchange(camelContext);
+
+ exchange.getMessage().setHeader(KafkaConstants.TOPIC, topic);
+
+ processor.process(".*ll.*", "newTopic", exchange);
+
+ Assertions.assertEquals("newTopic",
exchange.getMessage().getHeader(KafkaConstants.OVERRIDE_TOPIC));
+ }
+}
diff --git
a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/ReplaceFieldTest.java
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/ReplaceFieldTest.java
new file mode 100644
index 00000000000..2e6e4ebfcc4
--- /dev/null
+++
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/utils/transform/ReplaceFieldTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet.utils.transform;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class ReplaceFieldTest {
+
+ private DefaultCamelContext camelContext;
+
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ private ReplaceField processor;
+
+ private final String baseJson = "{" + "\n" +
+ " \"name\" : \"Rajesh Koothrappali\"," +
"\n" +
+ " \"age\" : \"29\"" + "\n" +
+ "}";
+
+ @BeforeEach
+ void setup() {
+ camelContext = new DefaultCamelContext();
+ processor = new ReplaceField();
+ }
+
+ @Test
+ void shouldReplaceFieldToPlainJson() throws Exception {
+ Exchange exchange = new DefaultExchange(camelContext);
+
+ exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+ JsonNode node = processor.process("all", "none",
"name:firstName,age:years", exchange);
+
+ Assertions.assertEquals(node.toString(), "{" +
+ "\"firstName\":\"Rajesh
Koothrappali\"," +
+ "\"years\":\"29\"" +
+ "}");
+ }
+
+ @Test
+ void shouldReplaceFieldWithSpecificRename() throws Exception {
+ Exchange exchange = new DefaultExchange(camelContext);
+
+ exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+ JsonNode node = processor.process("name,age", "none",
"name:firstName", exchange);
+
+ Assertions.assertEquals(node.toString(), "{" +
+ "\"firstName\":\"Rajesh
Koothrappali\"," +
+ "\"age\":\"29\"" +
+ "}");
+ }
+
+ @Test
+ void shouldReplaceFieldWithSpecificRenameAndDisableFields() throws
Exception {
+ Exchange exchange = new DefaultExchange(camelContext);
+
+ exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+ JsonNode node = processor.process("name", "none", "name:firstName",
exchange);
+
+ Assertions.assertEquals(node.toString(), "{" +
+ "\"firstName\":\"Rajesh
Koothrappali\"" +
+ "}");
+ }
+
+ @Test
+ void shouldReplaceFieldWithSpecificDisableFields() throws Exception {
+ Exchange exchange = new DefaultExchange(camelContext);
+
+ exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+ JsonNode node = processor.process("all", "name,age", "name:firstName",
exchange);
+
+ Assertions.assertEquals(node.toString(), "{" +
+ "\"name\":\"Rajesh
Koothrappali\"," +
+ "\"age\":\"29\"" +
+ "}");
+ }
+
+ @Test
+ void shouldReplaceFieldWithDisableAllFields() throws Exception {
+ Exchange exchange = new DefaultExchange(camelContext);
+
+ exchange.getMessage().setBody(mapper.readTree(baseJson));
+
+ JsonNode node = processor.process("none", "all", "name:firstName",
exchange);
+
+ Assertions.assertEquals(node.toString(), "{" +
+ "\"name\":\"Rajesh
Koothrappali\"," +
+ "\"age\":\"29\"" +
+ "}");
+ }
+}