This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit c13dd1f849925da51b0a1a92f30cc2611888cad3
Author: Mathieu <[email protected]>
AuthorDate: Fri Jun 11 15:39:49 2021 +0200

    Add RecordToJSONTransforms and JSONToRecordTransforms transforms
---
 connectors/camel-aws2-s3-kafka-connector/pom.xml   |  5 ++
 .../aws2s3/models/StorageHeader.java               | 29 ++++++++
 .../aws2s3/models/StorageRecord.java               | 30 ++++++++
 .../transformers/JSONToRecordTransforms.java       | 72 +++++++++++++++++++
 .../transformers/RecordToJSONTransforms.java       | 82 ++++++++++++++++++++++
 5 files changed, 218 insertions(+)

diff --git a/connectors/camel-aws2-s3-kafka-connector/pom.xml 
b/connectors/camel-aws2-s3-kafka-connector/pom.xml
index 6650a70..d7318a8 100644
--- a/connectors/camel-aws2-s3-kafka-connector/pom.xml
+++ b/connectors/camel-aws2-s3-kafka-connector/pom.xml
@@ -54,6 +54,11 @@
       <artifactId>camel-jackson</artifactId>
     </dependency>
     <!--END OF GENERATED CODE-->
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+      <version>2.8.7</version>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git 
a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageHeader.java
 
b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageHeader.java
new file mode 100644
index 0000000..c3526dc
--- /dev/null
+++ 
b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageHeader.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.aws2s3.models;
+
+
+public class StorageHeader {
+  public final String key;
+  public final String value;
+
+  public StorageHeader(String key, String value) {
+    this.key = key;
+    this.value = value;
+  }
+}
+
diff --git 
a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageRecord.java
 
b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageRecord.java
new file mode 100644
index 0000000..3d74597
--- /dev/null
+++ 
b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageRecord.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.aws2s3.models;
+
+public class StorageRecord {
+  public final String key;
+  public final String body;
+  public final StorageHeader[] headers;
+
+  public StorageRecord(String key, String body, StorageHeader[] headers) {
+    this.key = key;
+    this.body = body;
+    this.headers = headers;
+  }
+}
+
diff --git 
a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/JSONToRecordTransforms.java
 
b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/JSONToRecordTransforms.java
new file mode 100644
index 0000000..2d28425
--- /dev/null
+++ 
b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/JSONToRecordTransforms.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.aws2s3.transformers;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import java.util.Map;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.Transformation;
+
+public class JSONToRecordTransforms<R extends ConnectRecord<R>> implements 
Transformation<R> {
+  public static final String FIELD_KEY_CONFIG = "key";
+  public static final ConfigDef CONFIG_DEF =
+      new ConfigDef()
+          .define(
+              FIELD_KEY_CONFIG,
+              ConfigDef.Type.STRING,
+              null,
+              ConfigDef.Importance.MEDIUM,
+              "Add the key and the header to the record value");
+
+  @Override
+  public void configure(Map<String, ?> configs) {}
+
+  @Override
+  public R apply(R record) {
+    String str = new String((byte[]) record.value());
+    GsonBuilder gsonBuilder = new GsonBuilder();
+    Gson gson = gsonBuilder.create();
+    StorageRecord storageRecord = gson.fromJson(str, StorageRecord.class);
+    // Header format conversion
+    Headers headers = new ConnectHeaders();
+    for (int i = 0; i < storageRecord.headers.length; i++) {
+      headers.add(storageRecord.headers[i].key, 
storageRecord.headers[i].value, null);
+    }
+    headers.forEach(h -> record.headers().add(h));
+    return record.newRecord(
+        record.topic(),
+        record.kafkaPartition(),
+        record.keySchema(),
+        storageRecord.key,
+        record.valueSchema(),
+        storageRecord.body,
+        record.timestamp(),
+        headers);
+  }
+
+  @Override
+  public void close() {}
+
+  @Override
+  public ConfigDef config() {
+    return CONFIG_DEF;
+  }
+}
diff --git 
a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java
 
b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java
new file mode 100644
index 0000000..fdb2e9f
--- /dev/null
+++ 
b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.aws2s3.transformers;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import java.util.ArrayList;
+import java.util.Map;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.camel.kafkaconnector.aws2s3.models.StorageHeader;
+import org.apache.camel.kafkaconnector.aws2s3.models.StorageRecord;
+import java.io.ByteArrayInputStream;
+
+public class RecordToJSONTransforms<R extends ConnectRecord<R>> implements 
Transformation<R> {
+  public static final String FIELD_KEY_CONFIG = "key";
+  public static final ConfigDef CONFIG_DEF =
+      new ConfigDef()
+          .define(
+              FIELD_KEY_CONFIG,
+              ConfigDef.Type.STRING,
+              null,
+              ConfigDef.Importance.MEDIUM,
+              "Add the key and the header to the record value");
+
+  @Override
+  public void configure(Map<String, ?> configs) {}
+
+  @Override
+  public R apply(R record) {
+    // Convert headers to StorageHeader format
+    Headers headers = record.headers();
+    ArrayList<StorageHeader> headerList = new 
ArrayList<StorageHeader>(headers.size());
+    for (Header h : headers) {
+      headerList.add(new StorageHeader(h.key(), (String) h.value()));
+    }
+    StorageHeader[] storageHeaders = new StorageHeader[headers.size()];
+    StorageRecord storageRecord =
+        new StorageRecord(
+            (String) record.key(), (String) record.value(), 
headerList.toArray(storageHeaders));
+
+    // Serialize
+    GsonBuilder gsonBuilder = new GsonBuilder();
+    Gson gson = gsonBuilder.create();
+    String storageRecordJSON = gson.toJson(storageRecord, StorageRecord.class);
+    InputStream storageRecordStream = new 
ByteArrayInputStream(storageRecordJSON.getBytes())
+    return record.newRecord(
+        record.topic(),
+        record.kafkaPartition(),
+        null,
+        record.key(),
+        Schema.STRING_SCHEMA,
+        storageRecordStream,
+        record.timestamp());
+  }
+
+  @Override
+  public void close() {}
+
+  @Override
+  public ConfigDef config() {
+    return CONFIG_DEF;
+  }
+}

Reply via email to