This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 8945529  [ISSUE #223]Add common transform  (#224)
8945529 is described below

commit 8945529e8dad98f8a781c1bd830fad8405d48243
Author: xiaoyi <[email protected]>
AuthorDate: Wed Aug 3 20:34:56 2022 +0800

    [ISSUE #223]Add common transform  (#224)
    
    * init transforms
    
    * upgrade api to 0.1.3
    
    * Fix debezium decimal type conversion problem #190
    
    * Upgrade rocketmq-replicator API to v0.1.3 #189
    
    * Encountered change event for table databasename.tablename whose schema 
isn't known to this connector #191
    
    * Debezium mysql source connector delete event causes null pointer #196
    
    * remove local config
    
    * Debezium mysql source connector delete event causes null pointer #196
    
    * Rocketmq replicator running null pointer #205
    
    * add common transform #223
    
    * fixed
    
    * move transform to root directory #223
    
    Co-authored-by: sunxiaojian <[email protected]>
---
 pom.xml                                            |   1 +
 transforms/pom.xml                                 | 183 ++++++++++++++++++++
 .../connect/transforms/BaseTransformation.java     | 180 ++++++++++++++++++++
 .../rocketmq/connect/transforms/ChangeCase.java    | 129 ++++++++++++++
 .../connect/transforms/ChangeCaseConfig.java       |  54 ++++++
 .../connect/transforms/ExtractNestedField.java     | 122 ++++++++++++++
 .../transforms/ExtractNestedFieldConfig.java       |  58 +++++++
 .../rocketmq/connect/transforms/PatternFilter.java | 132 +++++++++++++++
 .../connect/transforms/PatternFilterConfig.java    |  63 +++++++
 .../rocketmq/connect/transforms/PatternRename.java | 146 ++++++++++++++++
 .../connect/transforms/PatternRenameConfig.java    |  82 +++++++++
 .../rocketmq/connect/transforms/RegexRouter.java   |  73 ++++++++
 .../connect/transforms/SetMaximumPrecision.java    | 186 +++++++++++++++++++++
 .../transforms/SetMaximumPrecisionConfig.java      |  37 ++++
 .../rocketmq/connect/transforms/SetNull.java       |  72 ++++++++
 .../connect/transforms/util/ExtendKeyValue.java    | 121 ++++++++++++++
 .../connect/transforms/util/SchemaHelper.java      |  74 ++++++++
 .../connect/transforms/util/SchemaUtil.java        |  29 ++++
 .../connect/transforms/test/ChangeCaseTest.java    |  85 ++++++++++
 .../transforms/test/ExtractNestedFieldTest.java    | 111 ++++++++++++
 .../connect/transforms/test/PatternFilterTest.java |  93 +++++++++++
 .../connect/transforms/test/PatternRenameTest.java | 150 +++++++++++++++++
 .../transforms/test/SetMaximumPrecisionTest.java   | 136 +++++++++++++++
 .../connect/transforms/test/SetNullTest.java       |  49 ++++++
 .../transforms/test/TransformationTest.java        |  41 +++++
 .../transforms/test/common/AssertSchema.java       | 113 +++++++++++++
 .../transforms/test/common/AssertStruct.java       | 115 +++++++++++++
 .../transforms/test/common/GenericAssertions.java  | 112 +++++++++++++
 28 files changed, 2747 insertions(+)

diff --git a/pom.xml b/pom.xml
index c8983d3..26d5f2c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,6 +27,7 @@
         <module>rocketmq-connect-runtime</module>
         <module>rocketmq-connect-cli</module>
         <module>distribution</module>
+        <module>transforms</module>
     </modules>
     <name>RocketMQ Connect</name>
 
diff --git a/transforms/pom.xml b/transforms/pom.xml
new file mode 100644
index 0000000..e511f0f
--- /dev/null
+++ b/transforms/pom.xml
@@ -0,0 +1,183 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-connect</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>rocketmq-connect-transforms</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>0.1.4</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.36</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>31.1-jre</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.13.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>5.6.3</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    
<outputDirectory>${project.build.directory}/lib</outputDirectory>
+                    <excludeTransitive>false</excludeTransitive>
+                    <stripVersion>true</stripVersion>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    
<excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>2.17</version>
+                <executions>
+                    <execution>
+                        <id>verify</id>
+                        <phase>verify</phase>
+                        <configuration>
+                            
<configLocation>../style/rmq_checkstyle.xml</configLocation>
+                            <encoding>UTF-8</encoding>
+                            <consoleOutput>true</consoleOutput>
+                            <failsOnError>true</failsOnError>
+                            
<includeTestSourceDirectory>false</includeTestSourceDirectory>
+                            <includeTestResources>false</includeTestResources>
+                        </configuration>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/BaseTransformation.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/BaseTransformation.java
new file mode 100644
index 0000000..6b1ff8b
--- /dev/null
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/BaseTransformation.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.transforms;
+
+import io.openmessaging.connector.api.component.Transform;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaAndValue;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.connector.api.data.logical.Decimal;
+import io.openmessaging.connector.api.data.logical.Time;
+import io.openmessaging.connector.api.data.logical.Timestamp;
+import org.apache.rocketmq.connect.transforms.util.SchemaHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+public abstract class BaseTransformation<R extends ConnectRecord> implements 
Transform<R> {
+    private static final Logger log = 
LoggerFactory.getLogger(BaseTransformation.class);
+
+    protected SchemaAndValue processMap(R record, Map<String, Object> input) {
+        throw new UnsupportedOperationException("MAP is not a supported 
type.");
+    }
+
+    protected SchemaAndValue processStruct(R record, Schema inputSchema, 
Struct input) {
+        throw new UnsupportedOperationException("STRUCT is not a supported 
type.");
+    }
+
+    protected SchemaAndValue processString(R record, Schema inputSchema, 
String input) {
+        throw new UnsupportedOperationException("STRING is not a supported 
type.");
+    }
+
+    protected SchemaAndValue processBytes(R record, Schema inputSchema, byte[] 
input) {
+        throw new UnsupportedOperationException("BYTES is not a supported 
type.");
+    }
+
+    protected SchemaAndValue processInt8(R record, Schema inputSchema, byte 
input) {
+        throw new UnsupportedOperationException("INT8 is not a supported 
type.");
+    }
+
+    protected SchemaAndValue processInt16(R record, Schema inputSchema, short 
input) {
+        throw new UnsupportedOperationException("INT16 is not a supported 
type.");
+    }
+
+    protected SchemaAndValue processInt32(R record, Schema inputSchema, int 
input) {
+        throw new UnsupportedOperationException("INT32 is not a supported 
type.");
+    }
+
+    protected SchemaAndValue processInt64(R record, Schema inputSchema, long 
input) {
+        throw new UnsupportedOperationException("INT64 is not a supported 
type.");
+    }
+
+    protected SchemaAndValue processBoolean(R record, Schema inputSchema, 
boolean input) {
+        throw new UnsupportedOperationException("BOOLEAN is not a supported 
type.");
+    }
+
+    protected SchemaAndValue processTimestamp(R record, Schema inputSchema, 
Date input) {
+        throw new UnsupportedOperationException("Timestamp is not a supported 
type.");
+    }
+
+    protected SchemaAndValue processDate(R record, Schema inputSchema, Date 
input) {
+        throw new UnsupportedOperationException("Date is not a supported 
type.");
+    }
+
+    protected SchemaAndValue processTime(R record, Schema inputSchema, Date 
input) {
+        throw new UnsupportedOperationException("Time is not a supported 
type.");
+    }
+
+    protected SchemaAndValue processDecimal(R record, Schema inputSchema, 
BigDecimal input) {
+        throw new UnsupportedOperationException("Decimal is not a supported 
type.");
+    }
+
+    protected SchemaAndValue processFloat64(R record, Schema inputSchema, 
double input) {
+        throw new UnsupportedOperationException("FLOAT64 is not a supported 
type.");
+    }
+
+    protected SchemaAndValue processFloat32(R record, Schema inputSchema, 
float input) {
+        throw new UnsupportedOperationException("FLOAT32 is not a supported 
type.");
+    }
+
+    protected SchemaAndValue processArray(R record, Schema inputSchema, 
List<Object> input) {
+        throw new UnsupportedOperationException("ARRAY is not a supported 
type.");
+    }
+
+    protected SchemaAndValue processMap(R record, Schema inputSchema, 
Map<Object, Object> input) {
+        throw new UnsupportedOperationException("MAP is not a supported 
type.");
+    }
+
+
+    private static final Schema OPTIONAL_TIMESTAMP = 
Timestamp.builder().optional().build();
+    protected SchemaAndValue process(R record, Schema inputSchema, Object 
input) {
+        final SchemaAndValue result;
+        if (null == inputSchema && null == input) {
+            return new SchemaAndValue(
+                    null,
+                    null
+            );
+        }
+        if (input instanceof Map) {
+            log.trace("process() - Processing as map");
+            result = processMap(record, (Map<String, Object>) input);
+            return result;
+        }
+
+        if (null == inputSchema) {
+            log.trace("process() - Determining schema");
+            inputSchema = SchemaHelper.schema(input);
+        }
+
+        String schemaName = inputSchema.getName();
+        FieldType schemaType =  inputSchema.getFieldType();
+        log.trace("process() - Input has as schema. schema = {}", inputSchema);
+        if (FieldType.STRUCT == schemaType) {
+            result = processStruct(record, inputSchema, (Struct) input);
+        } else if (Timestamp.LOGICAL_NAME.equals(schemaName)) {
+            result = processTimestamp(record, inputSchema, (Date) input);
+        } else if 
(io.openmessaging.connector.api.data.logical.Date.LOGICAL_NAME.equals(schemaName))
 {
+            result = processDate(record, inputSchema, (Date) input);
+        } else if (Time.LOGICAL_NAME.equals(schemaName)) {
+            result = processTime(record, inputSchema, (Date) input);
+        } else if (Decimal.LOGICAL_NAME.equals(schemaName)) {
+            result = processDecimal(record, inputSchema, (BigDecimal) input);
+        } else if (FieldType.STRING == schemaType) {
+            result = processString(record, inputSchema, (String) input);
+        } else if (FieldType.BYTES == schemaType) {
+            result = processBytes(record, inputSchema, (byte[]) input);
+        } else if (FieldType.INT8 == schemaType) {
+            result = processInt8(record, inputSchema, (byte) input);
+        } else if (FieldType.INT16 == schemaType) {
+            result = processInt16(record, inputSchema, (short) input);
+        } else if (FieldType.INT32 == schemaType) {
+            result = processInt32(record, inputSchema, (int) input);
+        } else if (FieldType.INT64 == schemaType) {
+            result = processInt64(record, inputSchema, (long) input);
+        } else if (FieldType.FLOAT32 == schemaType) {
+            result = processFloat32(record, inputSchema, (float) input);
+        } else if (FieldType.FLOAT64 == schemaType) {
+            result = processFloat64(record, inputSchema, (double) input);
+        } else if (FieldType.ARRAY == schemaType) {
+            result = processArray(record, inputSchema, (List<Object>) input);
+        } else if (FieldType.MAP == schemaType) {
+            result = processMap(record, inputSchema, (Map<Object, Object>) 
input);
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Schema is not supported. type='%s' name='%s'",
+                            schemaType,
+                            schemaName
+                    )
+            );
+        }
+
+        return result;
+    }
+
+    @Override
+    public void stop() {
+        // NO-op
+    }
+}
\ No newline at end of file
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCase.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCase.java
new file mode 100644
index 0000000..b8094d2
--- /dev/null
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+import com.google.common.base.Strings;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaAndValue;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * change case
+ * @param <R>
+ */
+public abstract class ChangeCase<R extends ConnectRecord> extends 
BaseTransformation<R> {
+  private static final Logger log = LoggerFactory.getLogger(ChangeCase.class);
+
+  class State {
+    public final Map<String, String> columnMapping;
+    public final Schema schema;
+    State(Map<String, String> columnMapping, Schema schema) {
+      this.columnMapping = columnMapping;
+      this.schema = schema;
+    }
+  }
+
+  Map<Schema, State> schemaState = new HashMap<>();
+  private ChangeCaseConfig config;
+  @Override
+  public void start(KeyValue config) {
+    this.config = new ChangeCaseConfig(config);
+  }
+
+  @Override
+  protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct 
input) {
+    // state
+    final State state = this.schemaState.computeIfAbsent(inputSchema, schema 
-> {
+      final SchemaBuilder builder = SchemaBuilder.struct();
+      if (!Strings.isNullOrEmpty(schema.getName())) {
+        builder.name(schema.getName());
+      }
+      if (schema.isOptional()) {
+        builder.optional();
+      }
+      final Map<String, String> columnMapping = new LinkedHashMap<>();
+      for (Field field : schema.getFields()) {
+        final String newFieldName = this.config.from.to(this.config.to, 
field.getName());
+        log.trace("processStruct() - Mapped '{}' to '{}'", field.getName(), 
newFieldName);
+        columnMapping.put(field.getName(), newFieldName);
+        builder.field(newFieldName, field.getSchema());
+      }
+      return new State(columnMapping, builder.build());
+    });
+
+
+    final Struct outputStruct = new Struct(state.schema);
+    for (Map.Entry<String, String> kvp : state.columnMapping.entrySet()) {
+      final Object value = input.get(kvp.getKey());
+      outputStruct.put(kvp.getValue(), value);
+    }
+    return new SchemaAndValue(state.schema, outputStruct);
+  }
+
+  /**
+   * transform key
+   */
+  public static class Key extends ChangeCase<ConnectRecord> {
+    @Override
+    public ConnectRecord doTransform(ConnectRecord r) {
+      final SchemaAndValue transformed = process(r, r.getKeySchema(), 
r.getKey());
+      ConnectRecord record = new ConnectRecord(
+              r.getPosition().getPartition(),
+              r.getPosition().getOffset(),
+              r.getTimestamp(),
+              transformed.schema(),
+              transformed.value(),
+              r.getSchema(),
+              r.getData()
+      );
+      record.setExtensions(r.getExtensions());
+      return record;
+    }
+  }
+
+
+  /**
+   * transform value
+   */
+  public static class Value extends ChangeCase<ConnectRecord> {
+    @Override
+    public ConnectRecord doTransform(ConnectRecord r) {
+      final SchemaAndValue transformed = process(r, r.getSchema(), 
r.getData());
+      ConnectRecord record = new ConnectRecord(
+              r.getPosition().getPartition(),
+              r.getPosition().getOffset(),
+              r.getTimestamp(),
+              r.getKeySchema(),
+              r.getKey(),
+              transformed.schema(),
+              transformed.value()
+      );
+      record.setExtensions(r.getExtensions());
+      return record;
+    }
+  }
+}
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCaseConfig.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCaseConfig.java
new file mode 100644
index 0000000..fa9a5c9
--- /dev/null
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCaseConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+
+import com.google.common.base.CaseFormat;
+import io.openmessaging.KeyValue;
+
+public class ChangeCaseConfig {
+  public final CaseFormat from;
+  public final CaseFormat to;
+
+  public static final String FROM_CONFIG = "from";
+  static final String FROM_DOC = "The format to move from ";
+  public static final String TO_CONFIG = "to";
+  static final String TO_DOC = "";
+
+  public ChangeCaseConfig(KeyValue config) {
+    String fromConfig = config.getString(FROM_CONFIG);
+    this.from = CaseFormat.valueOf(fromConfig);
+    String toConfig = config.getString(TO_CONFIG);
+    this.to = CaseFormat.valueOf(toConfig);
+  }
+
+  /**
+   * from
+   * @return
+   */
+  public CaseFormat from(){
+    return this.from;
+  }
+
+  /**
+   * to
+   * @return
+   */
+  public CaseFormat to(){
+    return this.to;
+  }
+}
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedField.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedField.java
new file mode 100644
index 0000000..122951d
--- /dev/null
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedField.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+import com.google.common.base.Strings;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaAndValue;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * extract nested field
+ * @param <R>
+ */
+public abstract class ExtractNestedField<R extends ConnectRecord> extends 
BaseTransformation<R> {
+  private static final Logger log = 
LoggerFactory.getLogger(ExtractNestedField.class);
+  private ExtractNestedFieldConfig config;
+  Map<Schema, Schema> schemaCache;
+  @Override
+  public void start(KeyValue keyValue) {
+    this.config = new ExtractNestedFieldConfig(keyValue);
+    this.schemaCache = new HashMap<>();
+  }
+
+
+  @Override
+  protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct 
input) {
+    final Struct innerStruct = input.getStruct(this.config.outerFieldName);
+    final Schema outputSchema = this.schemaCache.computeIfAbsent(inputSchema, 
s -> {
+
+      final Field innerField = 
innerStruct.schema().getField(this.config.innerFieldName);
+      final SchemaBuilder builder = SchemaBuilder.struct();
+      if (!Strings.isNullOrEmpty(inputSchema.getName())) {
+        builder.name(inputSchema.getName());
+      }
+      if (inputSchema.isOptional()) {
+        builder.optional();
+      }
+      for (Field inputField : inputSchema.getFields()) {
+        builder.field(inputField.getName(), inputField.getSchema());
+      }
+      builder.field(this.config.innerFieldName, innerField.getSchema());
+      return builder.build();
+    });
+    final Struct outputStruct = new Struct(outputSchema);
+    for (Field inputField : inputSchema.getFields()) {
+      final Object value = input.get(inputField);
+      outputStruct.put(inputField.getName(), value);
+    }
+    final Object innerFieldValue = innerStruct.get(this.config.innerFieldName);
+    outputStruct.put(this.config.innerFieldName, innerFieldValue);
+
+    return new SchemaAndValue(outputSchema, outputStruct);
+
+  }
+
+  /**
+   * transform key
+   */
+  public static class Key extends ExtractNestedField<ConnectRecord> {
+    @Override
+    public ConnectRecord doTransform(ConnectRecord r) {
+      final SchemaAndValue transformed = process(r, r.getKeySchema(), 
r.getKey());
+      ConnectRecord record = new ConnectRecord(
+              r.getPosition().getPartition(),
+              r.getPosition().getOffset(),
+              r.getTimestamp(),
+              transformed.schema(),
+              transformed.value(),
+              r.getSchema(),
+              r.getData()
+      );
+      record.setExtensions(r.getExtensions());
+      return record;
+    }
+  }
+
+
+  /**
+   * transform value
+   */
+  public static class Value extends ExtractNestedField<ConnectRecord> {
+    @Override
+    public ConnectRecord doTransform(ConnectRecord r) {
+      final SchemaAndValue transformed = process(r, r.getSchema(), 
r.getData());
+      ConnectRecord record = new ConnectRecord(
+              r.getPosition().getPartition(),
+              r.getPosition().getOffset(),
+              r.getTimestamp(),
+              r.getKeySchema(),
+              r.getKey(),
+              transformed.schema(),
+              transformed.value()
+      );
+      record.setExtensions(r.getExtensions());
+      return record;
+    }
+  }
+
+}
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedFieldConfig.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedFieldConfig.java
new file mode 100644
index 0000000..9cd2f23
--- /dev/null
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedFieldConfig.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+import io.openmessaging.KeyValue;
+
+
+/**
+ * extract nested field config
+ */
+public class ExtractNestedFieldConfig{
+  public final String outerFieldName;
+  public final String innerFieldName;
+  public final String outputFieldName;
+
+  public ExtractNestedFieldConfig(KeyValue config) {
+    this.outerFieldName = config.getString(OUTER_FIELD_NAME_CONF);
+    this.innerFieldName = config.getString(INNER_FIELD_NAME_CONF);
+    this.outputFieldName = config.getString(OUTPUT_FIELD_NAME_CONF);
+  }
+
+  public static final String OUTER_FIELD_NAME_CONF = "input.outer.field.name";
+  static final String OUTER_FIELD_NAME_DOC = "The field on the parent struct 
containing the child struct. " +
+      "For example if you wanted the extract `address.state` you would use 
`address`.";
+  public static final String INNER_FIELD_NAME_CONF = "input.inner.field.name";
+  static final String INNER_FIELD_NAME_DOC = "The field on the child struct 
containing the field to be extracted. " +
+      "For example if you wanted the extract `address.state` you would use 
`state`.";
+  public static final String OUTPUT_FIELD_NAME_CONF = "output.field.name";
+  static final String OUTPUT_FIELD_NAME_DOC = "The field to place the 
extracted value into.";
+
+
+  public String outerFieldName(){
+    return this.outerFieldName;
+  }
+
+  public String innerFieldName(){
+    return this.innerFieldName;
+  }
+
+  public String outputFieldName(){
+    return this.outerFieldName;
+  }
+
+}
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilter.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilter.java
new file mode 100644
index 0000000..e2a71ff
--- /dev/null
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.SchemaAndValue;
+import io.openmessaging.connector.api.data.Struct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * pattern filter
+ * @param <R>
+ */
+public abstract class PatternFilter<R extends ConnectRecord> extends 
BaseTransformation<R> {
+
+  private static final Logger log = 
LoggerFactory.getLogger(PatternFilter.class);
+  public Pattern pattern;
+  public Set<String> fields;
+
+  private PatternFilterConfig config;
+  @Override
+  public void start(KeyValue config) {
+    this.config = new PatternFilterConfig(config);
+    this.pattern = this.config.pattern();
+    this.fields =  this.config.fields();
+  }
+
+
+  R filter(R record, Struct struct) {
+    for (Field field : struct.schema().getFields()) {
+      if (!this.fields.contains(field.getName()) || 
field.getSchema().getFieldType() != FieldType.STRING) {
+        continue;
+      }
+      String input = struct.getString(field.getName());
+      if (null != input) {
+        if (this.pattern.matcher(input).matches()) {
+          return null;
+        }
+      }
+    }
+    return record;
+  }
+
+  /**
+   * filter map
+   * @param record
+   * @param map
+   * @return
+   */
+  R filter(R record, Map map) {
+    for (Object field : map.keySet()) {
+      if (!this.fields.contains(field)) {
+        continue;
+      }
+      Object value = map.get(field);
+      if (value instanceof String) {
+        String input = (String) value;
+        if (this.pattern.matcher(input).matches()) {
+          return null;
+        }
+      }
+    }
+    return record;
+  }
+
+
+  R filter(R record, final boolean key) {
+    final SchemaAndValue input = key ?
+        new SchemaAndValue(record.getKeySchema(), record.getKey()) :
+        new SchemaAndValue(record.getSchema(), record.getData());
+    final R result;
+    if (input.schema() != null) {
+      if (FieldType.STRUCT == input.schema().getFieldType()) {
+        result = filter(record, (Struct) input.value());
+      } else if (FieldType.MAP == input.schema().getFieldType()) {
+        result = filter(record, (Map) input.value());
+      } else {
+        result = record;
+      }
+    } else if (input.value() instanceof Map) {
+      result = filter(record, (Map) input.value());
+    } else {
+      result = record;
+    }
+    return result;
+  }
+
+
+  /**
+   * filter key
+   * @param <R>
+   */
+  public static class Key<R extends ConnectRecord> extends PatternFilter<R> {
+    @Override
+    public R doTransform(R r) {
+      return filter(r, true);
+    }
+  }
+
+  /**
+   * filter value
+   * @param <R>
+   */
+  public static class Value<R extends ConnectRecord> extends PatternFilter<R> {
+    @Override
+    public R doTransform(R r) {
+      return filter(r, false);
+    }
+  }
+}
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilterConfig.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilterConfig.java
new file mode 100644
index 0000000..543ddf7
--- /dev/null
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilterConfig.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.connect.transforms.util.ExtendKeyValue;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+
+/**
+ * pattern filter config
+ */
+public class PatternFilterConfig{
+
+  public static final String PATTERN_CONFIG = "pattern";
+  public static final String PATTERN_DOC = "The regex to test the message 
with. ";
+
+  public static final String FIELD_CONFIG = "fields";
+  public static final String FIELD_DOC = "The fields to transform.";
+
+
+  private final Pattern pattern;
+  private final Set<String> fields;
+  public PatternFilterConfig(KeyValue config){
+    ExtendKeyValue extendKeyValue = new ExtendKeyValue(config);
+    String pattern = extendKeyValue.getString(PATTERN_CONFIG);
+    try {
+      this.pattern = Pattern.compile(pattern);
+    } catch (PatternSyntaxException var4) {
+      throw new ConnectException(String.format("Could not compile regex 
'%s'.", pattern));
+    }
+    List<String> fields = extendKeyValue.getList(FIELD_CONFIG);
+    this.fields = new HashSet<>(fields);
+  }
+
+  public Pattern pattern() {
+    return this.pattern;
+  }
+
+  public Set<String> fields() {
+    return this.fields;
+  }
+}
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRename.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRename.java
new file mode 100644
index 0000000..d27e330
--- /dev/null
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRename.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaAndValue;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+
+/**
+ * pattern rename
+ * @param <R>
+ */
+public abstract class PatternRename<R extends ConnectRecord> extends 
BaseTransformation<R> {
+  private static final Logger log = 
LoggerFactory.getLogger(PatternRename.class);
+  PatternRenameConfig config;
+  @Override
+  public void start(KeyValue keyValue) {
+    config = new PatternRenameConfig(keyValue);
+  }
+
+  @Override
+  protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct 
inputStruct) {
+    final SchemaBuilder outputSchemaBuilder = SchemaBuilder.struct();
+    outputSchemaBuilder.name(inputSchema.getName());
+    outputSchemaBuilder.doc(inputSchema.getDoc());
+    if (null != inputSchema.getDefaultValue()) {
+      outputSchemaBuilder.defaultValue(inputSchema.getDefaultValue());
+    }
+    if (null != inputSchema.getParameters() && 
!inputSchema.getParameters().isEmpty()) {
+      outputSchemaBuilder.parameters(inputSchema.getParameters());
+    }
+    if (inputSchema.isOptional()) {
+      outputSchemaBuilder.optional();
+    }
+    Map<String, String> fieldMappings = new 
HashMap<>(inputSchema.getFields().size());
+    for (final Field inputField : inputSchema.getFields()) {
+      log.trace("process() - Processing field '{}'", inputField.getName());
+      final Matcher fieldMatcher = 
this.config.pattern.matcher(inputField.getName());
+      final String outputFieldName;
+      if (fieldMatcher.find()) {
+        // replace
+        outputFieldName = fieldMatcher.replaceAll(this.config.replacement);
+      } else {
+        outputFieldName = inputField.getName();
+      }
+      log.trace("process() - Mapping field '{}' to '{}'", 
inputField.getName(), outputFieldName);
+      fieldMappings.put(inputField.getName(), outputFieldName);
+      outputSchemaBuilder.field(outputFieldName, inputField.getSchema());
+    }
+    final Schema outputSchema = outputSchemaBuilder.build();
+    final Struct outputStruct = new Struct(outputSchema);
+    for (Map.Entry<String, String> entry : fieldMappings.entrySet()) {
+      final String inputField = entry.getKey(), outputField = entry.getValue();
+      log.trace("process() - Copying '{}' to '{}'", inputField, outputField);
+      final Object value = inputStruct.get(inputField);
+      outputStruct.put(outputField, value);
+    }
+    return new SchemaAndValue(outputSchema, outputStruct);
+  }
+
+  @Override
+  protected SchemaAndValue processMap(R record, Map<String, Object> input) {
+    final Map<String, Object> outputMap = new LinkedHashMap<>(input.size());
+    for (final String inputFieldName : input.keySet()) {
+      log.trace("process() - Processing field '{}'", inputFieldName);
+      final Matcher fieldMatcher = this.config.pattern.matcher(inputFieldName);
+      final String outputFieldName;
+      // replace
+      if (fieldMatcher.find()) {
+        outputFieldName = fieldMatcher.replaceAll(this.config.replacement);
+      } else {
+        outputFieldName = inputFieldName;
+      }
+      final Object value = input.get(inputFieldName);
+      outputMap.put(outputFieldName, value);
+    }
+    return new SchemaAndValue(null, outputMap);
+  }
+
+  /**
+   * transform key
+   */
+  public static class Key extends PatternRename<ConnectRecord> {
+    @Override
+    public ConnectRecord doTransform(ConnectRecord r) {
+      final SchemaAndValue transformed = process(r, r.getKeySchema(), 
r.getKey());
+      ConnectRecord record = new ConnectRecord(
+              r.getPosition().getPartition(),
+              r.getPosition().getOffset(),
+              r.getTimestamp(),
+              transformed.schema(),
+              transformed.value(),
+              r.getSchema(),
+              r.getData()
+      );
+      record.setExtensions(r.getExtensions());
+      return record;
+    }
+  }
+
+  /**
+   * transform value
+   */
+  public static class Value extends PatternRename<ConnectRecord> {
+    @Override
+    public ConnectRecord doTransform(ConnectRecord r) {
+      final SchemaAndValue transformed = process(r, r.getSchema(), 
r.getData());
+      ConnectRecord record = new ConnectRecord(
+              r.getPosition().getPartition(),
+              r.getPosition().getOffset(),
+              r.getTimestamp(),
+              r.getKeySchema(),
+              r.getKey(),
+              transformed.schema(),
+              transformed.value()
+      );
+      record.setExtensions(r.getExtensions());
+      return record;
+    }
+  }
+}
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRenameConfig.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRenameConfig.java
new file mode 100644
index 0000000..b7c0e9e
--- /dev/null
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRenameConfig.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+import com.google.common.collect.ImmutableMap;
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.connect.transforms.util.ExtendKeyValue;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * pattern rename config
+ */
+public class PatternRenameConfig {
+
+  public static final String FIELD_PATTERN_CONF = "field.pattern";
+  static final String FIELD_PATTERN_DOC = "";
+
+  public static final String FIELD_PATTERN_FLAGS_CONF = "field.pattern.flags";
+  static final String FIELD_PATTERN_FLAGS_DOC = "";
+
+  public static final String FIELD_REPLACEMENT_CONF = "field.replacement";
+  static final String FIELD_REPLACEMENT_DOC = "";
+
+  static final Map<String, Integer> FLAG_VALUES;
+
+  static {
+    Map<String, Integer> map = new HashMap<>();
+    map.put("UNICODE_CHARACTER_CLASS", Pattern.UNICODE_CHARACTER_CLASS);
+    map.put("CANON_EQ", Pattern.CANON_EQ);
+    map.put("UNICODE_CASE", Pattern.UNICODE_CASE);
+    map.put("DOTALL", Pattern.DOTALL);
+    map.put("LITERAL", Pattern.LITERAL);
+    map.put("MULTILINE", Pattern.MULTILINE);
+    map.put("COMMENTS", Pattern.COMMENTS);
+    map.put("CASE_INSENSITIVE", Pattern.CASE_INSENSITIVE);
+    map.put("UNIX_LINES", Pattern.UNIX_LINES);
+    FLAG_VALUES = ImmutableMap.copyOf(map);
+  }
+
+  public final Pattern pattern;
+  public final String replacement;
+
+  public PatternRenameConfig(KeyValue config) {
+    ExtendKeyValue extendConfig = new ExtendKeyValue(config);
+    final String pattern = extendConfig.getString(FIELD_PATTERN_CONF);
+    final List<String> flagList = 
extendConfig.getList(FIELD_PATTERN_FLAGS_CONF);
+    int patternFlags = 0;
+    for (final String f : flagList) {
+      final int flag = FLAG_VALUES.get(f);
+      patternFlags = patternFlags | flag;
+    }
+    this.pattern = Pattern.compile(pattern, patternFlags);
+    this.replacement = config.getString(FIELD_REPLACEMENT_CONF);
+  }
+
+
+  public Pattern pattern() {
+    return pattern;
+  }
+
+  public String replacement() {
+    return replacement;
+  }
+}
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/RegexRouter.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/RegexRouter.java
new file mode 100644
index 0000000..ff4b94f
--- /dev/null
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/RegexRouter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * regex router
+ * @param <R>
+ */
+public abstract class RegexRouter<R extends ConnectRecord> extends 
BaseTransformation<R> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RegexRouter.class);
+    public static final String TOPIC = "topic";
+    public static final String OVERVIEW_DOC = "Update the record topic using 
the configured regular expression and replacement string."
+            + "<p/>Under the hood, the regex is compiled to a 
<code>java.util.regex.Pattern</code>. "
+            + "If the pattern matches the input topic, 
<code>java.util.regex.Matcher#replaceFirst()</code> is used with the 
replacement string to obtain the new topic.";
+
+    private interface ConfigName {
+        String REGEX = "regex";
+        String REPLACEMENT = "replacement";
+    }
+
+    private Pattern regex;
+    private String replacement;
+
+    @Override
+    public void start(KeyValue config) {
+        regex = Pattern.compile(config.getString(ConfigName.REGEX));
+        replacement = config.getString(ConfigName.REPLACEMENT);
+    }
+
+    @Override
+    public R doTransform(R record) {
+        Map<String, Object>  partitionMap = (Map<String, 
Object>)record.getPosition().getPartition().getPartition();
+        if (null == partitionMap || !partitionMap.containsKey(TOPIC)) {
+            LOG.warn("PartitionMap get topic is null , lack of topic config");
+            return record;
+        }
+        Object o = partitionMap.get(TOPIC);
+        if (null == o) {
+            LOG.warn("PartitionMap get topic is null , lack of topic config");
+            return record;
+
+        }
+        final Matcher matcher = regex.matcher(o.toString());
+        if (matcher.matches()) {
+            String topic = matcher.replaceFirst(replacement);
+            partitionMap.put(TOPIC, topic);
+        }
+        return record;
+    }
+}
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecision.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecision.java
new file mode 100644
index 0000000..3ba40c6
--- /dev/null
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecision.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaAndValue;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.connector.api.data.logical.Decimal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
/**
 * set maximum precision
 *
 * Caps the declared precision of Decimal logical-type fields: when a struct
 * field's "connect.decimal.precision" schema parameter exceeds the configured
 * maximum, the schema is rebuilt with the capped precision. Field VALUES are
 * copied through unchanged — only schema metadata is rewritten.
 * @param <R> record type
 */
public abstract class SetMaximumPrecision<R extends ConnectRecord> extends BaseTransformation<R> {
  private static final Logger log = LoggerFactory.getLogger(SetMaximumPrecision.class);

  SetMaximumPrecisionConfig config;

  @Override
  public void start(KeyValue keyValue) {
    config = new SetMaximumPrecisionConfig(keyValue);
  }

  // Shared sentinel for schemas that need no rewriting.
  static final State NOOP = new State(true, null, null);

  // Per-schema rewrite decision: either a no-op, or the rewritten output
  // schema plus the names of the decimal fields whose precision was capped.
  static class State {
    public final boolean noop;
    public final Schema outputSchema;
    public final Set<String> decimalFields;

    State(boolean noop, Schema outputSchema, Set<String> decimalFields) {
      this.noop = noop;
      this.outputSchema = outputSchema;
      this.decimalFields = decimalFields;
    }
  }

  // Cache: input schema -> rewrite plan, so each distinct schema is analysed
  // once. NOTE(review): correctness of the cache relies on Schema implementing
  // equals/hashCode by value — confirm for the openmessaging implementation.
  Map<Schema, State> schemaLookup = new HashMap<>();

  public static final String CONNECT_AVRO_DECIMAL_PRECISION_PROP = "connect.decimal.precision";

  /**
   * Computes (and caches) the rewrite plan for {@code inputSchema}.
   */
  State state(Schema inputSchema) {
    return this.schemaLookup.computeIfAbsent(inputSchema, new Function<Schema, State>() {
      @Override
      public State apply(Schema schema) {
        // Decimal fields whose declared precision exceeds the configured
        // maximum; "64" is assumed when the parameter is absent.
        Set<String> decimalFields = inputSchema.getFields().stream()
            .filter(f -> Decimal.LOGICAL_NAME.equals(f.getSchema().getName()))
            .filter(f -> Integer.parseInt(f.getSchema().getParameters().getOrDefault(CONNECT_AVRO_DECIMAL_PRECISION_PROP, "64")) > config.maxPrecision())
            .map(Field::getName)
            .collect(Collectors.toSet());
        State result;

        if (decimalFields.size() == 0) {
          // Nothing over the limit: reuse the shared NOOP state.
          result = NOOP;
        } else {
          log.trace("state() - processing schema '{}'", schema.getName());
          // Rebuild the struct schema, preserving name/doc/version/parameters.
          SchemaBuilder builder = SchemaBuilder.struct()
              .name(inputSchema.getName())
              .doc(inputSchema.getDoc())
              .version(inputSchema.getVersion());
          if (null != inputSchema.getParameters() && !inputSchema.getParameters().isEmpty()) {
            builder.parameters(inputSchema.getParameters());
          }

          for (Field field : inputSchema.getFields()) {
            log.trace("state() - processing field '{}'", field.getName());
            if (decimalFields.contains(field.getName())) {
              // Rebuild this decimal field with the precision parameter capped
              // at config.maxPrecision(); scale and all other parameters are
              // preserved from the original field schema.
              Map<String, String> parameters = new LinkedHashMap<>();
              if (null != field.getSchema().getParameters() && !field.getSchema().getParameters().isEmpty()) {
                parameters.putAll(field.getSchema().getParameters());
              }
              parameters.put(CONNECT_AVRO_DECIMAL_PRECISION_PROP, Integer.toString(config.maxPrecision()));
              // NOTE(review): assumes the scale parameter is always present on
              // decimal fields; a missing scale would NPE here — confirm.
              int scale = Integer.parseInt(parameters.get(Decimal.SCALE_FIELD));
              SchemaBuilder fieldBuilder = Decimal.builder(scale)
                  .parameters(parameters)
                  .doc(field.getSchema().getDoc())
                  .version(field.getSchema().getVersion());
              if (field.getSchema().isOptional()) {
                fieldBuilder.optional();
              }
              Schema fieldSchema = fieldBuilder.build();
              builder.field(field.getName(), fieldSchema);
            } else {
              log.trace("state() - copying field '{}' to new schema.", field.getName());
              builder.field(field.getName(), field.getSchema());
            }
          }

          Schema outputSchema = builder.build();
          result = new State(false, outputSchema, decimalFields);
        }

        return result;
      }
    });

  }

  /**
   * Applies the cached rewrite plan: either passes the input through (no-op)
   * or copies every field value into a struct built on the capped schema.
   */
  @Override
  protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct input) {
    State state = state(inputSchema);
    SchemaAndValue result;

    if (state.noop) {
      result = new SchemaAndValue(inputSchema, input);
    } else {
      // Values are copied as-is; only the schema's precision metadata differs.
      Struct struct = new Struct(state.outputSchema);
      for (Field field : inputSchema.getFields()) {
        struct.put(field.getName(), input.get(field.getName()));
      }
      result = new SchemaAndValue(state.outputSchema, struct);
    }
    return result;
  }

  /**
   * transform key: caps precision on the record KEY; value side unchanged.
   */
  public static class Key extends SetMaximumPrecision<ConnectRecord> {
    @Override
    public ConnectRecord doTransform(ConnectRecord r) {
      SchemaAndValue transformed = this.process(r, r.getKeySchema(), r.getKey());
      ConnectRecord record = new ConnectRecord(
              r.getPosition().getPartition(),
              r.getPosition().getOffset(),
              r.getTimestamp(),
              transformed.schema(),
              transformed.value(),
              r.getSchema(),
              r.getData()
      );
      record.setExtensions(r.getExtensions());
      return record;
    }
  }

  /**
   * transform value: caps precision on the record VALUE; key side unchanged.
   */
  public static class Value extends SetMaximumPrecision<ConnectRecord> {
    @Override
    public ConnectRecord doTransform(ConnectRecord r) {
      SchemaAndValue transformed = this.process(r, r.getSchema(), r.getData());
      ConnectRecord record = new ConnectRecord(
              r.getPosition().getPartition(),
              r.getPosition().getOffset(),
              r.getTimestamp(),
              r.getKeySchema(),
              r.getKey(),
              transformed.schema(),
              transformed.value()
      );
      record.setExtensions(r.getExtensions());
      return record;
    }
  }
}
+
+
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecisionConfig.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecisionConfig.java
new file mode 100644
index 0000000..9a801c3
--- /dev/null
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecisionConfig.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+import io.openmessaging.KeyValue;
+
+/**
+ * set maximum precision config
+ */
+public class SetMaximumPrecisionConfig {
+  public static final String MAX_PRECISION_CONFIG = "precision.max";
+  static final String MAX_PRECISION_DOC = "The maximum precision allowed.";
+
+  private final int maxPrecision;
+
+  public SetMaximumPrecisionConfig(KeyValue config) {
+    this.maxPrecision = config.getInt(MAX_PRECISION_CONFIG);
+  }
+
+  public int maxPrecision(){
+    return maxPrecision;
+  }
+}
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetNull.java 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetNull.java
new file mode 100644
index 0000000..54ed09d
--- /dev/null
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetNull.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * set null
+ * @param <R>
+ */
+public abstract class SetNull<R extends ConnectRecord> extends 
BaseTransformation<R> {
+  private static final Logger log = LoggerFactory.getLogger(SetNull.class);
+
+
+  PatternRenameConfig config;
+
+  @Override
+  public void start(KeyValue keyValue) {
+    config = new PatternRenameConfig(keyValue);
+  }
+
+  /**
+   * transform key
+   */
+  public static class Key extends SetNull<ConnectRecord> {
+    @Override
+    public ConnectRecord doTransform(ConnectRecord r) {
+      return new ConnectRecord(
+              r.getPosition().getPartition(),
+              r.getPosition().getOffset(),
+              r.getTimestamp(),
+              null,
+              null,
+              r.getSchema(),
+              r.getData()
+      );
+    }
+  }
+
+
+  public static class Value extends SetNull<ConnectRecord> {
+    @Override
+    public ConnectRecord doTransform(ConnectRecord r) {
+      return new ConnectRecord(
+              r.getPosition().getPartition(),
+              r.getPosition().getOffset(),
+              r.getTimestamp(),
+              r.getKeySchema(),
+              r.getKey(),
+              r.getSchema(),
+              r.getData()
+      );
+    }
+  }
+}
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/ExtendKeyValue.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/ExtendKeyValue.java
new file mode 100644
index 0000000..9297718
--- /dev/null
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/ExtendKeyValue.java
@@ -0,0 +1,121 @@
+package org.apache.rocketmq.connect.transforms.util;
+
+import io.openmessaging.KeyValue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * extend key value
+ */
+public class ExtendKeyValue implements KeyValue {
+    private static final Pattern COMMA_WITH_WHITESPACE = 
Pattern.compile("\\s*,\\s*");
+
+
+    private KeyValue config;
+    public ExtendKeyValue(KeyValue config){
+        this.config = config;
+    }
+
+    @Override
+    public KeyValue put(String s, int i) {
+        return config.put(s, i);
+    }
+
+    @Override
+    public KeyValue put(String s, long l) {
+        return config.put(s, l);
+    }
+
+    @Override
+    public KeyValue put(String s, double v) {
+        return config.put(s, v);
+    }
+
+    @Override
+    public KeyValue put(String s, String s1) {
+        return config.put(s, s1);
+    }
+
+    @Override
+    public int getInt(String s) {
+        return config.getInt(s);
+    }
+
+    @Override
+    public int getInt(String s, int i) {
+        return config.getInt(s, i);
+    }
+
+    @Override
+    public long getLong(String s) {
+        return config.getLong(s);
+    }
+
+    @Override
+    public long getLong(String s, long l) {
+        return config.getLong(s, l);
+    }
+
+    @Override
+    public double getDouble(String s) {
+        return config.getDouble(s);
+    }
+
+    @Override
+    public double getDouble(String s, double v) {
+        return config.getDouble(s, v);
+    }
+
+    @Override
+    public String getString(String s) {
+        return config.getString(s);
+    }
+
+    @Override
+    public String getString(String s, String s1) {
+        return config.getString(s, s1);
+    }
+
+    @Override
+    public Set<String> keySet() {
+        return config.keySet();
+    }
+
+    @Override
+    public boolean containsKey(String s) {
+        return config.containsKey(s);
+    }
+
+    /**
+     * get list
+     * @param s
+     * @return
+     */
+    public List getList(String s){
+        if (!this.config.containsKey(s)){
+            return new ArrayList();
+        }
+        String config = this.config.getString(s).trim();
+        return Arrays.asList(COMMA_WITH_WHITESPACE.split(config, -1));
+    }
+
+    /**
+     * get list by class
+     * @param s
+     * @param clazz
+     * @param <T>
+     * @return
+     */
+    public <T> List<T> getList(String s, Class<T> clazz) {
+        List configs = getList(s);
+        List<T> castConfigs = new ArrayList<>();
+        configs.forEach(config ->{
+            castConfigs.add(clazz.cast(config));
+        });
+        return castConfigs;
+    }
+}
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaHelper.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaHelper.java
new file mode 100644
index 0000000..57c38c9
--- /dev/null
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaHelper.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.transforms.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.logical.Decimal;
+import io.openmessaging.connector.api.data.logical.Timestamp;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SchemaHelper {
+    static final Map<Class<?>, FieldType> PRIMITIVES;
+
+    public SchemaHelper() {
+    }
+
+    public static Schema schema(Object input) {
+        return builder(input).build();
+    }
+
+    public static SchemaBuilder builder(Object input) {
+        Preconditions.checkNotNull(input, "input cannot be null.");
+        SchemaBuilder builder;
+        if (PRIMITIVES.containsKey(input.getClass())) {
+            FieldType type = PRIMITIVES.get(input.getClass());
+            builder = new SchemaBuilder(type);
+        } else if (input instanceof Date) {
+            builder = Timestamp.builder();
+        } else {
+            if (!(input instanceof BigDecimal)) {
+                throw new 
UnsupportedOperationException(String.format("Unsupported Type: %s", 
input.getClass()));
+            }
+            builder = Decimal.builder(((BigDecimal)input).scale());
+        }
+
+        return builder.optional();
+    }
+
+    static {
+        Map<Class<?>, FieldType> primitives = new HashMap();
+        primitives.put(String.class, FieldType.STRING);
+        primitives.put(Boolean.class, FieldType.BOOLEAN);
+        primitives.put(Byte.class, FieldType.INT8);
+        primitives.put(Short.class, FieldType.INT16);
+        primitives.put(Integer.class, FieldType.INT32);
+        primitives.put(Long.class, FieldType.INT64);
+        primitives.put(Float.class, FieldType.FLOAT32);
+        primitives.put(Double.class, FieldType.FLOAT64);
+        primitives.put(byte[].class, FieldType.BYTES);
+        PRIMITIVES = ImmutableMap.copyOf(primitives);
+    }
+}
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaUtil.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaUtil.java
new file mode 100644
index 0000000..2317a09
--- /dev/null
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaUtil.java
@@ -0,0 +1,29 @@
+package org.apache.rocketmq.connect.transforms.util;
+
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+
+/**
+ * schema util
+ */
+public class SchemaUtil {
+    public static Schema INT8_SCHEMA = SchemaBuilder.int8().build();
+    public static Schema INT16_SCHEMA = SchemaBuilder.int16().build();
+    public static Schema INT32_SCHEMA = SchemaBuilder.int32().build();
+    public static Schema INT64_SCHEMA = SchemaBuilder.int64().build();
+    public static Schema FLOAT32_SCHEMA = SchemaBuilder.float32().build();
+    public static Schema FLOAT64_SCHEMA = SchemaBuilder.float64().build();
+    public static Schema BOOLEAN_SCHEMA = SchemaBuilder.bool().build();
+    public static Schema STRING_SCHEMA = SchemaBuilder.string().build();
+    public static Schema BYTES_SCHEMA = SchemaBuilder.bytes().build();
+
+    public static Schema OPTIONAL_INT8_SCHEMA = 
SchemaBuilder.int8().optional().build();
+    public static Schema OPTIONAL_INT16_SCHEMA = 
SchemaBuilder.int16().optional().build();
+    public static Schema OPTIONAL_INT32_SCHEMA = 
SchemaBuilder.int32().optional().build();
+    public static Schema OPTIONAL_INT64_SCHEMA = 
SchemaBuilder.int64().optional().build();
+    public static Schema OPTIONAL_FLOAT32_SCHEMA = 
SchemaBuilder.float32().optional().build();
+    public static Schema OPTIONAL_FLOAT64_SCHEMA = 
SchemaBuilder.float64().optional().build();
+    public static Schema OPTIONAL_BOOLEAN_SCHEMA = 
SchemaBuilder.bool().optional().build();
+    public static Schema OPTIONAL_STRING_SCHEMA = 
SchemaBuilder.string().optional().build();
+    public static Schema OPTIONAL_BYTES_SCHEMA = 
SchemaBuilder.bytes().optional().build();
+}
diff --git 
a/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/ChangeCaseTest.java
 
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/ChangeCaseTest.java
new file mode 100644
index 0000000..2e12eeb
--- /dev/null
+++ 
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/ChangeCaseTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms.test;
+
+import com.google.common.base.CaseFormat;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.Transform;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.transforms.ChangeCase;
+import org.apache.rocketmq.connect.transforms.ChangeCaseConfig;
+import org.apache.rocketmq.connect.transforms.util.SchemaUtil;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.HashMap;
+
public class ChangeCaseTest extends TransformationTest {
  public ChangeCaseTest() {
    // false => exercise the value-side transform.
    super(false);
  }


  @Override
  protected Transform<ConnectRecord> create() {
    ChangeCase.Value value = new ChangeCase.Value();
    return value;
  }

  // NOTE(review): this class mixes JUnit 4 (@org.junit.Test) with JUnit 5
  // assertions (org.junit.jupiter.api.Assertions); sibling tests use the
  // Jupiter API throughout — consider unifying.
  @Test
  public void test() {

    // Configure an UPPER_UNDERSCORE -> lower_underscore field-name rewrite.
    KeyValue defaultKeyValue = new DefaultKeyValue();
    defaultKeyValue.put(ChangeCaseConfig.FROM_CONFIG, CaseFormat.UPPER_UNDERSCORE.toString());
    defaultKeyValue.put(ChangeCaseConfig.TO_CONFIG, CaseFormat.LOWER_UNDERSCORE.toString());
    this.transformation.start(defaultKeyValue);
    final Schema inputSchema = SchemaBuilder.struct()
        .field("FIRST_NAME", SchemaUtil.STRING_SCHEMA)
        .field("LAST_NAME", SchemaUtil.STRING_SCHEMA)
        .build();
    // NOTE(review): expectedSchema / expectedStruct are built but never
    // asserted against — the loop below only checks for non-null, so a
    // transform that renames nothing would still pass. Compare them to the
    // transformed record (e.g. via AssertSchema/AssertStruct as
    // ExtractNestedFieldTest does) so the rename is actually verified.
    final Schema expectedSchema = SchemaBuilder.struct()
        .field("first_name", SchemaUtil.STRING_SCHEMA)
        .field("last_name", SchemaUtil.STRING_SCHEMA)
        .build();
    final Struct inputStruct = new Struct(inputSchema)
        .put("FIRST_NAME", "test")
        .put("LAST_NAME", "user");
    final Struct expectedStruct = new Struct(expectedSchema)
        .put("first_name", "test")
        .put("last_name", "user");

    final ConnectRecord inputRecord = new ConnectRecord(
        new RecordPartition(new HashMap<>()),
        new RecordOffset(new HashMap<>()),
        System.currentTimeMillis(),
        null,
        null,
        inputSchema,
        inputStruct
    );
    // Repeated invocations guard against state leaking between calls.
    for (int i = 0; i < 50; i++) {
      final ConnectRecord transformedRecord = this.transformation.doTransform(inputRecord);
      Assertions.assertNotNull(transformedRecord, "transformedRecord should not be null.");
    }
  }
}
diff --git 
a/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/ExtractNestedFieldTest.java
 
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/ExtractNestedFieldTest.java
new file mode 100644
index 0000000..8cc0cda
--- /dev/null
+++ 
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/ExtractNestedFieldTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms.test;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.Transform;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.transforms.ExtractNestedField;
+import org.apache.rocketmq.connect.transforms.ExtractNestedFieldConfig;
+import org.apache.rocketmq.connect.transforms.test.common.AssertStruct;
+import org.apache.rocketmq.connect.transforms.util.SchemaUtil;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+
+import static 
org.apache.rocketmq.connect.transforms.test.common.AssertSchema.assertSchema;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public abstract class ExtractNestedFieldTest extends TransformationTest {
+  protected ExtractNestedFieldTest(boolean isKey) {
+    super(isKey);
+  }
+
+  @Test
+  public void test() {
+    KeyValue defaultKeyValue = new DefaultKeyValue();
+    defaultKeyValue.put(ExtractNestedFieldConfig.INNER_FIELD_NAME_CONF, 
"state");
+    defaultKeyValue.put(ExtractNestedFieldConfig.OUTER_FIELD_NAME_CONF, 
"address");
+    defaultKeyValue.put(ExtractNestedFieldConfig.OUTPUT_FIELD_NAME_CONF, 
"state");
+
+    this.transformation.start(defaultKeyValue);
+
+    final Schema innerSchema = SchemaBuilder.struct()
+        .name("Address")
+        .field("city", SchemaUtil.STRING_SCHEMA)
+        .field("state", SchemaUtil.STRING_SCHEMA)
+        .build();
+    final Schema inputSchema = SchemaBuilder.struct()
+        .field("first_name", SchemaUtil.STRING_SCHEMA)
+        .field("last_name", SchemaUtil.STRING_SCHEMA)
+        .field("address", innerSchema)
+        .build();
+    final Schema expectedSchema = SchemaBuilder.struct()
+        .field("first_name", SchemaUtil.STRING_SCHEMA)
+        .field("last_name", SchemaUtil.STRING_SCHEMA)
+        .field("address", innerSchema)
+        .field("state", SchemaUtil.STRING_SCHEMA)
+        .build();
+    final Struct innerStruct = new Struct(innerSchema)
+        .put("city", "Austin")
+        .put("state", "tx");
+    final Struct inputStruct = new Struct(inputSchema)
+        .put("first_name", "test")
+        .put("last_name", "developer")
+        .put("address", innerStruct);
+    final Struct expectedStruct = new Struct(expectedSchema)
+        .put("first_name", "test")
+        .put("last_name", "developer")
+        .put("address", innerStruct)
+        .put("state", "tx");
+
+    final ConnectRecord inputRecord = new ConnectRecord(
+            new RecordPartition(new HashMap<>()),
+            new RecordOffset(new HashMap<>()),
+            System.currentTimeMillis(),
+            null,
+            null,
+            inputSchema,
+            inputStruct
+    );
+    for (int i = 0; i < 50; i++) {
+      final ConnectRecord transformedRecord = 
this.transformation.doTransform(inputRecord);
+      assertNotNull(transformedRecord, "transformedRecord should not be 
null.");
+      assertSchema(expectedSchema, transformedRecord.getSchema());
+      AssertStruct.assertStruct(expectedStruct, (Struct) 
transformedRecord.getData());
+    }
+  }
+
+
+  public static class ValueTest extends ExtractNestedFieldTest {
+    protected ValueTest() {
+      super(false);
+    }
+
+    @Override
+    protected Transform<ConnectRecord> create() {
+      return new ExtractNestedField.Value();
+    }
+  }
+
+}
diff --git 
a/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/PatternFilterTest.java
 
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/PatternFilterTest.java
new file mode 100644
index 0000000..5c207f5
--- /dev/null
+++ 
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/PatternFilterTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms.test;
+
+import com.google.common.collect.ImmutableMap;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.transforms.PatternFilter;
+import org.apache.rocketmq.connect.transforms.PatternFilterConfig;
+import org.apache.rocketmq.connect.transforms.util.SchemaUtil;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class PatternFilterTest {
+  public PatternFilter.Value transform;
+
+  @BeforeEach
+  public void before() {
+    KeyValue defaultKeyValue = new DefaultKeyValue();
+    defaultKeyValue.put(PatternFilterConfig.FIELD_CONFIG, "input");
+    defaultKeyValue.put(PatternFilterConfig.PATTERN_CONFIG, "^filter$");
+
+    this.transform = new PatternFilter.Value();
+    this.transform.start(defaultKeyValue);
+  }
+
+  ConnectRecord map(String value) {
+    return new ConnectRecord(
+            new RecordPartition(new HashMap<>()),
+            new RecordOffset(new HashMap<>()),
+            System.currentTimeMillis(),
+            null,
+            null,
+            null,
+            ImmutableMap.of("input", value)
+    );
+  }
+
+  ConnectRecord struct(String value) {
+    Schema schema = SchemaBuilder.struct()
+        .field("input", SchemaUtil.STRING_SCHEMA)
+        .build();
+    Struct struct = new Struct(schema)
+        .put("input", value);
+    return new ConnectRecord(
+            new RecordPartition(new HashMap<>()),
+            new RecordOffset(new HashMap<>()),
+            System.currentTimeMillis(),
+            null,
+            null,
+            schema,
+            struct
+    );
+  }
+
+  @Test
+  public void filtered() {
+    assertNull(this.transform.doTransform(struct("filter")));
+    assertNull(this.transform.doTransform(map("filter")));
+  }
+
+  @Test
+  public void notFiltered() {
+    assertNotNull(this.transform.doTransform(struct("ok")));
+    assertNotNull(this.transform.doTransform(map("ok")));
+  }
+
+}
diff --git 
a/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/PatternRenameTest.java
 
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/PatternRenameTest.java
new file mode 100644
index 0000000..330b658
--- /dev/null
+++ 
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/PatternRenameTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms.test;
+
+import com.google.common.collect.ImmutableMap;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.Transform;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.transforms.PatternRename;
+import org.apache.rocketmq.connect.transforms.PatternRenameConfig;
+import org.apache.rocketmq.connect.transforms.util.SchemaUtil;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.rocketmq.connect.transforms.test.common.AssertSchema.assertSchema;
+import static 
org.apache.rocketmq.connect.transforms.test.common.AssertStruct.assertStruct;
+import static org.junit.Assert.assertNotNull;
+
public abstract class PatternRenameTest extends TransformationTest {
  // NOTE(review): TOPIC is never referenced anywhere in this class — dead
  // constant; consider removing.
  final static String TOPIC = "test";

  protected PatternRenameTest(boolean isKey) {
    super(isKey);
  }

  @Test
  public void schemaLess() {
    // Rename fields by replacing "." with "_" on a schemaless map payload.
    KeyValue defaultKeyValue = new DefaultKeyValue();
    defaultKeyValue.put(PatternRenameConfig.FIELD_PATTERN_CONF, "\\.");
    defaultKeyValue.put(PatternRenameConfig.FIELD_REPLACEMENT_CONF, "_");
    this.transformation.start(defaultKeyValue);
    final Map<String, Object> input = ImmutableMap.of(
        "first.name", "example",
        "last.name", "user"
    );
    // NOTE(review): `expected` is built but never asserted against — the test
    // only checks the output record is non-null, so it would pass even if no
    // renaming happened. Compare it to the transformed key/value (selected by
    // isKey) to make this a real verification.
    final Map<String, Object> expected = ImmutableMap.of(
        "first_name", "example",
        "last_name", "user"
    );

    // Place the payload on the key or value side depending on the subclass.
    final Object key = isKey ? input : null;
    final Object value = isKey ? null : input;
    // Schemaless: both schemas are deliberately null.
    final Schema keySchema = null;
    final Schema valueSchema = null;

    final ConnectRecord inputRecord = new ConnectRecord(
        new RecordPartition(new HashMap<>()),
        new RecordOffset(new HashMap<>()),
        System.currentTimeMillis(),
        keySchema,
        key,
        valueSchema,
        value
    );
    final ConnectRecord outputRecord = this.transformation.doTransform(inputRecord);
    assertNotNull(outputRecord);
  }

  @Test
  public void prefixed() {

    // Strip a leading "prefixed" from every field name of a struct payload.
    KeyValue defaultKeyValue = new DefaultKeyValue();
    defaultKeyValue.put(PatternRenameConfig.FIELD_PATTERN_CONF, "^prefixed");
    defaultKeyValue.put(PatternRenameConfig.FIELD_REPLACEMENT_CONF, "");
    this.transformation.start(defaultKeyValue);

    Schema inputSchema = SchemaBuilder.struct()
            .name("testing")
            .field("prefixedfirstname", SchemaUtil.STRING_SCHEMA)
            .field("prefixedlastname", SchemaUtil.STRING_SCHEMA)
            .build();
    Struct inputStruct = new Struct(inputSchema)
        .put("prefixedfirstname", "example")
        .put("prefixedlastname", "user");

    // Place the struct (and its schema) on the key or value side per isKey.
    final Object key = isKey ? inputStruct : null;
    final Object value = isKey ? null : inputStruct;
    final Schema keySchema = isKey ? inputSchema : null;
    final Schema valueSchema = isKey ? null : inputSchema;

    final ConnectRecord inputRecord = new ConnectRecord(
            new RecordPartition(new HashMap<>()),
            new RecordOffset(new HashMap<>()),
            System.currentTimeMillis(),
        keySchema,
        key,
        valueSchema,
        value
    );
    final ConnectRecord outputRecord = this.transformation.doTransform(inputRecord);
    assertNotNull(outputRecord);

    // Read back from whichever side the payload was placed on.
    final Schema actualSchema = isKey ? outputRecord.getKeySchema() : outputRecord.getSchema();
    final Struct actualStruct = (Struct) (isKey ? outputRecord.getKey() : outputRecord.getData());

    final Schema expectedSchema = SchemaBuilder.struct()
        .name("testing")
        .field("firstname", SchemaUtil.STRING_SCHEMA)
        .field("lastname", SchemaUtil.STRING_SCHEMA).build();
    Struct expectedStruct = new Struct(expectedSchema)
        .put("firstname", "example")
        .put("lastname", "user");

    assertSchema(expectedSchema, actualSchema);
    assertStruct(expectedStruct, actualStruct);
  }

  /** Runs both scenarios against the key-side transform. */
  public static class KeyTest extends PatternRenameTest {
    public KeyTest() {
      super(true);
    }

    @Override
    protected Transform<ConnectRecord> create() {
      return new PatternRename.Key();
    }
  }

  /** Runs both scenarios against the value-side transform. */
  public static class ValueTest extends PatternRenameTest {
    public ValueTest() {
      super(false);
    }

    @Override
    protected Transform<ConnectRecord> create() {
      return new PatternRename.Value();
    }
  }
}
diff --git 
a/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/SetMaximumPrecisionTest.java
 
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/SetMaximumPrecisionTest.java
new file mode 100644
index 0000000..0cbd658
--- /dev/null
+++ 
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/SetMaximumPrecisionTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms.test;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.connector.api.data.logical.Decimal;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.transforms.SetMaximumPrecision;
+import org.apache.rocketmq.connect.transforms.SetMaximumPrecisionConfig;
+import org.apache.rocketmq.connect.transforms.util.SchemaUtil;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.util.HashMap;
+
+import static 
org.apache.rocketmq.connect.transforms.test.common.AssertStruct.assertStruct;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class SetMaximumPrecisionTest {
+
+  /** Wraps {@code struct} in a value-only {@link ConnectRecord} (no key, empty partition/offset). */
+  ConnectRecord record(Struct struct) {
+    return new ConnectRecord(
+            new RecordPartition(new HashMap<>()),
+            new RecordOffset(new HashMap<>()),
+            System.currentTimeMillis(),
+            null,
+            null,
+            struct.schema(),
+            struct
+    );
+  }
+
+  /** A struct with no decimal fields must pass through the transform unchanged. */
+  @Test
+  public void noop() {
+    Schema schema = SchemaBuilder.struct()
+        .field("first", SchemaUtil.STRING_SCHEMA)
+        .field("last", SchemaUtil.STRING_SCHEMA)
+        .field("email", SchemaUtil.STRING_SCHEMA)
+        .build();
+    Struct struct = new Struct(schema)
+        .put("first", "test")
+        .put("last", "user")
+        // Fixed: the original called .put("first", ...) a second time, overwriting
+        // "first" and leaving the required "email" field unset.
+        .put("email", "[email protected]");
+    ConnectRecord record = record(struct);
+    SetMaximumPrecision.Value transform = new SetMaximumPrecision.Value();
+
+    KeyValue defaultKeyValue = new DefaultKeyValue();
+    defaultKeyValue.put(SetMaximumPrecisionConfig.MAX_PRECISION_CONFIG, 32);
+    transform.start(defaultKeyValue);
+    ConnectRecord actual = transform.doTransform(record);
+    assertNotNull(actual);
+    assertStruct((Struct) record.getData(), (Struct) actual.getData());
+  }
+
+  /**
+   * Decimal fields whose declared precision exceeds the configured maximum (32)
+   * must be capped at 32; fields already at or below the cap keep their precision.
+   */
+  @Test
+  public void convert() {
+    // "first" has no explicit precision, "second" is within the cap (16),
+    // "third" exceeds it (48).
+    final Schema inputSchema = SchemaBuilder.struct()
+        .field("first", Decimal.schema(5))
+        .field(
+            "second",
+            Decimal.builder(5)
+                .parameter(SetMaximumPrecision.CONNECT_AVRO_DECIMAL_PRECISION_PROP, "16")
+                .optional()
+                .build()
+        )
+        .field(
+            "third",
+            Decimal.builder(5)
+                .parameter(SetMaximumPrecision.CONNECT_AVRO_DECIMAL_PRECISION_PROP, "48")
+                .optional()
+                .build()
+        )
+        .build();
+    final Struct inputStruct = new Struct(inputSchema)
+        .put("first", BigDecimal.ONE)
+        .put("second", null)
+        .put("third", BigDecimal.ONE);
+    // Expected: "first" and "third" carry precision 32 afterwards, "second" is untouched.
+    final Schema expectedSchema = SchemaBuilder.struct()
+        .field(
+            "first",
+            Decimal.builder(5)
+                .parameter(SetMaximumPrecision.CONNECT_AVRO_DECIMAL_PRECISION_PROP, "32")
+                .build()
+        )
+        .field(
+            "second",
+            Decimal.builder(5)
+                .parameter(SetMaximumPrecision.CONNECT_AVRO_DECIMAL_PRECISION_PROP, "16")
+                .optional()
+                .build()
+        )
+        .field(
+            "third",
+            Decimal.builder(5)
+                .parameter(SetMaximumPrecision.CONNECT_AVRO_DECIMAL_PRECISION_PROP, "32")
+                .optional()
+                .build()
+        )
+        .build();
+    final Struct expectedStruct = new Struct(expectedSchema)
+        .put("first", BigDecimal.ONE)
+        .put("second", null)
+        .put("third", BigDecimal.ONE);
+
+    ConnectRecord record = record(inputStruct);
+    SetMaximumPrecision.Value transform = new SetMaximumPrecision.Value();
+    KeyValue defaultKeyValue = new DefaultKeyValue();
+    defaultKeyValue.put(SetMaximumPrecisionConfig.MAX_PRECISION_CONFIG, 32);
+
+    transform.start(defaultKeyValue);
+    ConnectRecord actual = transform.doTransform(record);
+    assertNotNull(actual);
+    assertStruct(expectedStruct, (Struct) actual.getData());
+  }
+
+}
diff --git 
a/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/SetNullTest.java
 
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/SetNullTest.java
new file mode 100644
index 0000000..328ec1b
--- /dev/null
+++ 
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/SetNullTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms.test;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.rocketmq.connect.transforms.SetNull;
+import org.apache.rocketmq.connect.transforms.util.SchemaUtil;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class SetNullTest {
+
+  /** {@code SetNull.Key} must clear both the record key and its schema. */
+  @Test
+  public void test() {
+    final RecordPartition partition = new RecordPartition(new HashMap<>());
+    final RecordOffset offset = new RecordOffset(new HashMap<>());
+    final ConnectRecord record = new ConnectRecord(
+            partition,
+            offset,
+            System.currentTimeMillis(),
+            SchemaUtil.STRING_SCHEMA,
+            "key",
+            null,
+            ""
+    );
+    final SetNull transform = new SetNull.Key();
+    final ConnectRecord result = transform.doTransform(record);
+    assertNull(result.getKey(), "key should be null.");
+    assertNull(result.getKeySchema(), "keySchema should be null.");
+  }
+
+}
diff --git 
a/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/TransformationTest.java
 
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/TransformationTest.java
new file mode 100644
index 0000000..2951d62
--- /dev/null
+++ 
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/TransformationTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms.test;
+
+import io.openmessaging.connector.api.component.Transform;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import org.junit.Before;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Base class for key/value transform tests. Subclasses supply the transform
+ * under test via {@link #create()}; a fresh instance is built before each test.
+ */
+public abstract class TransformationTest {
+  final boolean isKey;
+  final static String TOPIC = "test";
+
+  protected TransformationTest(boolean isKey) {
+    this.isKey = isKey;
+  }
+
+  /** Factory for the transform under test. */
+  protected abstract Transform<ConnectRecord> create();
+
+  Transform<ConnectRecord> transformation;
+
+  // Fixed: this suite runs on JUnit 5 (org.junit.jupiter); the JUnit 4 @Before
+  // annotation is ignored by the Jupiter engine, so "transformation" was never
+  // initialized. @BeforeEach is the JUnit 5 equivalent.
+  @BeforeEach
+  public void before() {
+    this.transformation = create();
+  }
+
+}
diff --git 
a/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/common/AssertSchema.java
 
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/common/AssertSchema.java
new file mode 100644
index 0000000..06313ae
--- /dev/null
+++ 
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/common/AssertSchema.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms.test.common;
+
+import com.google.common.base.Strings;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Schema;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.List;
+
+/** Assertion helpers comparing two {@link Schema} instances field-for-field. */
+public class AssertSchema {
+    private AssertSchema() {
+    }
+
+    public static void assertSchema(Schema expected, Schema actual) {
+        assertSchema(expected, actual, (String) null);
+    }
+
+    /**
+     * Asserts that {@code actual} matches {@code expected}: name, type, default
+     * value, optionality, doc, version and parameters, recursing into array/map
+     * element schemas and struct fields.
+     */
+    public static void assertSchema(Schema expected, Schema actual, String message) {
+        String prefix = Strings.isNullOrEmpty(message) ? "" : message + ": ";
+        if (null == expected) {
+            // Fixed message: when expected is null, actual must be null as well.
+            Assertions.assertNull(actual, prefix + "actual should be null.");
+        } else {
+            Assertions.assertNotNull(expected, prefix + "expected schema should not be null.");
+            Assertions.assertNotNull(actual, prefix + "actual schema should not be null.");
+            Assertions.assertEquals(expected.getName(), actual.getName(), prefix + "schema.name() should match.");
+            Assertions.assertEquals(expected.getFieldType(), actual.getFieldType(), prefix + "schema.type() should match.");
+            Assertions.assertEquals(expected.getDefaultValue(), actual.getDefaultValue(), prefix + "schema.defaultValue() should match.");
+            Assertions.assertEquals(expected.isOptional(), actual.isOptional(), prefix + "schema.isOptional() should match.");
+            Assertions.assertEquals(expected.getDoc(), actual.getDoc(), prefix + "schema.doc() should match.");
+            Assertions.assertEquals(expected.getVersion(), actual.getVersion(), prefix + "schema.version() should match.");
+            GenericAssertions.assertMap(expected.getParameters(), actual.getParameters(), prefix + "schema.parameters() should match.");
+            if (null != expected.getDefaultValue()) {
+                Assertions.assertNotNull(actual.getDefaultValue(), "actual.defaultValue() should not be null.");
+                // Map primitive schema types to the boxed Java type of the default value.
+                Class<?> expectedType = null;
+                switch (expected.getFieldType()) {
+                    case INT8:
+                        expectedType = Byte.class;
+                        break;
+                    case INT16:
+                        expectedType = Short.class;
+                        break;
+                    case INT32:
+                        expectedType = Integer.class;
+                        break;
+                    case INT64:
+                        expectedType = Long.class;
+                        break;
+                    case FLOAT32:
+                        expectedType = Float.class;
+                        break;
+                    case FLOAT64:
+                        // Fixed: FLOAT64 defaults are Doubles (the original said Float.class).
+                        expectedType = Double.class;
+                        break;
+                    default:
+                        break;
+                }
+                if (null != expectedType) {
+                    // Fixed direction: verify the actual default IS an instance of the
+                    // expected type (the original isAssignableFrom check was inverted).
+                    Assertions.assertTrue(expectedType.isInstance(actual.getDefaultValue()), String.format("actual.defaultValue() should be a %s", expectedType.getSimpleName()));
+                }
+            }
+
+            switch (expected.getFieldType()) {
+                case ARRAY:
+                    // Fixed: use prefix, not raw message — a null message previously
+                    // produced "null..." in the recursive assertion text.
+                    assertSchema(expected.getValueSchema(), actual.getValueSchema(), prefix + "valueSchema does not match.");
+                    break;
+                case MAP:
+                    assertSchema(expected.getKeySchema(), actual.getKeySchema(), prefix + "keySchema does not match.");
+                    assertSchema(expected.getValueSchema(), actual.getValueSchema(), prefix + "valueSchema does not match.");
+                    break;
+                case STRUCT:
+                    List<Field> expectedFields = expected.getFields();
+                    List<Field> actualFields = actual.getFields();
+                    Assertions.assertEquals(expectedFields.size(), actualFields.size(), prefix + "Number of fields do not match.");
+                    for (int i = 0; i < expectedFields.size(); ++i) {
+                        assertField(expectedFields.get(i), actualFields.get(i), "index " + i);
+                    }
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
+    /** Asserts name, index and schema of {@code actual} match {@code expected}. */
+    public static void assertField(Field expected, Field actual, String message) {
+        String prefix = Strings.isNullOrEmpty(message) ? "" : message + ": ";
+        if (null == expected) {
+            Assertions.assertNull(actual, prefix + "actual should be null.");
+        } else {
+            Assertions.assertEquals(expected.getName(), actual.getName(), prefix + "name does not match");
+            // Fixed message: this line compares the field index, not the name.
+            Assertions.assertEquals(expected.getIndex(), actual.getIndex(), prefix + "index does not match");
+            assertSchema(expected.getSchema(), actual.getSchema(), prefix + "schema does not match");
+        }
+    }
+
+    public static void assertField(Field expected, Field actual) {
+        assertField(expected, actual, (String) null);
+    }
+}
diff --git 
a/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/common/AssertStruct.java
 
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/common/AssertStruct.java
new file mode 100644
index 0000000..78c6498
--- /dev/null
+++ 
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/common/AssertStruct.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms.test.common;
+
+import com.google.common.base.Strings;
+import com.google.common.io.BaseEncoding;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.connector.api.data.logical.Decimal;
+import io.openmessaging.connector.api.data.logical.Time;
+import io.openmessaging.connector.api.data.logical.Timestamp;
+import org.junit.jupiter.api.Assertions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class AssertStruct {
+    private static final Logger log = 
LoggerFactory.getLogger(AssertStruct.class);
+
+    public AssertStruct() {
+    }
+
+    static <T> T castAndVerify(Class<T> cls, Struct struct, Field field, 
boolean expected) {
+        Object value = struct.get(field.getName());
+        String prefix = String.format("%s('%s') ", expected ? "expected" : 
"actual", field.getName());
+        if (!field.getSchema().isOptional()) {
+            Assertions.assertNotNull(value, prefix + "has a require schema. 
Should not be null.");
+        }
+
+        if (null == value) {
+            return null;
+        } else {
+            Assertions.assertTrue(cls.isInstance(value), String.format(prefix 
+ "should be a %s", cls.getSimpleName()));
+            return cls.cast(value);
+        }
+    }
+
+    public static void assertStruct(Struct expected, Struct actual, String 
message) {
+        String prefix = Strings.isNullOrEmpty(message) ? "" : message + ": ";
+        if (null == expected) {
+            Assertions.assertNull(actual, prefix + "actual should be null.");
+        } else {
+            AssertSchema.assertSchema(expected.schema(), actual.schema(), 
"schema does not match.");
+            Iterator var4 = expected.schema().getFields().iterator();
+
+            while(true) {
+                while(var4.hasNext()) {
+                    Field expectedField = (Field)var4.next();
+                    log.trace("assertStruct() - testing field '{}'", 
expectedField.getName());
+                    Object expectedValue = 
expected.get(expectedField.getName());
+                    Object actualValue = actual.get(expectedField.getName());
+                    if 
(Decimal.LOGICAL_NAME.equals(expectedField.getSchema().getName())) {
+                        BigDecimal expectedDecimal = 
(BigDecimal)castAndVerify(BigDecimal.class, expected, expectedField, true);
+                        BigDecimal actualDecimal = 
(BigDecimal)castAndVerify(BigDecimal.class, actual, expectedField, false);
+                        Assertions.assertEquals(expectedDecimal, 
actualDecimal, prefix + expectedField.getName() + " does not match.");
+                    } else if 
(!Timestamp.LOGICAL_NAME.equals(expectedField.getSchema().getName()) && 
!io.openmessaging.connector.api.data.logical.Date.LOGICAL_NAME.equals(expectedField.getSchema().getName())
 && !Time.LOGICAL_NAME.equals(expectedField.getSchema().getName())) {
+                        switch(expectedField.getSchema().getFieldType()) {
+                        case ARRAY:
+                            List<Object> expectedArray = 
(List)castAndVerify(List.class, expected, expectedField, true);
+                            List<Object> actualArray = 
(List)castAndVerify(List.class, actual, expectedField, false);
+                            Assertions.assertEquals(expectedArray, 
actualArray, prefix + expectedField.getName() + " does not match.");
+                            break;
+                        case MAP:
+                            Map<Object, Object> expectedMap = 
(Map)castAndVerify(Map.class, expected, expectedField, true);
+                            Map<Object, Object> actualMap = 
(Map)castAndVerify(Map.class, actual, expectedField, false);
+                            Assertions.assertEquals(expectedMap, actualMap, 
prefix + expectedField.getName() + " does not match.");
+                            break;
+                        case STRUCT:
+                            Struct expectedStruct = 
(Struct)castAndVerify(Struct.class, expected, expectedField, true);
+                            Struct actualStruct = 
(Struct)castAndVerify(Struct.class, actual, expectedField, false);
+                            assertStruct(expectedStruct, actualStruct, prefix 
+ expectedField.getName() + " does not match.");
+                            break;
+                        case BYTES:
+                            byte[] expectedByteArray = 
(byte[])castAndVerify(byte[].class, expected, expectedField, true);
+                            byte[] actualByteArray = 
(byte[])castAndVerify(byte[].class, actual, expectedField, false);
+                            Assertions.assertEquals(null == expectedByteArray 
? "" : BaseEncoding.base32Hex().encode(expectedByteArray).toString(), null == 
actualByteArray ? "" : 
BaseEncoding.base32Hex().encode(actualByteArray).toString(), prefix + 
expectedField.getName() + " does not match.");
+                            break;
+                        default:
+                            Assertions.assertEquals(expectedValue, 
actualValue, prefix + expectedField.getName() + " does not match.");
+                        }
+                    } else {
+                        Date expectedDate = (Date)castAndVerify(Date.class, 
expected, expectedField, true);
+                        Date actualDate = (Date)castAndVerify(Date.class, 
actual, expectedField, false);
+                        Assertions.assertEquals(expectedDate, actualDate, 
prefix + expectedField.getName() + " does not match.");
+                    }
+                }
+
+                return;
+            }
+        }
+    }
+
+    public static void assertStruct(Struct expected, Struct actual) {
+        assertStruct(expected, actual, (String)null);
+    }
+}
diff --git 
a/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/common/GenericAssertions.java
 
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/common/GenericAssertions.java
new file mode 100644
index 0000000..f14f605
--- /dev/null
+++ 
b/transforms/src/test/java/org/apache/rocketmq/connect/transforms/test/common/GenericAssertions.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.transforms.test.common;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Map-comparison assertion helper with a lazily-built difference report. */
+public class GenericAssertions {
+  private GenericAssertions() {
+
+  }
+
+  /**
+   * Lazily renders a human-readable report of the differences between two maps.
+   * Passed as a message Supplier, so it only runs when the assertion fails.
+   */
+  static class MapDifferenceSupplier implements Supplier<String> {
+    final MapDifference<?, ?> mapDifference;
+    // Interpolated into the report header; assertMap passes its message prefix here.
+    final String method;
+
+    public MapDifferenceSupplier(MapDifference<?, ?> mapDifference, String method) {
+      this.mapDifference = mapDifference;
+      this.method = method;
+    }
+
+    @Override
+    public String get() {
+      // Report is built entirely in memory; the IOException signature comes
+      // from the Writer interface, StringWriter itself never throws.
+      try (Writer w = new StringWriter()) {
+        try (BufferedWriter writer = new BufferedWriter(w)) {
+          writer.append(String.format("Map for actual.%s() does not match expected.%s().", this.method, this.method));
+          writer.newLine();
+          // Keys present in both maps but holding different values.
+          Map<?, ? extends MapDifference.ValueDifference<?>> differences = mapDifference.entriesDiffering();
+          if (!differences.isEmpty()) {
+            writer.append("Keys with Differences");
+            writer.newLine();
+            for (Map.Entry<?, ? extends MapDifference.ValueDifference<?>> kvp : differences.entrySet()) {
+              writer.append("  ");
+              writer.append(kvp.getKey().toString());
+              writer.newLine();
+
+              writer.append("    expected:");
+              writer.append(kvp.getValue().leftValue().toString());
+              writer.newLine();
+
+              writer.append("    actual:");
+              writer.append(kvp.getValue().rightValue().toString());
+              writer.newLine();
+            }
+          }
+
+          // Keys only present on one side (left = expected, right = actual).
+          Map<?, ?> entries = mapDifference.entriesOnlyOnLeft();
+          writeEntries(writer, "Only in expected map", entries);
+
+          Map<?, ?> onlyInActual = mapDifference.entriesOnlyOnRight();
+          writeEntries(writer, "Only in actual map", onlyInActual);
+        }
+        // Inner try closed above, which flushes the BufferedWriter into w
+        // before the report text is read here.
+        return w.toString();
+      } catch (IOException ex) {
+        throw new IllegalStateException(ex);
+      }
+    }
+
+    // Appends a "<header>\n  key: value..." section, or nothing if entries is empty.
+    private void writeEntries(BufferedWriter writer, String header, Map<?, ?> entries) throws IOException {
+      if (!entries.isEmpty()) {
+        writer.append(header);
+        writer.newLine();
+
+        for (Map.Entry<?, ?> kvp : entries.entrySet()) {
+          writer.append("  ");
+          writer.append(kvp.getKey().toString());
+          writer.append(": ");
+          writer.append(kvp.getValue().toString());
+          writer.newLine();
+        }
+        writer.newLine();
+      }
+    }
+  }
+
+  /**
+   * Asserts two maps are equal, treating (null, null) as equal. On mismatch the
+   * failure message is the full difference report built above.
+   */
+  static void assertMap(Map<String, ?> expected, Map<String, ?> actual, String message) {
+    if (null == expected && null == actual) {
+      return;
+    }
+
+    String prefix = Strings.isNullOrEmpty(message) ? "" : message + ": ";
+    assertNotNull(expected, prefix + "expected cannot be null");
+    assertNotNull(actual, prefix + "actual cannot be null");
+    MapDifference<String, ?> mapDifference = Maps.difference(expected, actual);
+    assertTrue(mapDifference.areEqual(), new MapDifferenceSupplier(mapDifference, prefix));
+  }
+}

Reply via email to