This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 8945529 [ISSUE #223]Add common transform (#224)
8945529 is described below
commit 8945529e8dad98f8a781c1bd830fad8405d48243
Author: xiaoyi <[email protected]>
AuthorDate: Wed Aug 3 20:34:56 2022 +0800
[ISSUE #223]Add common transform (#224)
* init transforms
* upgrade api to 0.1.3
* Fix debezium demecial type conversion problem #190
* Upgrade rocketmq-replicator API to v0.1.3 #189
* Encountered change event for table databasename.tablename whose schema
isn`t known to this connector #191
* Debezium mysql source connector delete event causes null pointer #196
* remove local config
* Debezium mysql source connector delete event causes null pointer #196
* Rocketmq replicator running null pointer #205
* add common transform #223
* fixed
* move transform to root directory #223
Co-authored-by: “sunxiaojian” <“[email protected]”>
---
pom.xml | 1 +
transforms/pom.xml | 183 ++++++++++++++++++++
.../connect/transforms/BaseTransformation.java | 180 ++++++++++++++++++++
.../rocketmq/connect/transforms/ChangeCase.java | 129 ++++++++++++++
.../connect/transforms/ChangeCaseConfig.java | 54 ++++++
.../connect/transforms/ExtractNestedField.java | 122 ++++++++++++++
.../transforms/ExtractNestedFieldConfig.java | 58 +++++++
.../rocketmq/connect/transforms/PatternFilter.java | 132 +++++++++++++++
.../connect/transforms/PatternFilterConfig.java | 63 +++++++
.../rocketmq/connect/transforms/PatternRename.java | 146 ++++++++++++++++
.../connect/transforms/PatternRenameConfig.java | 82 +++++++++
.../rocketmq/connect/transforms/RegexRouter.java | 73 ++++++++
.../connect/transforms/SetMaximumPrecision.java | 186 +++++++++++++++++++++
.../transforms/SetMaximumPrecisionConfig.java | 37 ++++
.../rocketmq/connect/transforms/SetNull.java | 72 ++++++++
.../connect/transforms/util/ExtendKeyValue.java | 121 ++++++++++++++
.../connect/transforms/util/SchemaHelper.java | 74 ++++++++
.../connect/transforms/util/SchemaUtil.java | 29 ++++
.../connect/transforms/test/ChangeCaseTest.java | 85 ++++++++++
.../transforms/test/ExtractNestedFieldTest.java | 111 ++++++++++++
.../connect/transforms/test/PatternFilterTest.java | 93 +++++++++++
.../connect/transforms/test/PatternRenameTest.java | 150 +++++++++++++++++
.../transforms/test/SetMaximumPrecisionTest.java | 136 +++++++++++++++
.../connect/transforms/test/SetNullTest.java | 49 ++++++
.../transforms/test/TransformationTest.java | 41 +++++
.../transforms/test/common/AssertSchema.java | 113 +++++++++++++
.../transforms/test/common/AssertStruct.java | 115 +++++++++++++
.../transforms/test/common/GenericAssertions.java | 112 +++++++++++++
28 files changed, 2747 insertions(+)
diff --git a/pom.xml b/pom.xml
index c8983d3..26d5f2c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,6 +27,7 @@
<module>rocketmq-connect-runtime</module>
<module>rocketmq-connect-cli</module>
<module>distribution</module>
+ <module>transforms</module>
</modules>
<name>RocketMQ Connect</name>
diff --git a/transforms/pom.xml b/transforms/pom.xml
new file mode 100644
index 0000000..e511f0f
--- /dev/null
+++ b/transforms/pom.xml
@@ -0,0 +1,183 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-connect</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>rocketmq-connect-transforms</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-connector</artifactId>
+ <version>0.1.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.36</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>31.1-jre</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.13.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <version>5.6.3</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>versions-maven-plugin</artifactId>
+ <version>2.3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>clirr-maven-plugin</artifactId>
+ <version>2.7</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+
<outputDirectory>${project.build.directory}/lib</outputDirectory>
+ <excludeTransitive>false</excludeTransitive>
+ <stripVersion>true</stripVersion>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <compilerVersion>${maven.compiler.source}</compilerVersion>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <configuration>
+ <argLine>-Xms512m -Xmx1024m</argLine>
+ <forkMode>always</forkMode>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.6</version>
+ <configuration>
+ <locales>en_US</locales>
+ <outputEncoding>UTF-8</outputEncoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.4</version>
+ <configuration>
+ <charset>UTF-8</charset>
+ <locale>en_US</locale>
+
<excludePackageNames>io.openmessaging.internal</excludePackageNames>
+ </configuration>
+ <executions>
+ <execution>
+ <id>aggregate</id>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ <phase>site</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.4</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <executions>
+ <execution>
+ <id>verify</id>
+ <phase>verify</phase>
+ <configuration>
+
<configLocation>../style/rmq_checkstyle.xml</configLocation>
+ <encoding>UTF-8</encoding>
+ <consoleOutput>true</consoleOutput>
+ <failsOnError>true</failsOnError>
+
<includeTestSourceDirectory>false</includeTestSourceDirectory>
+ <includeTestResources>false</includeTestResources>
+ </configuration>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/BaseTransformation.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/BaseTransformation.java
new file mode 100644
index 0000000..6b1ff8b
--- /dev/null
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/BaseTransformation.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.transforms;
+
+import io.openmessaging.connector.api.component.Transform;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaAndValue;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.connector.api.data.logical.Decimal;
+import io.openmessaging.connector.api.data.logical.Time;
+import io.openmessaging.connector.api.data.logical.Timestamp;
+import org.apache.rocketmq.connect.transforms.util.SchemaHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+public abstract class BaseTransformation<R extends ConnectRecord> implements
Transform<R> {
+ private static final Logger log =
LoggerFactory.getLogger(BaseTransformation.class);
+
+ protected SchemaAndValue processMap(R record, Map<String, Object> input) {
+ throw new UnsupportedOperationException("MAP is not a supported
type.");
+ }
+
+ protected SchemaAndValue processStruct(R record, Schema inputSchema,
Struct input) {
+ throw new UnsupportedOperationException("STRUCT is not a supported
type.");
+ }
+
+ protected SchemaAndValue processString(R record, Schema inputSchema,
String input) {
+ throw new UnsupportedOperationException("STRING is not a supported
type.");
+ }
+
+ protected SchemaAndValue processBytes(R record, Schema inputSchema, byte[]
input) {
+ throw new UnsupportedOperationException("BYTES is not a supported
type.");
+ }
+
+ protected SchemaAndValue processInt8(R record, Schema inputSchema, byte
input) {
+ throw new UnsupportedOperationException("INT8 is not a supported
type.");
+ }
+
+ protected SchemaAndValue processInt16(R record, Schema inputSchema, short
input) {
+ throw new UnsupportedOperationException("INT16 is not a supported
type.");
+ }
+
+ protected SchemaAndValue processInt32(R record, Schema inputSchema, int
input) {
+ throw new UnsupportedOperationException("INT32 is not a supported
type.");
+ }
+
+ protected SchemaAndValue processInt64(R record, Schema inputSchema, long
input) {
+ throw new UnsupportedOperationException("INT64 is not a supported
type.");
+ }
+
+ protected SchemaAndValue processBoolean(R record, Schema inputSchema,
boolean input) {
+ throw new UnsupportedOperationException("BOOLEAN is not a supported
type.");
+ }
+
+ protected SchemaAndValue processTimestamp(R record, Schema inputSchema,
Date input) {
+ throw new UnsupportedOperationException("Timestamp is not a supported
type.");
+ }
+
+ protected SchemaAndValue processDate(R record, Schema inputSchema, Date
input) {
+ throw new UnsupportedOperationException("Date is not a supported
type.");
+ }
+
+ protected SchemaAndValue processTime(R record, Schema inputSchema, Date
input) {
+ throw new UnsupportedOperationException("Time is not a supported
type.");
+ }
+
+ protected SchemaAndValue processDecimal(R record, Schema inputSchema,
BigDecimal input) {
+ throw new UnsupportedOperationException("Decimal is not a supported
type.");
+ }
+
+ protected SchemaAndValue processFloat64(R record, Schema inputSchema,
double input) {
+ throw new UnsupportedOperationException("FLOAT64 is not a supported
type.");
+ }
+
+ protected SchemaAndValue processFloat32(R record, Schema inputSchema,
float input) {
+ throw new UnsupportedOperationException("FLOAT32 is not a supported
type.");
+ }
+
+ protected SchemaAndValue processArray(R record, Schema inputSchema,
List<Object> input) {
+ throw new UnsupportedOperationException("ARRAY is not a supported
type.");
+ }
+
+ protected SchemaAndValue processMap(R record, Schema inputSchema,
Map<Object, Object> input) {
+ throw new UnsupportedOperationException("MAP is not a supported
type.");
+ }
+
+
+ private static final Schema OPTIONAL_TIMESTAMP =
Timestamp.builder().optional().build();
+ protected SchemaAndValue process(R record, Schema inputSchema, Object
input) {
+ final SchemaAndValue result;
+ if (null == inputSchema && null == input) {
+ return new SchemaAndValue(
+ null,
+ null
+ );
+ }
+ if (input instanceof Map) {
+ log.trace("process() - Processing as map");
+ result = processMap(record, (Map<String, Object>) input);
+ return result;
+ }
+
+ if (null == inputSchema) {
+ log.trace("process() - Determining schema");
+ inputSchema = SchemaHelper.schema(input);
+ }
+
+ String schemaName = inputSchema.getName();
+ FieldType schemaType = inputSchema.getFieldType();
+ log.trace("process() - Input has as schema. schema = {}", inputSchema);
+ if (FieldType.STRUCT == schemaType) {
+ result = processStruct(record, inputSchema, (Struct) input);
+ } else if (Timestamp.LOGICAL_NAME.equals(schemaName)) {
+ result = processTimestamp(record, inputSchema, (Date) input);
+ } else if
(io.openmessaging.connector.api.data.logical.Date.LOGICAL_NAME.equals(schemaName))
{
+ result = processDate(record, inputSchema, (Date) input);
+ } else if (Time.LOGICAL_NAME.equals(schemaName)) {
+ result = processTime(record, inputSchema, (Date) input);
+ } else if (Decimal.LOGICAL_NAME.equals(schemaName)) {
+ result = processDecimal(record, inputSchema, (BigDecimal) input);
+ } else if (FieldType.STRING == schemaType) {
+ result = processString(record, inputSchema, (String) input);
+ } else if (FieldType.BYTES == schemaType) {
+ result = processBytes(record, inputSchema, (byte[]) input);
+ } else if (FieldType.INT8 == schemaType) {
+ result = processInt8(record, inputSchema, (byte) input);
+ } else if (FieldType.INT16 == schemaType) {
+ result = processInt16(record, inputSchema, (short) input);
+ } else if (FieldType.INT32 == schemaType) {
+ result = processInt32(record, inputSchema, (int) input);
+ } else if (FieldType.INT64 == schemaType) {
+ result = processInt64(record, inputSchema, (long) input);
+ } else if (FieldType.FLOAT32 == schemaType) {
+ result = processFloat32(record, inputSchema, (float) input);
+ } else if (FieldType.FLOAT64 == schemaType) {
+ result = processFloat64(record, inputSchema, (double) input);
+ } else if (FieldType.ARRAY == schemaType) {
+ result = processArray(record, inputSchema, (List<Object>) input);
+ } else if (FieldType.MAP == schemaType) {
+ result = processMap(record, inputSchema, (Map<Object, Object>)
input);
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Schema is not supported. type='%s' name='%s'",
+ schemaType,
+ schemaName
+ )
+ );
+ }
+
+ return result;
+ }
+
+ @Override
+ public void stop() {
+ // NO-op
+ }
+}
\ No newline at end of file
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCase.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCase.java
new file mode 100644
index 0000000..b8094d2
--- /dev/null
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+import com.google.common.base.Strings;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaAndValue;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * change case
+ * @param <R>
+ */
+public abstract class ChangeCase<R extends ConnectRecord> extends
BaseTransformation<R> {
+ private static final Logger log = LoggerFactory.getLogger(ChangeCase.class);
+
+ class State {
+ public final Map<String, String> columnMapping;
+ public final Schema schema;
+ State(Map<String, String> columnMapping, Schema schema) {
+ this.columnMapping = columnMapping;
+ this.schema = schema;
+ }
+ }
+
+ Map<Schema, State> schemaState = new HashMap<>();
+ private ChangeCaseConfig config;
+ @Override
+ public void start(KeyValue config) {
+ this.config = new ChangeCaseConfig(config);
+ }
+
+ @Override
+ protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct
input) {
+ // state
+ final State state = this.schemaState.computeIfAbsent(inputSchema, schema
-> {
+ final SchemaBuilder builder = SchemaBuilder.struct();
+ if (!Strings.isNullOrEmpty(schema.getName())) {
+ builder.name(schema.getName());
+ }
+ if (schema.isOptional()) {
+ builder.optional();
+ }
+ final Map<String, String> columnMapping = new LinkedHashMap<>();
+ for (Field field : schema.getFields()) {
+ final String newFieldName = this.config.from.to(this.config.to,
field.getName());
+ log.trace("processStruct() - Mapped '{}' to '{}'", field.getName(),
newFieldName);
+ columnMapping.put(field.getName(), newFieldName);
+ builder.field(newFieldName, field.getSchema());
+ }
+ return new State(columnMapping, builder.build());
+ });
+
+
+ final Struct outputStruct = new Struct(state.schema);
+ for (Map.Entry<String, String> kvp : state.columnMapping.entrySet()) {
+ final Object value = input.get(kvp.getKey());
+ outputStruct.put(kvp.getValue(), value);
+ }
+ return new SchemaAndValue(state.schema, outputStruct);
+ }
+
+ /**
+ * transform key
+ */
+ public static class Key extends ChangeCase<ConnectRecord> {
+ @Override
+ public ConnectRecord doTransform(ConnectRecord r) {
+ final SchemaAndValue transformed = process(r, r.getKeySchema(),
r.getKey());
+ ConnectRecord record = new ConnectRecord(
+ r.getPosition().getPartition(),
+ r.getPosition().getOffset(),
+ r.getTimestamp(),
+ transformed.schema(),
+ transformed.value(),
+ r.getSchema(),
+ r.getData()
+ );
+ record.setExtensions(r.getExtensions());
+ return record;
+ }
+ }
+
+
+ /**
+ * transform value
+ */
+ public static class Value extends ChangeCase<ConnectRecord> {
+ @Override
+ public ConnectRecord doTransform(ConnectRecord r) {
+ final SchemaAndValue transformed = process(r, r.getSchema(),
r.getData());
+ ConnectRecord record = new ConnectRecord(
+ r.getPosition().getPartition(),
+ r.getPosition().getOffset(),
+ r.getTimestamp(),
+ r.getKeySchema(),
+ r.getKey(),
+ transformed.schema(),
+ transformed.value()
+ );
+ record.setExtensions(r.getExtensions());
+ return record;
+ }
+ }
+}
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCaseConfig.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCaseConfig.java
new file mode 100644
index 0000000..fa9a5c9
--- /dev/null
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCaseConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+
+import com.google.common.base.CaseFormat;
+import io.openmessaging.KeyValue;
+
+public class ChangeCaseConfig {
+ public final CaseFormat from;
+ public final CaseFormat to;
+
+ public static final String FROM_CONFIG = "from";
+ static final String FROM_DOC = "The format to move from ";
+ public static final String TO_CONFIG = "to";
+ static final String TO_DOC = "";
+
+ public ChangeCaseConfig(KeyValue config) {
+ String fromConfig = config.getString(FROM_CONFIG);
+ this.from = CaseFormat.valueOf(fromConfig);
+ String toConfig = config.getString(TO_CONFIG);
+ this.to = CaseFormat.valueOf(toConfig);
+ }
+
+ /**
+ * from
+ * @return
+ */
+ public CaseFormat from(){
+ return this.from;
+ }
+
+ /**
+ * to
+ * @return
+ */
+ public CaseFormat to(){
+ return this.to;
+ }
+}
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedField.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedField.java
new file mode 100644
index 0000000..122951d
--- /dev/null
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedField.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+import com.google.common.base.Strings;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaAndValue;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * extract nested field
+ * @param <R>
+ */
+public abstract class ExtractNestedField<R extends ConnectRecord> extends
BaseTransformation<R> {
+ private static final Logger log =
LoggerFactory.getLogger(ExtractNestedField.class);
+ private ExtractNestedFieldConfig config;
+ Map<Schema, Schema> schemaCache;
+ @Override
+ public void start(KeyValue keyValue) {
+ this.config = new ExtractNestedFieldConfig(keyValue);
+ this.schemaCache = new HashMap<>();
+ }
+
+
+ @Override
+ protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct
input) {
+ final Struct innerStruct = input.getStruct(this.config.outerFieldName);
+ final Schema outputSchema = this.schemaCache.computeIfAbsent(inputSchema,
s -> {
+
+ final Field innerField =
innerStruct.schema().getField(this.config.innerFieldName);
+ final SchemaBuilder builder = SchemaBuilder.struct();
+ if (!Strings.isNullOrEmpty(inputSchema.getName())) {
+ builder.name(inputSchema.getName());
+ }
+ if (inputSchema.isOptional()) {
+ builder.optional();
+ }
+ for (Field inputField : inputSchema.getFields()) {
+ builder.field(inputField.getName(), inputField.getSchema());
+ }
+ builder.field(this.config.innerFieldName, innerField.getSchema());
+ return builder.build();
+ });
+ final Struct outputStruct = new Struct(outputSchema);
+ for (Field inputField : inputSchema.getFields()) {
+ final Object value = input.get(inputField);
+ outputStruct.put(inputField.getName(), value);
+ }
+ final Object innerFieldValue = innerStruct.get(this.config.innerFieldName);
+ outputStruct.put(this.config.innerFieldName, innerFieldValue);
+
+ return new SchemaAndValue(outputSchema, outputStruct);
+
+ }
+
+ /**
+ * transform key
+ */
+ public static class Key extends ExtractNestedField<ConnectRecord> {
+ @Override
+ public ConnectRecord doTransform(ConnectRecord r) {
+ final SchemaAndValue transformed = process(r, r.getKeySchema(),
r.getKey());
+ ConnectRecord record = new ConnectRecord(
+ r.getPosition().getPartition(),
+ r.getPosition().getOffset(),
+ r.getTimestamp(),
+ transformed.schema(),
+ transformed.value(),
+ r.getSchema(),
+ r.getData()
+ );
+ record.setExtensions(r.getExtensions());
+ return record;
+ }
+ }
+
+
+ /**
+ * transform value
+ */
+ public static class Value extends ExtractNestedField<ConnectRecord> {
+ @Override
+ public ConnectRecord doTransform(ConnectRecord r) {
+ final SchemaAndValue transformed = process(r, r.getSchema(),
r.getData());
+ ConnectRecord record = new ConnectRecord(
+ r.getPosition().getPartition(),
+ r.getPosition().getOffset(),
+ r.getTimestamp(),
+ r.getKeySchema(),
+ r.getKey(),
+ transformed.schema(),
+ transformed.value()
+ );
+ record.setExtensions(r.getExtensions());
+ return record;
+ }
+ }
+
+}
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedFieldConfig.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedFieldConfig.java
new file mode 100644
index 0000000..9cd2f23
--- /dev/null
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedFieldConfig.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+import io.openmessaging.KeyValue;
+
+
+/**
+ * extract nested field config
+ */
+public class ExtractNestedFieldConfig{
+ public final String outerFieldName;
+ public final String innerFieldName;
+ public final String outputFieldName;
+
+ public ExtractNestedFieldConfig(KeyValue config) {
+ this.outerFieldName = config.getString(OUTER_FIELD_NAME_CONF);
+ this.innerFieldName = config.getString(INNER_FIELD_NAME_CONF);
+ this.outputFieldName = config.getString(OUTPUT_FIELD_NAME_CONF);
+ }
+
+ public static final String OUTER_FIELD_NAME_CONF = "input.outer.field.name";
+ static final String OUTER_FIELD_NAME_DOC = "The field on the parent struct
containing the child struct. " +
+ "For example if you wanted the extract `address.state` you would use
`address`.";
+ public static final String INNER_FIELD_NAME_CONF = "input.inner.field.name";
+ static final String INNER_FIELD_NAME_DOC = "The field on the child struct
containing the field to be extracted. " +
+ "For example if you wanted the extract `address.state` you would use
`state`.";
+ public static final String OUTPUT_FIELD_NAME_CONF = "output.field.name";
+ static final String OUTPUT_FIELD_NAME_DOC = "The field to place the
extracted value into.";
+
+
+ public String outerFieldName(){
+ return this.outerFieldName;
+ }
+
+ public String innerFieldName(){
+ return this.innerFieldName;
+ }
+
+ public String outputFieldName(){
+ return this.outerFieldName;
+ }
+
+}
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilter.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilter.java
new file mode 100644
index 0000000..e2a71ff
--- /dev/null
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.SchemaAndValue;
+import io.openmessaging.connector.api.data.Struct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * pattern filter
+ * @param <R>
+ */
+public abstract class PatternFilter<R extends ConnectRecord> extends
BaseTransformation<R> {
+
+ private static final Logger log =
LoggerFactory.getLogger(PatternFilter.class);
+ public Pattern pattern;
+ public Set<String> fields;
+
+ private PatternFilterConfig config;
+ @Override
+ public void start(KeyValue config) {
+ this.config = new PatternFilterConfig(config);
+ this.pattern = this.config.pattern();
+ this.fields = this.config.fields();
+ }
+
+
+ R filter(R record, Struct struct) {
+ for (Field field : struct.schema().getFields()) {
+ if (!this.fields.contains(field.getName()) ||
field.getSchema().getFieldType() != FieldType.STRING) {
+ continue;
+ }
+ String input = struct.getString(field.getName());
+ if (null != input) {
+ if (this.pattern.matcher(input).matches()) {
+ return null;
+ }
+ }
+ }
+ return record;
+ }
+
+ /**
+ * filter map
+ * @param record
+ * @param map
+ * @return
+ */
+ R filter(R record, Map map) {
+ for (Object field : map.keySet()) {
+ if (!this.fields.contains(field)) {
+ continue;
+ }
+ Object value = map.get(field);
+ if (value instanceof String) {
+ String input = (String) value;
+ if (this.pattern.matcher(input).matches()) {
+ return null;
+ }
+ }
+ }
+ return record;
+ }
+
+
+ R filter(R record, final boolean key) {
+ final SchemaAndValue input = key ?
+ new SchemaAndValue(record.getKeySchema(), record.getKey()) :
+ new SchemaAndValue(record.getSchema(), record.getData());
+ final R result;
+ if (input.schema() != null) {
+ if (FieldType.STRUCT == input.schema().getFieldType()) {
+ result = filter(record, (Struct) input.value());
+ } else if (FieldType.MAP == input.schema().getFieldType()) {
+ result = filter(record, (Map) input.value());
+ } else {
+ result = record;
+ }
+ } else if (input.value() instanceof Map) {
+ result = filter(record, (Map) input.value());
+ } else {
+ result = record;
+ }
+ return result;
+ }
+
+
+ /**
+ * filter key
+ * @param <R>
+ */
+ public static class Key<R extends ConnectRecord> extends PatternFilter<R> {
+ @Override
+ public R doTransform(R r) {
+ return filter(r, true);
+ }
+ }
+
+ /**
+ * filter value
+ * @param <R>
+ */
+ public static class Value<R extends ConnectRecord> extends PatternFilter<R> {
+ @Override
+ public R doTransform(R r) {
+ return filter(r, false);
+ }
+ }
+}
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilterConfig.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilterConfig.java
new file mode 100644
index 0000000..543ddf7
--- /dev/null
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilterConfig.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.connect.transforms.util.ExtendKeyValue;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+
+/**
+ * pattern filter config
+ */
+public class PatternFilterConfig{
+
+ public static final String PATTERN_CONFIG = "pattern";
+ public static final String PATTERN_DOC = "The regex to test the message
with. ";
+
+ public static final String FIELD_CONFIG = "fields";
+ public static final String FIELD_DOC = "The fields to transform.";
+
+
+ private final Pattern pattern;
+ private final Set<String> fields;
+ public PatternFilterConfig(KeyValue config){
+ ExtendKeyValue extendKeyValue = new ExtendKeyValue(config);
+ String pattern = extendKeyValue.getString(PATTERN_CONFIG);
+ try {
+ this.pattern = Pattern.compile(pattern);
+ } catch (PatternSyntaxException var4) {
+ throw new ConnectException(String.format("Could not compile regex
'%s'.", pattern));
+ }
+ List<String> fields = extendKeyValue.getList(FIELD_CONFIG);
+ this.fields = new HashSet<>(fields);
+ }
+
+ public Pattern pattern() {
+ return this.pattern;
+ }
+
+ public Set<String> fields() {
+ return this.fields;
+ }
+}
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRename.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRename.java
new file mode 100644
index 0000000..d27e330
--- /dev/null
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRename.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaAndValue;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+
+/**
+ * pattern rename
+ * @param <R>
+ */
+public abstract class PatternRename<R extends ConnectRecord> extends
BaseTransformation<R> {
+ private static final Logger log =
LoggerFactory.getLogger(PatternRename.class);
+ PatternRenameConfig config;
+ @Override
+ public void start(KeyValue keyValue) {
+ config = new PatternRenameConfig(keyValue);
+ }
+
+ @Override
+ protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct
inputStruct) {
+ final SchemaBuilder outputSchemaBuilder = SchemaBuilder.struct();
+ outputSchemaBuilder.name(inputSchema.getName());
+ outputSchemaBuilder.doc(inputSchema.getDoc());
+ if (null != inputSchema.getDefaultValue()) {
+ outputSchemaBuilder.defaultValue(inputSchema.getDefaultValue());
+ }
+ if (null != inputSchema.getParameters() &&
!inputSchema.getParameters().isEmpty()) {
+ outputSchemaBuilder.parameters(inputSchema.getParameters());
+ }
+ if (inputSchema.isOptional()) {
+ outputSchemaBuilder.optional();
+ }
+ Map<String, String> fieldMappings = new
HashMap<>(inputSchema.getFields().size());
+ for (final Field inputField : inputSchema.getFields()) {
+ log.trace("process() - Processing field '{}'", inputField.getName());
+ final Matcher fieldMatcher =
this.config.pattern.matcher(inputField.getName());
+ final String outputFieldName;
+ if (fieldMatcher.find()) {
+ // replace
+ outputFieldName = fieldMatcher.replaceAll(this.config.replacement);
+ } else {
+ outputFieldName = inputField.getName();
+ }
+ log.trace("process() - Mapping field '{}' to '{}'",
inputField.getName(), outputFieldName);
+ fieldMappings.put(inputField.getName(), outputFieldName);
+ outputSchemaBuilder.field(outputFieldName, inputField.getSchema());
+ }
+ final Schema outputSchema = outputSchemaBuilder.build();
+ final Struct outputStruct = new Struct(outputSchema);
+ for (Map.Entry<String, String> entry : fieldMappings.entrySet()) {
+ final String inputField = entry.getKey(), outputField = entry.getValue();
+ log.trace("process() - Copying '{}' to '{}'", inputField, outputField);
+ final Object value = inputStruct.get(inputField);
+ outputStruct.put(outputField, value);
+ }
+ return new SchemaAndValue(outputSchema, outputStruct);
+ }
+
+ @Override
+ protected SchemaAndValue processMap(R record, Map<String, Object> input) {
+ final Map<String, Object> outputMap = new LinkedHashMap<>(input.size());
+ for (final String inputFieldName : input.keySet()) {
+ log.trace("process() - Processing field '{}'", inputFieldName);
+ final Matcher fieldMatcher = this.config.pattern.matcher(inputFieldName);
+ final String outputFieldName;
+ // replace
+ if (fieldMatcher.find()) {
+ outputFieldName = fieldMatcher.replaceAll(this.config.replacement);
+ } else {
+ outputFieldName = inputFieldName;
+ }
+ final Object value = input.get(inputFieldName);
+ outputMap.put(outputFieldName, value);
+ }
+ return new SchemaAndValue(null, outputMap);
+ }
+
+ /**
+ * transform key
+ */
+ public static class Key extends PatternRename<ConnectRecord> {
+ @Override
+ public ConnectRecord doTransform(ConnectRecord r) {
+ final SchemaAndValue transformed = process(r, r.getKeySchema(),
r.getKey());
+ ConnectRecord record = new ConnectRecord(
+ r.getPosition().getPartition(),
+ r.getPosition().getOffset(),
+ r.getTimestamp(),
+ transformed.schema(),
+ transformed.value(),
+ r.getSchema(),
+ r.getData()
+ );
+ record.setExtensions(r.getExtensions());
+ return record;
+ }
+ }
+
+ /**
+ * transform value
+ */
+ public static class Value extends PatternRename<ConnectRecord> {
+ @Override
+ public ConnectRecord doTransform(ConnectRecord r) {
+ final SchemaAndValue transformed = process(r, r.getSchema(),
r.getData());
+ ConnectRecord record = new ConnectRecord(
+ r.getPosition().getPartition(),
+ r.getPosition().getOffset(),
+ r.getTimestamp(),
+ r.getKeySchema(),
+ r.getKey(),
+ transformed.schema(),
+ transformed.value()
+ );
+ record.setExtensions(r.getExtensions());
+ return record;
+ }
+ }
+}
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRenameConfig.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRenameConfig.java
new file mode 100644
index 0000000..b7c0e9e
--- /dev/null
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRenameConfig.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+import com.google.common.collect.ImmutableMap;
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.connect.transforms.util.ExtendKeyValue;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * pattern rename config
+ */
+public class PatternRenameConfig {
+
+ public static final String FIELD_PATTERN_CONF = "field.pattern";
+ static final String FIELD_PATTERN_DOC = "";
+
+ public static final String FIELD_PATTERN_FLAGS_CONF = "field.pattern.flags";
+ static final String FIELD_PATTERN_FLAGS_DOC = "";
+
+ public static final String FIELD_REPLACEMENT_CONF = "field.replacement";
+ static final String FIELD_REPLACEMENT_DOC = "";
+
+ static final Map<String, Integer> FLAG_VALUES;
+
+ static {
+ Map<String, Integer> map = new HashMap<>();
+ map.put("UNICODE_CHARACTER_CLASS", Pattern.UNICODE_CHARACTER_CLASS);
+ map.put("CANON_EQ", Pattern.CANON_EQ);
+ map.put("UNICODE_CASE", Pattern.UNICODE_CASE);
+ map.put("DOTALL", Pattern.DOTALL);
+ map.put("LITERAL", Pattern.LITERAL);
+ map.put("MULTILINE", Pattern.MULTILINE);
+ map.put("COMMENTS", Pattern.COMMENTS);
+ map.put("CASE_INSENSITIVE", Pattern.CASE_INSENSITIVE);
+ map.put("UNIX_LINES", Pattern.UNIX_LINES);
+ FLAG_VALUES = ImmutableMap.copyOf(map);
+ }
+
+ public final Pattern pattern;
+ public final String replacement;
+
+ public PatternRenameConfig(KeyValue config) {
+ ExtendKeyValue extendConfig = new ExtendKeyValue(config);
+ final String pattern = extendConfig.getString(FIELD_PATTERN_CONF);
+ final List<String> flagList =
extendConfig.getList(FIELD_PATTERN_FLAGS_CONF);
+ int patternFlags = 0;
+ for (final String f : flagList) {
+ final int flag = FLAG_VALUES.get(f);
+ patternFlags = patternFlags | flag;
+ }
+ this.pattern = Pattern.compile(pattern, patternFlags);
+ this.replacement = config.getString(FIELD_REPLACEMENT_CONF);
+ }
+
+
+ public Pattern pattern() {
+ return pattern;
+ }
+
+ public String replacement() {
+ return replacement;
+ }
+}
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/RegexRouter.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/RegexRouter.java
new file mode 100644
index 0000000..ff4b94f
--- /dev/null
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/RegexRouter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * regex router
+ * @param <R>
+ */
+public abstract class RegexRouter<R extends ConnectRecord> extends
BaseTransformation<R> {
+ private static final Logger LOG =
LoggerFactory.getLogger(RegexRouter.class);
+ public static final String TOPIC = "topic";
+ public static final String OVERVIEW_DOC = "Update the record topic using
the configured regular expression and replacement string."
+ + "<p/>Under the hood, the regex is compiled to a
<code>java.util.regex.Pattern</code>. "
+ + "If the pattern matches the input topic,
<code>java.util.regex.Matcher#replaceFirst()</code> is used with the
replacement string to obtain the new topic.";
+
+ private interface ConfigName {
+ String REGEX = "regex";
+ String REPLACEMENT = "replacement";
+ }
+
+ private Pattern regex;
+ private String replacement;
+
+ @Override
+ public void start(KeyValue config) {
+ regex = Pattern.compile(config.getString(ConfigName.REGEX));
+ replacement = config.getString(ConfigName.REPLACEMENT);
+ }
+
+ @Override
+ public R doTransform(R record) {
+ Map<String, Object> partitionMap = (Map<String,
Object>)record.getPosition().getPartition().getPartition();
+ if (null == partitionMap || !partitionMap.containsKey(TOPIC)) {
+ LOG.warn("PartitionMap get topic is null , lack of topic config");
+ return record;
+ }
+ Object o = partitionMap.get(TOPIC);
+ if (null == o) {
+ LOG.warn("PartitionMap get topic is null , lack of topic config");
+ return record;
+
+ }
+ final Matcher matcher = regex.matcher(o.toString());
+ if (matcher.matches()) {
+ String topic = matcher.replaceFirst(replacement);
+ partitionMap.put(TOPIC, topic);
+ }
+ return record;
+ }
+}
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecision.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecision.java
new file mode 100644
index 0000000..3ba40c6
--- /dev/null
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecision.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaAndValue;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.connector.api.data.logical.Decimal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * set maximum precision
+ * @param <R>
+ */
+public abstract class SetMaximumPrecision<R extends ConnectRecord> extends
BaseTransformation<R> {
+ private static final Logger log =
LoggerFactory.getLogger(SetMaximumPrecision.class);
+
+ SetMaximumPrecisionConfig config;
+
+ @Override
+ public void start(KeyValue keyValue) {
+ config = new SetMaximumPrecisionConfig(keyValue);
+ }
+
+ static final State NOOP = new State(true, null, null);
+
+ static class State {
+ public final boolean noop;
+ public final Schema outputSchema;
+ public final Set<String> decimalFields;
+
+ State(boolean noop, Schema outputSchema, Set<String> decimalFields) {
+ this.noop = noop;
+ this.outputSchema = outputSchema;
+ this.decimalFields = decimalFields;
+ }
+ }
+
+ Map<Schema, State> schemaLookup = new HashMap<>();
+
+ public static final String CONNECT_AVRO_DECIMAL_PRECISION_PROP =
"connect.decimal.precision";
+
+ State state(Schema inputSchema) {
+ return this.schemaLookup.computeIfAbsent(inputSchema, new Function<Schema,
State>() {
+ @Override
+ public State apply(Schema schema) {
+ Set<String> decimalFields = inputSchema.getFields().stream()
+ .filter(f -> Decimal.LOGICAL_NAME.equals(f.getSchema().getName()))
+ .filter(f ->
Integer.parseInt(f.getSchema().getParameters().getOrDefault(CONNECT_AVRO_DECIMAL_PRECISION_PROP,
"64")) > config.maxPrecision())
+ .map(Field::getName)
+ .collect(Collectors.toSet());
+ State result;
+
+ if (decimalFields.size() == 0) {
+ result = NOOP;
+ } else {
+ log.trace("state() - processing schema '{}'", schema.getName());
+ SchemaBuilder builder = SchemaBuilder.struct()
+ .name(inputSchema.getName())
+ .doc(inputSchema.getDoc())
+ .version(inputSchema.getVersion());
+ if (null != inputSchema.getParameters() &&
!inputSchema.getParameters().isEmpty()) {
+ builder.parameters(inputSchema.getParameters());
+ }
+
+ for (Field field : inputSchema.getFields()) {
+ log.trace("state() - processing field '{}'", field.getName());
+ if (decimalFields.contains(field.getName())) {
+ Map<String, String> parameters = new LinkedHashMap<>();
+ if (null != field.getSchema().getParameters() &&
!field.getSchema().getParameters().isEmpty()) {
+ parameters.putAll(field.getSchema().getParameters());
+ }
+ parameters.put(CONNECT_AVRO_DECIMAL_PRECISION_PROP,
Integer.toString(config.maxPrecision()));
+ int scale =
Integer.parseInt(parameters.get(Decimal.SCALE_FIELD));
+ SchemaBuilder fieldBuilder = Decimal.builder(scale)
+ .parameters(parameters)
+ .doc(field.getSchema().getDoc())
+ .version(field.getSchema().getVersion());
+ if (field.getSchema().isOptional()) {
+ fieldBuilder.optional();
+ }
+ Schema fieldSchema = fieldBuilder.build();
+ builder.field(field.getName(), fieldSchema);
+ } else {
+ log.trace("state() - copying field '{}' to new schema.",
field.getName());
+ builder.field(field.getName(), field.getSchema());
+ }
+ }
+
+ Schema outputSchema = builder.build();
+ result = new State(false, outputSchema, decimalFields);
+ }
+
+ return result;
+ }
+ });
+
+ }
+
+ @Override
+ protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct
input) {
+ State state = state(inputSchema);
+ SchemaAndValue result;
+
+ if (state.noop) {
+ result = new SchemaAndValue(inputSchema, input);
+ } else {
+ Struct struct = new Struct(state.outputSchema);
+ for (Field field : inputSchema.getFields()) {
+ struct.put(field.getName(), input.get(field.getName()));
+ }
+ result = new SchemaAndValue(state.outputSchema, struct);
+ }
+ return result;
+ }
+
+ /**
+ * transform key
+ */
+ public static class Key extends SetMaximumPrecision<ConnectRecord> {
+ @Override
+ public ConnectRecord doTransform(ConnectRecord r) {
+ SchemaAndValue transformed = this.process(r, r.getKeySchema(),
r.getKey());
+ ConnectRecord record = new ConnectRecord(
+ r.getPosition().getPartition(),
+ r.getPosition().getOffset(),
+ r.getTimestamp(),
+ transformed.schema(),
+ transformed.value(),
+ r.getSchema(),
+ r.getData()
+ );
+ record.setExtensions(r.getExtensions());
+ return record;
+ }
+ }
+
+ /**
+ * transform value
+ */
+ public static class Value extends SetMaximumPrecision<ConnectRecord> {
+ @Override
+ public ConnectRecord doTransform(ConnectRecord r) {
+ SchemaAndValue transformed = this.process(r, r.getSchema(), r.getData());
+ ConnectRecord record = new ConnectRecord(
+ r.getPosition().getPartition(),
+ r.getPosition().getOffset(),
+ r.getTimestamp(),
+ r.getKeySchema(),
+ r.getKey(),
+ transformed.schema(),
+ transformed.value()
+ );
+ record.setExtensions(r.getExtensions());
+ return record;
+ }
+ }
+}
+
+
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecisionConfig.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecisionConfig.java
new file mode 100644
index 0000000..9a801c3
--- /dev/null
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecisionConfig.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+import io.openmessaging.KeyValue;
+
+/**
+ * set maximum precision config
+ */
+public class SetMaximumPrecisionConfig {
+ public static final String MAX_PRECISION_CONFIG = "precision.max";
+ static final String MAX_PRECISION_DOC = "The maximum precision allowed.";
+
+ private final int maxPrecision;
+
+ public SetMaximumPrecisionConfig(KeyValue config) {
+ this.maxPrecision = config.getInt(MAX_PRECISION_CONFIG);
+ }
+
+ public int maxPrecision(){
+ return maxPrecision;
+ }
+}
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetNull.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetNull.java
new file mode 100644
index 0000000..54ed09d
--- /dev/null
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetNull.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * set null
+ * @param <R>
+ */
+public abstract class SetNull<R extends ConnectRecord> extends
BaseTransformation<R> {
+ private static final Logger log = LoggerFactory.getLogger(SetNull.class);
+
+
+ PatternRenameConfig config;
+
+ @Override
+ public void start(KeyValue keyValue) {
+ config = new PatternRenameConfig(keyValue);
+ }
+
+ /**
+ * transform key
+ */
+ public static class Key extends SetNull<ConnectRecord> {
+ @Override
+ public ConnectRecord doTransform(ConnectRecord r) {
+ return new ConnectRecord(
+ r.getPosition().getPartition(),
+ r.getPosition().getOffset(),
+ r.getTimestamp(),
+ null,
+ null,
+ r.getSchema(),
+ r.getData()
+ );
+ }
+ }
+
+
+ public static class Value extends SetNull<ConnectRecord> {
+ @Override
+ public ConnectRecord doTransform(ConnectRecord r) {
+ return new ConnectRecord(
+ r.getPosition().getPartition(),
+ r.getPosition().getOffset(),
+ r.getTimestamp(),
+ r.getKeySchema(),
+ r.getKey(),
+ r.getSchema(),
+ r.getData()
+ );
+ }
+ }
+}
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/ExtendKeyValue.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/ExtendKeyValue.java
new file mode 100644
index 0000000..9297718
--- /dev/null
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/ExtendKeyValue.java
@@ -0,0 +1,121 @@
+package org.apache.rocketmq.connect.transforms.util;
+
+import io.openmessaging.KeyValue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * extend key value
+ */
+public class ExtendKeyValue implements KeyValue {
+ private static final Pattern COMMA_WITH_WHITESPACE =
Pattern.compile("\\s*,\\s*");
+
+
+ private KeyValue config;
+ public ExtendKeyValue(KeyValue config){
+ this.config = config;
+ }
+
+ @Override
+ public KeyValue put(String s, int i) {
+ return config.put(s, i);
+ }
+
+ @Override
+ public KeyValue put(String s, long l) {
+ return config.put(s, l);
+ }
+
+ @Override
+ public KeyValue put(String s, double v) {
+ return config.put(s, v);
+ }
+
+ @Override
+ public KeyValue put(String s, String s1) {
+ return config.put(s, s1);
+ }
+
+ @Override
+ public int getInt(String s) {
+ return config.getInt(s);
+ }
+
+ @Override
+ public int getInt(String s, int i) {
+ return config.getInt(s, i);
+ }
+
+ @Override
+ public long getLong(String s) {
+ return config.getLong(s);
+ }
+
+ @Override
+ public long getLong(String s, long l) {
+ return config.getLong(s, l);
+ }
+
+ @Override
+ public double getDouble(String s) {
+ return config.getDouble(s);
+ }
+
+ @Override
+ public double getDouble(String s, double v) {
+ return config.getDouble(s, v);
+ }
+
+ @Override
+ public String getString(String s) {
+ return config.getString(s);
+ }
+
+ @Override
+ public String getString(String s, String s1) {
+ return config.getString(s, s1);
+ }
+
+ @Override
+ public Set<String> keySet() {
+ return config.keySet();
+ }
+
+ @Override
+ public boolean containsKey(String s) {
+ return config.containsKey(s);
+ }
+
+ /**
+ * get list
+ * @param s
+ * @return
+ */
+ public List getList(String s){
+ if (!this.config.containsKey(s)){
+ return new ArrayList();
+ }
+ String config = this.config.getString(s).trim();
+ return Arrays.asList(COMMA_WITH_WHITESPACE.split(config, -1));
+ }
+
+ /**
+ * get list by class
+ * @param s
+ * @param clazz
+ * @param <T>
+ * @return
+ */
+ public <T> List<T> getList(String s, Class<T> clazz) {
+ List configs = getList(s);
+ List<T> castConfigs = new ArrayList<>();
+ configs.forEach(config ->{
+ castConfigs.add(clazz.cast(config));
+ });
+ return castConfigs;
+ }
+}
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaHelper.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaHelper.java
new file mode 100644
index 0000000..57c38c9
--- /dev/null
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaHelper.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.transforms.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.logical.Decimal;
+import io.openmessaging.connector.api.data.logical.Timestamp;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SchemaHelper {
+ static final Map<Class<?>, FieldType> PRIMITIVES;
+
+ public SchemaHelper() {
+ }
+
+ public static Schema schema(Object input) {
+ return builder(input).build();
+ }
+
+ public static SchemaBuilder builder(Object input) {
+ Preconditions.checkNotNull(input, "input cannot be null.");
+ SchemaBuilder builder;
+ if (PRIMITIVES.containsKey(input.getClass())) {
+ FieldType type = PRIMITIVES.get(input.getClass());
+ builder = new SchemaBuilder(type);
+ } else if (input instanceof Date) {
+ builder = Timestamp.builder();
+ } else {
+ if (!(input instanceof BigDecimal)) {
+ throw new
UnsupportedOperationException(String.format("Unsupported Type: %s",
input.getClass()));
+ }
+ builder = Decimal.builder(((BigDecimal)input).scale());
+ }
+
+ return builder.optional();
+ }
+
+ static {
+ Map<Class<?>, FieldType> primitives = new HashMap();
+ primitives.put(String.class, FieldType.STRING);
+ primitives.put(Boolean.class, FieldType.BOOLEAN);
+ primitives.put(Byte.class, FieldType.INT8);
+ primitives.put(Short.class, FieldType.INT16);
+ primitives.put(Integer.class, FieldType.INT32);
+ primitives.put(Long.class, FieldType.INT64);
+ primitives.put(Float.class, FieldType.FLOAT32);
+ primitives.put(Double.class, FieldType.FLOAT64);
+ primitives.put(byte[].class, FieldType.BYTES);
+ PRIMITIVES = ImmutableMap.copyOf(primitives);
+ }
+}
diff --git
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaUtil.java
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaUtil.java
new file mode 100644
index 0000000..2317a09
--- /dev/null
+++
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaUtil.java
@@ -0,0 +1,29 @@
+package org.apache.rocketmq.connect.transforms.util;
+
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+
+/**
+ * schema util
+ */
+public class SchemaUtil {
+ public static Schema INT8_SCHEMA = SchemaBuilder.int8().build();
+ public static Schema INT16_SCHEMA = SchemaBuilder.int16().build();
+ public static Schema INT32_SCHEMA = SchemaBuilder.int32().build();
+ public static Schema INT64_SCHEMA = SchemaBuilder.int64().build();
+ public static Schema FLOAT32_SCHEMA = SchemaBuilder.float32().build();
+ public static Schema FLOAT64_SCHEMA = SchemaBuilder.float64().build();
+ public static Schema BOOLEAN_SCHEMA = SchemaBuilder.bool().build();
+ public static Schema STRING_SCHEMA = SchemaBuilder.string().build();
+ public static Schema BYTES_SCHEMA = SchemaBuilder.bytes().build();
+
+ public static Schema OPTIONAL_INT8_SCHEMA =
SchemaBuilder.int8().optional().build();
+ public static Schema OPTIONAL_INT16_SCHEMA =
SchemaBuilder.int16().optional().build();
+ public static Schema OPTIONAL_INT32_SCHEMA =
SchemaBuilder.int32().optional().build();
+ public static Schema OPTIONAL_INT64_SCHEMA =
SchemaBuilder.int64().optional().build();
+ public static Schema OPTIONAL_FLOAT32_SCHEMA =
SchemaBuilder.float32().optional().build();
+ public static Schema OPTIONAL_FLOAT64_SCHEMA =
SchemaBuilder.float64().optional().build();
+ public static Schema OPTIONAL_BOOLEAN_SCHEMA =
SchemaBuilder.bool().optional().build();
+ public static Schema OPTIONAL_STRING_SCHEMA =
SchemaBuilder.string().optional().build();
+ public static Schema OPTIONAL_BYTES_SCHEMA =
SchemaBuilder.bytes().optional().build();
+}
diff --git
a/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/ChangeCaseTest.java
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/ChangeCaseTest.java
new file mode 100644
index 0000000..2e12eeb
--- /dev/null
+++
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/ChangeCaseTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms.test;
+
+import com.google.common.base.CaseFormat;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.Transform;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.transforms.ChangeCase;
+import org.apache.rocketmq.connect.transforms.ChangeCaseConfig;
+import org.apache.rocketmq.connect.transforms.util.SchemaUtil;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.HashMap;
+
+public class ChangeCaseTest extends TransformationTest {
+ public ChangeCaseTest() {
+ super(false);
+ }
+
+
+ @Override
+ protected Transform<ConnectRecord> create() {
+ ChangeCase.Value value = new ChangeCase.Value();
+ return value;
+ }
+
+ @Test
+ public void test() {
+
+ KeyValue defaultKeyValue = new DefaultKeyValue();
+ defaultKeyValue.put(ChangeCaseConfig.FROM_CONFIG,
CaseFormat.UPPER_UNDERSCORE.toString());
+ defaultKeyValue.put(ChangeCaseConfig.TO_CONFIG,
CaseFormat.LOWER_UNDERSCORE.toString());
+ this.transformation.start(defaultKeyValue);
+ final Schema inputSchema = SchemaBuilder.struct()
+ .field("FIRST_NAME", SchemaUtil.STRING_SCHEMA)
+ .field("LAST_NAME", SchemaUtil.STRING_SCHEMA)
+ .build();
+ final Schema expectedSchema = SchemaBuilder.struct()
+ .field("first_name", SchemaUtil.STRING_SCHEMA)
+ .field("last_name", SchemaUtil.STRING_SCHEMA)
+ .build();
+ final Struct inputStruct = new Struct(inputSchema)
+ .put("FIRST_NAME", "test")
+ .put("LAST_NAME", "user");
+ final Struct expectedStruct = new Struct(expectedSchema)
+ .put("first_name", "test")
+ .put("last_name", "user");
+
+ final ConnectRecord inputRecord = new ConnectRecord(
+ new RecordPartition(new HashMap<>()),
+ new RecordOffset(new HashMap<>()),
+ System.currentTimeMillis(),
+ null,
+ null,
+ inputSchema,
+ inputStruct
+ );
+ for (int i = 0; i < 50; i++) {
+ final ConnectRecord transformedRecord =
this.transformation.doTransform(inputRecord);
+ Assertions.assertNotNull(transformedRecord, "transformedRecord should
not be null.");
+ }
+ }
+}
diff --git
a/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/ExtractNestedFieldTest.java
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/ExtractNestedFieldTest.java
new file mode 100644
index 0000000..8cc0cda
--- /dev/null
+++
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/ExtractNestedFieldTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms.test;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.Transform;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.transforms.ExtractNestedField;
+import org.apache.rocketmq.connect.transforms.ExtractNestedFieldConfig;
+import org.apache.rocketmq.connect.transforms.test.common.AssertStruct;
+import org.apache.rocketmq.connect.transforms.util.SchemaUtil;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+
+import static
org.apache.rocketmq.connect.transforms.test.common.AssertSchema.assertSchema;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public abstract class ExtractNestedFieldTest extends TransformationTest {
+ protected ExtractNestedFieldTest(boolean isKey) {
+ super(isKey);
+ }
+
+ @Test
+ public void test() {
+ KeyValue defaultKeyValue = new DefaultKeyValue();
+ defaultKeyValue.put(ExtractNestedFieldConfig.INNER_FIELD_NAME_CONF,
"state");
+ defaultKeyValue.put(ExtractNestedFieldConfig.OUTER_FIELD_NAME_CONF,
"address");
+ defaultKeyValue.put(ExtractNestedFieldConfig.OUTPUT_FIELD_NAME_CONF,
"state");
+
+ this.transformation.start(defaultKeyValue);
+
+ final Schema innerSchema = SchemaBuilder.struct()
+ .name("Address")
+ .field("city", SchemaUtil.STRING_SCHEMA)
+ .field("state", SchemaUtil.STRING_SCHEMA)
+ .build();
+ final Schema inputSchema = SchemaBuilder.struct()
+ .field("first_name", SchemaUtil.STRING_SCHEMA)
+ .field("last_name", SchemaUtil.STRING_SCHEMA)
+ .field("address", innerSchema)
+ .build();
+ final Schema expectedSchema = SchemaBuilder.struct()
+ .field("first_name", SchemaUtil.STRING_SCHEMA)
+ .field("last_name", SchemaUtil.STRING_SCHEMA)
+ .field("address", innerSchema)
+ .field("state", SchemaUtil.STRING_SCHEMA)
+ .build();
+ final Struct innerStruct = new Struct(innerSchema)
+ .put("city", "Austin")
+ .put("state", "tx");
+ final Struct inputStruct = new Struct(inputSchema)
+ .put("first_name", "test")
+ .put("last_name", "developer")
+ .put("address", innerStruct);
+ final Struct expectedStruct = new Struct(expectedSchema)
+ .put("first_name", "test")
+ .put("last_name", "developer")
+ .put("address", innerStruct)
+ .put("state", "tx");
+
+ final ConnectRecord inputRecord = new ConnectRecord(
+ new RecordPartition(new HashMap<>()),
+ new RecordOffset(new HashMap<>()),
+ System.currentTimeMillis(),
+ null,
+ null,
+ inputSchema,
+ inputStruct
+ );
+ for (int i = 0; i < 50; i++) {
+ final ConnectRecord transformedRecord =
this.transformation.doTransform(inputRecord);
+ assertNotNull(transformedRecord, "transformedRecord should not be
null.");
+ assertSchema(expectedSchema, transformedRecord.getSchema());
+ AssertStruct.assertStruct(expectedStruct, (Struct)
transformedRecord.getData());
+ }
+ }
+
+
+ public static class ValueTest extends ExtractNestedFieldTest {
+ protected ValueTest() {
+ super(false);
+ }
+
+ @Override
+ protected Transform<ConnectRecord> create() {
+ return new ExtractNestedField.Value();
+ }
+ }
+
+}
diff --git
a/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/PatternFilterTest.java
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/PatternFilterTest.java
new file mode 100644
index 0000000..5c207f5
--- /dev/null
+++
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/PatternFilterTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms.test;
+
+import com.google.common.collect.ImmutableMap;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.transforms.PatternFilter;
+import org.apache.rocketmq.connect.transforms.PatternFilterConfig;
+import org.apache.rocketmq.connect.transforms.util.SchemaUtil;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class PatternFilterTest {
+ public PatternFilter.Value transform;
+
+ @BeforeEach
+ public void before() {
+ KeyValue defaultKeyValue = new DefaultKeyValue();
+ defaultKeyValue.put(PatternFilterConfig.FIELD_CONFIG, "input");
+ defaultKeyValue.put(PatternFilterConfig.PATTERN_CONFIG, "^filter$");
+
+ this.transform = new PatternFilter.Value();
+ this.transform.start(defaultKeyValue);
+ }
+
+ ConnectRecord map(String value) {
+ return new ConnectRecord(
+ new RecordPartition(new HashMap<>()),
+ new RecordOffset(new HashMap<>()),
+ System.currentTimeMillis(),
+ null,
+ null,
+ null,
+ ImmutableMap.of("input", value)
+ );
+ }
+
+ ConnectRecord struct(String value) {
+ Schema schema = SchemaBuilder.struct()
+ .field("input", SchemaUtil.STRING_SCHEMA)
+ .build();
+ Struct struct = new Struct(schema)
+ .put("input", value);
+ return new ConnectRecord(
+ new RecordPartition(new HashMap<>()),
+ new RecordOffset(new HashMap<>()),
+ System.currentTimeMillis(),
+ null,
+ null,
+ schema,
+ struct
+ );
+ }
+
+ @Test
+ public void filtered() {
+ assertNull(this.transform.doTransform(struct("filter")));
+ assertNull(this.transform.doTransform(map("filter")));
+ }
+
+ @Test
+ public void notFiltered() {
+ assertNotNull(this.transform.doTransform(struct("ok")));
+ assertNotNull(this.transform.doTransform(map("ok")));
+ }
+
+}
diff --git
a/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/PatternRenameTest.java
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/PatternRenameTest.java
new file mode 100644
index 0000000..330b658
--- /dev/null
+++
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/PatternRenameTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms.test;
+
+import com.google.common.collect.ImmutableMap;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.Transform;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.transforms.PatternRename;
+import org.apache.rocketmq.connect.transforms.PatternRenameConfig;
+import org.apache.rocketmq.connect.transforms.util.SchemaUtil;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.rocketmq.connect.transforms.test.common.AssertSchema.assertSchema;
+import static
org.apache.rocketmq.connect.transforms.test.common.AssertStruct.assertStruct;
+import static org.junit.Assert.assertNotNull;
+
+public abstract class PatternRenameTest extends TransformationTest {
+ final static String TOPIC = "test";
+ protected PatternRenameTest(boolean isKey) {
+ super(isKey);
+ }
+
+ @Test
+ public void schemaLess() {
+ KeyValue defaultKeyValue = new DefaultKeyValue();
+ defaultKeyValue.put(PatternRenameConfig.FIELD_PATTERN_CONF, "\\.");
+ defaultKeyValue.put(PatternRenameConfig.FIELD_REPLACEMENT_CONF, "_");
+ this.transformation.start(defaultKeyValue);
+ final Map<String, Object> input = ImmutableMap.of(
+ "first.name", "example",
+ "last.name", "user"
+ );
+ final Map<String, Object> expected = ImmutableMap.of(
+ "first_name", "example",
+ "last_name", "user"
+ );
+
+ final Object key = isKey ? input : null;
+ final Object value = isKey ? null : input;
+ final Schema keySchema = null;
+ final Schema valueSchema = null;
+
+ final ConnectRecord inputRecord = new ConnectRecord(
+ new RecordPartition(new HashMap<>()),
+ new RecordOffset(new HashMap<>()),
+ System.currentTimeMillis(),
+ keySchema,
+ key,
+ valueSchema,
+ value
+ );
+ final ConnectRecord outputRecord =
this.transformation.doTransform(inputRecord);
+ assertNotNull(outputRecord);
+ }
+
+ @Test
+ public void prefixed() {
+
+ KeyValue defaultKeyValue = new DefaultKeyValue();
+ defaultKeyValue.put(PatternRenameConfig.FIELD_PATTERN_CONF, "^prefixed");
+ defaultKeyValue.put(PatternRenameConfig.FIELD_REPLACEMENT_CONF, "");
+ this.transformation.start(defaultKeyValue);
+
+ Schema inputSchema = SchemaBuilder.struct()
+ .name("testing")
+ .field("prefixedfirstname", SchemaUtil.STRING_SCHEMA)
+ .field("prefixedlastname", SchemaUtil.STRING_SCHEMA)
+ .build();
+ Struct inputStruct = new Struct(inputSchema)
+ .put("prefixedfirstname", "example")
+ .put("prefixedlastname", "user");
+
+ final Object key = isKey ? inputStruct : null;
+ final Object value = isKey ? null : inputStruct;
+ final Schema keySchema = isKey ? inputSchema : null;
+ final Schema valueSchema = isKey ? null : inputSchema;
+
+ final ConnectRecord inputRecord = new ConnectRecord(
+ new RecordPartition(new HashMap<>()),
+ new RecordOffset(new HashMap<>()),
+ System.currentTimeMillis(),
+ keySchema,
+ key,
+ valueSchema,
+ value
+ );
+ final ConnectRecord outputRecord =
this.transformation.doTransform(inputRecord);
+ assertNotNull(outputRecord);
+
+ final Schema actualSchema = isKey ? outputRecord.getKeySchema() :
outputRecord.getSchema();
+ final Struct actualStruct = (Struct) (isKey ? outputRecord.getKey() :
outputRecord.getData());
+
+ final Schema expectedSchema = SchemaBuilder.struct()
+ .name("testing")
+ .field("firstname", SchemaUtil.STRING_SCHEMA)
+ .field("lastname", SchemaUtil.STRING_SCHEMA).build();
+ Struct expectedStruct = new Struct(expectedSchema)
+ .put("firstname", "example")
+ .put("lastname", "user");
+
+ assertSchema(expectedSchema, actualSchema);
+ assertStruct(expectedStruct, actualStruct);
+ }
+
+ public static class KeyTest extends PatternRenameTest {
+ public KeyTest() {
+ super(true);
+ }
+
+ @Override
+ protected Transform<ConnectRecord> create() {
+ return new PatternRename.Key();
+ }
+ }
+
+ public static class ValueTest extends PatternRenameTest {
+ public ValueTest() {
+ super(false);
+ }
+
+ @Override
+ protected Transform<ConnectRecord> create() {
+ return new PatternRename.Value();
+ }
+ }
+}
diff --git
a/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/SetMaximumPrecisionTest.java
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/SetMaximumPrecisionTest.java
new file mode 100644
index 0000000..0cbd658
--- /dev/null
+++
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/SetMaximumPrecisionTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms.test;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.connector.api.data.logical.Decimal;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.transforms.SetMaximumPrecision;
+import org.apache.rocketmq.connect.transforms.SetMaximumPrecisionConfig;
+import org.apache.rocketmq.connect.transforms.util.SchemaUtil;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.util.HashMap;
+
+import static
org.apache.rocketmq.connect.transforms.test.common.AssertStruct.assertStruct;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class SetMaximumPrecisionTest {
+ ConnectRecord record(Struct struct) {
+ return new ConnectRecord(
+ new RecordPartition(new HashMap<>()),
+ new RecordOffset(new HashMap<>()),
+ System.currentTimeMillis(),
+ null,
+ null,
+ struct.schema(),
+ struct
+ );
+ }
+
+ @Test
+ public void noop() {
+ Schema schema = SchemaBuilder.struct()
+ .field("first", SchemaUtil.STRING_SCHEMA)
+ .field("last", SchemaUtil.STRING_SCHEMA)
+ .field("email", SchemaUtil.STRING_SCHEMA)
+ .build();
+ Struct struct = new Struct(schema)
+ .put("first", "test")
+ .put("last", "user")
+ .put("first", "[email protected]");
+ ConnectRecord record = record(struct);
+ SetMaximumPrecision.Value transform = new SetMaximumPrecision.Value();
+
+ KeyValue defaultKeyValue = new DefaultKeyValue();
+ defaultKeyValue.put(SetMaximumPrecisionConfig.MAX_PRECISION_CONFIG, 32);
+ transform.start(defaultKeyValue);
+ ConnectRecord actual = transform.doTransform(record);
+ assertNotNull(actual);
+ assertStruct((Struct) record.getData(), (Struct) actual.getData());
+ }
+
+ @Test
+ public void convert() {
+ final Schema inputSchema = SchemaBuilder.struct()
+ .field("first", Decimal.schema(5))
+ .field(
+ "second",
+ Decimal.builder(5)
+
.parameter(SetMaximumPrecision.CONNECT_AVRO_DECIMAL_PRECISION_PROP, "16")
+ .optional()
+ .build()
+ )
+ .field(
+ "third",
+ Decimal.builder(5)
+
.parameter(SetMaximumPrecision.CONNECT_AVRO_DECIMAL_PRECISION_PROP, "48")
+ .optional()
+ .build()
+ )
+ .build();
+ final Struct inputStruct = new Struct(inputSchema)
+ .put("first", BigDecimal.ONE)
+ .put("second", null)
+ .put("third", BigDecimal.ONE);
+ final Schema expectedSchema = SchemaBuilder.struct()
+ .field(
+ "first",
+ Decimal.builder(5)
+
.parameter(SetMaximumPrecision.CONNECT_AVRO_DECIMAL_PRECISION_PROP, "32")
+ .build()
+ )
+ .field(
+ "second",
+ Decimal.builder(5)
+
.parameter(SetMaximumPrecision.CONNECT_AVRO_DECIMAL_PRECISION_PROP, "16")
+ .optional()
+ .build()
+ )
+ .field(
+ "third",
+ Decimal.builder(5)
+
.parameter(SetMaximumPrecision.CONNECT_AVRO_DECIMAL_PRECISION_PROP, "32")
+ .optional()
+ .build()
+ )
+ .build();
+ final Struct expectedStruct = new Struct(expectedSchema)
+ .put("first", BigDecimal.ONE)
+ .put("second", null)
+ .put("third", BigDecimal.ONE);
+
+
+ ConnectRecord record = record(inputStruct);
+ SetMaximumPrecision.Value transform = new SetMaximumPrecision.Value();
+ KeyValue defaultKeyValue = new DefaultKeyValue();
+ defaultKeyValue.put(SetMaximumPrecisionConfig.MAX_PRECISION_CONFIG, 32);
+
+ transform.start(defaultKeyValue);
+ ConnectRecord actual = transform.doTransform(record);
+ assertNotNull(actual);
+ assertStruct(expectedStruct, (Struct) actual.getData());
+ }
+
+}
diff --git
a/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/SetNullTest.java
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/SetNullTest.java
new file mode 100644
index 0000000..328ec1b
--- /dev/null
+++
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/SetNullTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms.test;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.rocketmq.connect.transforms.SetNull;
+import org.apache.rocketmq.connect.transforms.util.SchemaUtil;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class SetNullTest {
+
+ @Test
+ public void test() {
+ final ConnectRecord input = new ConnectRecord(
+ new RecordPartition(new HashMap<>()),
+ new RecordOffset(new HashMap<>()),
+ System.currentTimeMillis(),
+ SchemaUtil.STRING_SCHEMA,
+ "key",
+ null,
+ ""
+ );
+ SetNull transform = new SetNull.Key();
+ final ConnectRecord actual = transform.doTransform(input);
+ assertNull(actual.getKey(), "key should be null.");
+ assertNull(actual.getKeySchema(), "keySchema should be null.");
+ }
+
+}
diff --git
a/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/TransformationTest.java
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/TransformationTest.java
new file mode 100644
index 0000000..2951d62
--- /dev/null
+++
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/TransformationTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms.test;
+
+import io.openmessaging.connector.api.component.Transform;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import org.junit.Before;
+
+public abstract class TransformationTest {
+ final boolean isKey;
+ final static String TOPIC = "test";
+
+ protected TransformationTest(boolean isKey) {
+ this.isKey = isKey;
+ }
+
+ protected abstract Transform<ConnectRecord> create();
+
+ Transform<ConnectRecord> transformation;
+
+ @Before
+ public void before() {
+ this.transformation = create();
+ }
+
+
+}
diff --git
a/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/common/AssertSchema.java
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/common/AssertSchema.java
new file mode 100644
index 0000000..06313ae
--- /dev/null
+++
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/common/AssertSchema.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms.test.common;
+
+import com.google.common.base.Strings;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Schema;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.List;
+
+public class AssertSchema {
+ private AssertSchema() {
+ }
+
+ public static void assertSchema(Schema expected, Schema actual) {
+ assertSchema(expected, actual, (String)null);
+ }
+
+ public static void assertSchema(Schema expected, Schema actual, String
message) {
+ String prefix = Strings.isNullOrEmpty(message) ? "" : message + ": ";
+ if (null == expected) {
+ Assertions.assertNull(actual, prefix + "actual should not be
null.");
+ } else {
+ Assertions.assertNotNull(expected, prefix + "expected schema
should not be null.");
+ Assertions.assertNotNull(actual, prefix + "actual schema should
not be null.");
+ Assertions.assertEquals(expected.getName(), actual.getName(),
prefix + "schema.name() should match.");
+ Assertions.assertEquals(expected.getFieldType(),
actual.getFieldType(), prefix + "schema.type() should match.");
+ Assertions.assertEquals(expected.getDefaultValue(),
actual.getDefaultValue(), prefix + "schema.defaultValue() should match.");
+ Assertions.assertEquals(expected.isOptional(),
actual.isOptional(), prefix + "schema.isOptional() should match.");
+ Assertions.assertEquals(expected.getDoc(), actual.getDoc(), prefix
+ "schema.doc() should match.");
+ Assertions.assertEquals(expected.getVersion(),
actual.getVersion(), prefix + "schema.version() should match.");
+ GenericAssertions.assertMap(expected.getParameters(),
actual.getParameters(), prefix + "schema.parameters() should match.");
+ if (null != expected.getDefaultValue()) {
+ Assertions.assertNotNull(actual.getDefaultValue(),
"actual.defaultValue() should not be null.");
+ Class<?> expectedType = null;
+ switch(expected.getFieldType()) {
+ case INT8:
+ expectedType = Byte.class;
+ break;
+ case INT16:
+ expectedType = Short.class;
+ break;
+ case INT32:
+ expectedType = Integer.class;
+ break;
+ case INT64:
+ expectedType = Long.class;
+ break;
+ case FLOAT32:
+ expectedType = Float.class;
+ break;
+ case FLOAT64:
+ expectedType = Float.class;
+ }
+
+ if (null != expectedType) {
+
Assertions.assertTrue(actual.getDefaultValue().getClass().isAssignableFrom(expectedType),
String.format("actual.defaultValue() should be a %s",
expectedType.getSimpleName()));
+ }
+ }
+
+ switch(expected.getFieldType()) {
+ case ARRAY:
+ assertSchema(expected.getValueSchema(),
actual.getValueSchema(), message + "valueSchema does not match.");
+ break;
+ case MAP:
+ assertSchema(expected.getKeySchema(), actual.getKeySchema(),
message + "keySchema does not match.");
+ assertSchema(expected.getValueSchema(),
actual.getValueSchema(), message + "valueSchema does not match.");
+ break;
+ case STRUCT:
+ List<Field> expectedFields = expected.getFields();
+ List<Field> actualFields = actual.getFields();
+ Assertions.assertEquals(expectedFields.size(),
actualFields.size(), prefix + "Number of fields do not match.");
+
+ for(int i = 0; i < expectedFields.size(); ++i) {
+ Field expectedField = (Field)expectedFields.get(i);
+ Field actualField = (Field)actualFields.get(i);
+ assertField(expectedField, actualField, "index " + i);
+ }
+ }
+
+ }
+ }
+
+ public static void assertField(Field expected, Field actual, String
message) {
+ String prefix = Strings.isNullOrEmpty(message) ? "" : message + ": ";
+ if (null == expected) {
+ Assertions.assertNull(actual, prefix + "actual should be null.");
+ } else {
+ Assertions.assertEquals(expected.getName(), actual.getName(),
prefix + "name does not match");
+ Assertions.assertEquals(expected.getIndex(), actual.getIndex(),
prefix + "name does not match");
+ assertSchema(expected.getSchema(), actual.getSchema(), prefix +
"schema does not match");
+ }
+ }
+
+ public static void assertField(Field expected, Field actual) {
+ assertField(expected, actual, (String)null);
+ }
+}
diff --git
a/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/common/AssertStruct.java
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/common/AssertStruct.java
new file mode 100644
index 0000000..78c6498
--- /dev/null
+++
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/common/AssertStruct.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms.test.common;
+
+import com.google.common.base.Strings;
+import com.google.common.io.BaseEncoding;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.connector.api.data.logical.Decimal;
+import io.openmessaging.connector.api.data.logical.Time;
+import io.openmessaging.connector.api.data.logical.Timestamp;
+import org.junit.jupiter.api.Assertions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class AssertStruct {
+ private static final Logger log =
LoggerFactory.getLogger(AssertStruct.class);
+
+ public AssertStruct() {
+ }
+
+ static <T> T castAndVerify(Class<T> cls, Struct struct, Field field,
boolean expected) {
+ Object value = struct.get(field.getName());
+ String prefix = String.format("%s('%s') ", expected ? "expected" :
"actual", field.getName());
+ if (!field.getSchema().isOptional()) {
+ Assertions.assertNotNull(value, prefix + "has a require schema.
Should not be null.");
+ }
+
+ if (null == value) {
+ return null;
+ } else {
+ Assertions.assertTrue(cls.isInstance(value), String.format(prefix
+ "should be a %s", cls.getSimpleName()));
+ return cls.cast(value);
+ }
+ }
+
+ public static void assertStruct(Struct expected, Struct actual, String
message) {
+ String prefix = Strings.isNullOrEmpty(message) ? "" : message + ": ";
+ if (null == expected) {
+ Assertions.assertNull(actual, prefix + "actual should be null.");
+ } else {
+ AssertSchema.assertSchema(expected.schema(), actual.schema(),
"schema does not match.");
+ Iterator var4 = expected.schema().getFields().iterator();
+
+ while(true) {
+ while(var4.hasNext()) {
+ Field expectedField = (Field)var4.next();
+ log.trace("assertStruct() - testing field '{}'",
expectedField.getName());
+ Object expectedValue =
expected.get(expectedField.getName());
+ Object actualValue = actual.get(expectedField.getName());
+ if
(Decimal.LOGICAL_NAME.equals(expectedField.getSchema().getName())) {
+ BigDecimal expectedDecimal =
(BigDecimal)castAndVerify(BigDecimal.class, expected, expectedField, true);
+ BigDecimal actualDecimal =
(BigDecimal)castAndVerify(BigDecimal.class, actual, expectedField, false);
+ Assertions.assertEquals(expectedDecimal,
actualDecimal, prefix + expectedField.getName() + " does not match.");
+ } else if
(!Timestamp.LOGICAL_NAME.equals(expectedField.getSchema().getName()) &&
!io.openmessaging.connector.api.data.logical.Date.LOGICAL_NAME.equals(expectedField.getSchema().getName())
&& !Time.LOGICAL_NAME.equals(expectedField.getSchema().getName())) {
+ switch(expectedField.getSchema().getFieldType()) {
+ case ARRAY:
+ List<Object> expectedArray =
(List)castAndVerify(List.class, expected, expectedField, true);
+ List<Object> actualArray =
(List)castAndVerify(List.class, actual, expectedField, false);
+ Assertions.assertEquals(expectedArray,
actualArray, prefix + expectedField.getName() + " does not match.");
+ break;
+ case MAP:
+ Map<Object, Object> expectedMap =
(Map)castAndVerify(Map.class, expected, expectedField, true);
+ Map<Object, Object> actualMap =
(Map)castAndVerify(Map.class, actual, expectedField, false);
+ Assertions.assertEquals(expectedMap, actualMap,
prefix + expectedField.getName() + " does not match.");
+ break;
+ case STRUCT:
+ Struct expectedStruct =
(Struct)castAndVerify(Struct.class, expected, expectedField, true);
+ Struct actualStruct =
(Struct)castAndVerify(Struct.class, actual, expectedField, false);
+ assertStruct(expectedStruct, actualStruct, prefix
+ expectedField.getName() + " does not match.");
+ break;
+ case BYTES:
+ byte[] expectedByteArray =
(byte[])castAndVerify(byte[].class, expected, expectedField, true);
+ byte[] actualByteArray =
(byte[])castAndVerify(byte[].class, actual, expectedField, false);
+ Assertions.assertEquals(null == expectedByteArray
? "" : BaseEncoding.base32Hex().encode(expectedByteArray).toString(), null ==
actualByteArray ? "" :
BaseEncoding.base32Hex().encode(actualByteArray).toString(), prefix +
expectedField.getName() + " does not match.");
+ break;
+ default:
+ Assertions.assertEquals(expectedValue,
actualValue, prefix + expectedField.getName() + " does not match.");
+ }
+ } else {
+ Date expectedDate = (Date)castAndVerify(Date.class,
expected, expectedField, true);
+ Date actualDate = (Date)castAndVerify(Date.class,
actual, expectedField, false);
+ Assertions.assertEquals(expectedDate, actualDate,
prefix + expectedField.getName() + " does not match.");
+ }
+ }
+
+ return;
+ }
+ }
+ }
+
+ public static void assertStruct(Struct expected, Struct actual) {
+ assertStruct(expected, actual, (String)null);
+ }
+}
diff --git
a/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/common/GenericAssertions.java
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/common/GenericAssertions.java
new file mode 100644
index 0000000..f14f605
--- /dev/null
+++
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/common/GenericAssertions.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms.test.common;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class GenericAssertions {
+ private GenericAssertions() {
+
+ }
+
+ static class MapDifferenceSupplier implements Supplier<String> {
+ final MapDifference<?, ?> mapDifference;
+ final String method;
+
+ public MapDifferenceSupplier(MapDifference<?, ?> mapDifference, String
method) {
+ this.mapDifference = mapDifference;
+ this.method = method;
+ }
+
+ @Override
+ public String get() {
+ try (Writer w = new StringWriter()) {
+ try (BufferedWriter writer = new BufferedWriter(w)) {
+ writer.append(String.format("Map for actual.%s() does not match
expected.%s().", this.method, this.method));
+ writer.newLine();
+ Map<?, ? extends MapDifference.ValueDifference<?>> differences =
mapDifference.entriesDiffering();
+ if (!differences.isEmpty()) {
+ writer.append("Keys with Differences");
+ writer.newLine();
+ for (Map.Entry<?, ? extends MapDifference.ValueDifference<?>> kvp
: differences.entrySet()) {
+ writer.append(" ");
+ writer.append(kvp.getKey().toString());
+ writer.newLine();
+
+ writer.append(" expected:");
+ writer.append(kvp.getValue().leftValue().toString());
+ writer.newLine();
+
+ writer.append(" actual:");
+ writer.append(kvp.getValue().rightValue().toString());
+ writer.newLine();
+ }
+ }
+
+ Map<?, ?> entries = mapDifference.entriesOnlyOnLeft();
+ writeEntries(writer, "Only in expected map", entries);
+
+ Map<?, ?> onlyInActual = mapDifference.entriesOnlyOnRight();
+ writeEntries(writer, "Only in actual map", onlyInActual);
+ }
+ return w.toString();
+ } catch (IOException ex) {
+ throw new IllegalStateException(ex);
+ }
+ }
+
+ private void writeEntries(BufferedWriter writer, String header, Map<?, ?>
entries) throws IOException {
+ if (!entries.isEmpty()) {
+ writer.append(header);
+ writer.newLine();
+
+ for (Map.Entry<?, ?> kvp : entries.entrySet()) {
+ writer.append(" ");
+ writer.append(kvp.getKey().toString());
+ writer.append(": ");
+ writer.append(kvp.getValue().toString());
+ writer.newLine();
+ }
+ writer.newLine();
+ }
+ }
+ }
+
+ static void assertMap(Map<String, ?> expected, Map<String, ?> actual, String
message) {
+ if (null == expected && null == actual) {
+ return;
+ }
+
+ String prefix = Strings.isNullOrEmpty(message) ? "" : message + ": ";
+ assertNotNull(expected, prefix + "expected cannot be null");
+ assertNotNull(actual, prefix + "actual cannot be null");
+ MapDifference<String, ?> mapDifference = Maps.difference(expected, actual);
+ assertTrue(mapDifference.areEqual(), new
MapDifferenceSupplier(mapDifference, prefix));
+ }
+}