This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a12824e fix compile error
7a12824e is described below

commit 7a12824e56817e99c4a5f0272c007c30e06d61ff
Author: yuntian.zb <[email protected]>
AuthorDate: Mon Aug 29 20:46:15 2022 +0800

    fix compile error
---
 .../runtime/config/SourceConnectorConfig.java      |  18 ++
 .../rocketmq/connect/runtime/rest/RestHandler.java |   4 +-
 .../WorkerSinkTaskContextTest.java                 |   6 +-
 .../DistributedConnectControllerTest.java          |   5 +-
 .../distributed/TestConfigManagementService.java   |  18 +-
 .../controller/isolation/TestFileSystem.java       |   4 +-
 .../StandaloneConnectControllerTest.java           |   6 +-
 .../errors/DeadLetterQueueReporterTest.java        |   7 +-
 .../runtime/errors/ReporterManagerUtilTest.java    |   4 +-
 .../store/PositionStorageReaderImplTest.java       |   6 +-
 .../runtime/store/PositionStorageWriterTest.java   |   6 +-
 .../rocketmq/connect/transforms/ChangeCase.java    | 159 ++++++-------
 .../connect/transforms/ChangeCaseConfig.java       |  55 ++---
 .../connect/transforms/ExtractNestedField.java     | 148 ++++++------
 .../transforms/ExtractNestedFieldConfig.java       |  64 +++---
 .../rocketmq/connect/transforms/PatternFilter.java | 162 +++++++-------
 .../connect/transforms/PatternFilterConfig.java    |  55 +++--
 .../rocketmq/connect/transforms/PatternRename.java | 198 +++++++++--------
 .../connect/transforms/PatternRenameConfig.java    |  79 ++++---
 .../rocketmq/connect/transforms/RegexRouter.java   |   7 +-
 .../connect/transforms/SetMaximumPrecision.java    | 247 +++++++++++----------
 .../transforms/SetMaximumPrecisionConfig.java      |  18 +-
 .../rocketmq/connect/transforms/SetNull.java       |  71 +++---
 .../connect/transforms/util/ExtendKeyValue.java    |  29 ++-
 .../connect/transforms/util/SchemaHelper.java      |   2 +-
 .../connect/transforms/util/SchemaUtil.java        |  53 +++--
 26 files changed, 754 insertions(+), 677 deletions(-)

diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java
index 7fc12ec7..12012917 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
 package org.apache.rocketmq.connect.runtime.config;
 
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
index 6067bb53..9fc7e68e 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
@@ -44,8 +44,8 @@ import java.util.Set;
 public class RestHandler {
 
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
-    private final String CONNECTOR_NAME = "connectorName";
-    private final String TASK_NAME = "task";
+    private static final String CONNECTOR_NAME = "connectorName";
+    private static final String TASK_NAME = "task";
     private final AbstractConnectController connectController;
 
     /** connector plugin resource */
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContextTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContextTest.java
index f0dc3f27..060a81d7 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContextTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContextTest.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.assertj.core.api.Assertions;
@@ -42,7 +43,8 @@ public class WorkerSinkTaskContextTest {
     @Mock
     private WorkerSinkTask workerSinkTask;
 
-    private DefaultMQPullConsumer defaultMQPullConsumer = new 
DefaultMQPullConsumer();
+    private DefaultLitePullConsumer consumer = new DefaultLitePullConsumer();
+
 
     private RecordPartition recordPartition;
 
@@ -58,7 +60,7 @@ public class WorkerSinkTaskContextTest {
         Map<String, String> offset = new HashMap<>();
         offset.put("queueOffset", "0");
         recordOffset = new RecordOffset(offset);
-        workerSinkTaskContext = new WorkerSinkTaskContext(connectKeyValue, 
workerSinkTask, defaultMQPullConsumer);
+        workerSinkTaskContext = new WorkerSinkTaskContext(connectKeyValue, 
workerSinkTask, consumer);
     }
 
     @Test
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectControllerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectControllerTest.java
index 0ca330a6..c39fca54 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectControllerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectControllerTest.java
@@ -28,6 +28,7 @@ import 
org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
 import 
org.apache.rocketmq.connect.runtime.service.ClusterManagementServiceImpl;
 import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.StateManagementService;
 import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Before;
@@ -58,6 +59,8 @@ public class DistributedConnectControllerTest {
 
     private ServerResponseMocker brokerMocker;
 
+    private StateManagementService stateManagementService;
+
     @Before
     public void before() throws InterruptedException {
         nameServerMocker = NameServerMocker.startByDefaultConf(9876, 10911);
@@ -65,7 +68,7 @@ public class DistributedConnectControllerTest {
         connectConfig.setNamesrvAddr("127.0.0.1:9876");
         clusterManagementService.initialize(connectConfig);
         distributedConnectController = new 
DistributedConnectController(plugin, distributedConfig, 
clusterManagementService,
-            configManagementService, positionManagementService);
+            configManagementService, positionManagementService, 
stateManagementService);
     }
 
     @After
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/TestConfigManagementService.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/TestConfigManagementService.java
index b9c77cee..198c99a1 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/TestConfigManagementService.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/TestConfigManagementService.java
@@ -25,6 +25,7 @@ import 
org.apache.rocketmq.connect.runtime.config.WorkerConfig;
 import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
 import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
 import org.apache.rocketmq.connect.runtime.service.StagingMode;
+import org.apache.rocketmq.connect.runtime.store.ClusterConfigState;
 
 public class TestConfigManagementService implements ConfigManagementService {
     @Override public void start() {
@@ -39,15 +40,20 @@ public class TestConfigManagementService implements 
ConfigManagementService {
         return null;
     }
 
-    @Override public Map<String, ConnectKeyValue> 
getConnectorConfigsIncludeDeleted() {
+
+    @Override public String putConnectorConfig(String connectorName, 
ConnectKeyValue configs) {
         return null;
     }
 
-    @Override public String putConnectorConfig(String connectorName, 
ConnectKeyValue configs) throws Exception {
-        return null;
+    @Override public void deleteConnectorConfig(String connectorName) {
+
     }
 
-    @Override public void removeConnectorConfig(String connectorName) {
+    @Override public void pauseConnector(String connectorName) {
+
+    }
+
+    @Override public void resumeConnector(String connectorName) {
 
     }
 
@@ -72,6 +78,10 @@ public class TestConfigManagementService implements 
ConfigManagementService {
 
     }
 
+    @Override public ClusterConfigState snapshot() {
+        return null;
+    }
+
     @Override public Plugin getPlugin() {
         return null;
     }
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/isolation/TestFileSystem.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/isolation/TestFileSystem.java
index 30e673aa..88eaa028 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/isolation/TestFileSystem.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/isolation/TestFileSystem.java
@@ -27,11 +27,11 @@ import java.nio.file.attribute.UserPrincipalLookupService;
 import java.nio.file.spi.FileSystemProvider;
 import java.util.Set;
 import org.jetbrains.annotations.NotNull;
-import sun.nio.fs.MacOSXFileSystemProvider;
 
 public class TestFileSystem extends FileSystem {
     @Override public FileSystemProvider provider() {
-        return new MacOSXFileSystemProvider();
+//        return new MacOSXFileSystemProvider();
+        return null;
     }
 
     @Override public void close() throws IOException {
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectControllerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectControllerTest.java
index 7652f3f2..f74116e0 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectControllerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectControllerTest.java
@@ -27,6 +27,7 @@ import 
org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
 import 
org.apache.rocketmq.connect.runtime.service.ClusterManagementServiceImpl;
 import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.StateManagementService;
 import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Before;
@@ -51,6 +52,9 @@ public class StandaloneConnectControllerTest {
 
     private PositionManagementService positionManagementService = new 
TestPositionManageServiceImpl();
 
+    @Mock
+    private StateManagementService stateManagementService;
+
     @Before
     public void before() {
         NameServerMocker.startByDefaultConf(9876, 10911);
@@ -59,7 +63,7 @@ public class StandaloneConnectControllerTest {
         standaloneConfig.setHttpPort(10001);
         clusterManagementService.initialize(standaloneConfig);
         standaloneConnectController = new StandaloneConnectController(plugin, 
standaloneConfig, clusterManagementService,
-            configManagementService, positionManagementService);
+            configManagementService, positionManagementService, 
stateManagementService);
     }
 
     @After
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporterTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporterTest.java
index e9c8e48a..ac072c95 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporterTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporterTest.java
@@ -23,7 +23,8 @@ import java.util.Map;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.config.SinkConnectorConfig;
+import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
 import org.apache.rocketmq.connect.runtime.connectorwrapper.NameServerMocker;
 import 
org.apache.rocketmq.connect.runtime.connectorwrapper.ServerResponseMocker;
 import org.assertj.core.api.Assertions;
@@ -84,9 +85,9 @@ public class DeadLetterQueueReporterTest {
     private DeadLetterQueueReporter buildDeadLetterQueueReporter() {
         ConnectKeyValue sinkConfig = new ConnectKeyValue();
         Map<String, String> properties = new HashMap<>();
-        properties.put(DeadLetterQueueConfig.DLQ_TOPIC_NAME_CONFIG, 
"DEAD_LETTER_TOPIC");
+        properties.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, 
"DEAD_LETTER_TOPIC");
         sinkConfig.setProperties(properties);
-        ConnectConfig workerConfig = new ConnectConfig();
+        WorkerConfig workerConfig = new WorkerConfig();
         final DeadLetterQueueReporter deadLetterQueueReporter = 
DeadLetterQueueReporter.build("fileSinkConnector", sinkConfig, workerConfig);
         return deadLetterQueueReporter;
     }
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/ReporterManagerUtilTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/ReporterManagerUtilTest.java
index db0c3137..91da86d1 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/ReporterManagerUtilTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/ReporterManagerUtilTest.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.connect.runtime.errors;
 import io.openmessaging.connector.api.data.RecordConverter;
 import java.util.List;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
 import org.apache.rocketmq.connect.runtime.converter.record.StringConverter;
 import org.junit.Assert;
 import org.junit.Test;
@@ -48,7 +48,7 @@ public class ReporterManagerUtilTest {
 
     @Test
     public void sinkTaskReportersTest() {
-        ConnectConfig workerConfig = new ConnectConfig();
+        WorkerConfig workerConfig = new WorkerConfig();
         final List<ErrorReporter> connector = 
ReporterManagerUtil.sinkTaskReporters("testConnector", connectKeyValue, 
workerConfig);
         Assert.assertEquals(1, connector.size());
     }
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/store/PositionStorageReaderImplTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/store/PositionStorageReaderImplTest.java
index 91ff23bd..15f0d010 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/store/PositionStorageReaderImplTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/store/PositionStorageReaderImplTest.java
@@ -24,7 +24,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
 import org.apache.rocketmq.connect.runtime.connectorwrapper.NameServerMocker;
 import 
org.apache.rocketmq.connect.runtime.connectorwrapper.ServerResponseMocker;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
@@ -49,7 +49,7 @@ public class PositionStorageReaderImplTest {
 
     private RecordOffset recordOffset;
 
-    private ConnectConfig connectConfig;
+    private WorkerConfig connectConfig;
 
     private ServerResponseMocker nameServerMocker;
 
@@ -60,7 +60,7 @@ public class PositionStorageReaderImplTest {
         nameServerMocker = NameServerMocker.startByDefaultConf(9876, 10911);
         brokerMocker = ServerResponseMocker.startServer(10911, "Hello 
World".getBytes(StandardCharsets.UTF_8));
 
-        connectConfig = new ConnectConfig();
+        connectConfig = new WorkerConfig();
         connectConfig.setNamesrvAddr("localhost:9876");
 
         positionManagementService = new PositionManagementServiceImpl();
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriterTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriterTest.java
index 381177de..b12625df 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriterTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriterTest.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
 import org.apache.rocketmq.connect.runtime.connectorwrapper.NameServerMocker;
 import 
org.apache.rocketmq.connect.runtime.connectorwrapper.ServerResponseMocker;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
@@ -40,7 +40,7 @@ public class PositionStorageWriterTest {
 
     private PositionManagementService positionManagementService;
 
-    private ConnectConfig connectConfig;
+    private WorkerConfig connectConfig;
 
     private ServerResponseMocker nameServerMocker;
 
@@ -65,7 +65,7 @@ public class PositionStorageWriterTest {
         offset.put("queueOffset", 0L);
         recordOffset = new RecordOffset(offset);
 
-        connectConfig = new ConnectConfig();
+        connectConfig = new WorkerConfig();
         connectConfig.setNamesrvAddr("localhost:9876");
 
         positionManagementService = new PositionManagementServiceImpl();
diff --git a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCase.java b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCase.java
index b8094d26..0e62498c 100644
--- a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCase.java
+++ b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCase.java
@@ -33,97 +33,98 @@ import java.util.Map;
 
 /**
  * change case
+ *
  * @param <R>
  */
 public abstract class ChangeCase<R extends ConnectRecord> extends 
BaseTransformation<R> {
-  private static final Logger log = LoggerFactory.getLogger(ChangeCase.class);
-
-  class State {
-    public final Map<String, String> columnMapping;
-    public final Schema schema;
-    State(Map<String, String> columnMapping, Schema schema) {
-      this.columnMapping = columnMapping;
-      this.schema = schema;
-    }
-  }
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ChangeCase.class);
 
-  Map<Schema, State> schemaState = new HashMap<>();
-  private ChangeCaseConfig config;
-  @Override
-  public void start(KeyValue config) {
-    this.config = new ChangeCaseConfig(config);
-  }
+    class State {
+        public final Map<String, String> columnMapping;
+        public final Schema schema;
 
-  @Override
-  protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct 
input) {
-    // state
-    final State state = this.schemaState.computeIfAbsent(inputSchema, schema 
-> {
-      final SchemaBuilder builder = SchemaBuilder.struct();
-      if (!Strings.isNullOrEmpty(schema.getName())) {
-        builder.name(schema.getName());
-      }
-      if (schema.isOptional()) {
-        builder.optional();
-      }
-      final Map<String, String> columnMapping = new LinkedHashMap<>();
-      for (Field field : schema.getFields()) {
-        final String newFieldName = this.config.from.to(this.config.to, 
field.getName());
-        log.trace("processStruct() - Mapped '{}' to '{}'", field.getName(), 
newFieldName);
-        columnMapping.put(field.getName(), newFieldName);
-        builder.field(newFieldName, field.getSchema());
-      }
-      return new State(columnMapping, builder.build());
-    });
+        State(Map<String, String> columnMapping, Schema schema) {
+            this.columnMapping = columnMapping;
+            this.schema = schema;
+        }
+    }
 
+    Map<Schema, State> schemaState = new HashMap<>();
+    private ChangeCaseConfig config;
 
-    final Struct outputStruct = new Struct(state.schema);
-    for (Map.Entry<String, String> kvp : state.columnMapping.entrySet()) {
-      final Object value = input.get(kvp.getKey());
-      outputStruct.put(kvp.getValue(), value);
+    @Override
+    public void start(KeyValue config) {
+        this.config = new ChangeCaseConfig(config);
     }
-    return new SchemaAndValue(state.schema, outputStruct);
-  }
 
-  /**
-   * transform key
-   */
-  public static class Key extends ChangeCase<ConnectRecord> {
     @Override
-    public ConnectRecord doTransform(ConnectRecord r) {
-      final SchemaAndValue transformed = process(r, r.getKeySchema(), 
r.getKey());
-      ConnectRecord record = new ConnectRecord(
-              r.getPosition().getPartition(),
-              r.getPosition().getOffset(),
-              r.getTimestamp(),
-              transformed.schema(),
-              transformed.value(),
-              r.getSchema(),
-              r.getData()
-      );
-      record.setExtensions(r.getExtensions());
-      return record;
+    protected SchemaAndValue processStruct(R record, Schema inputSchema, 
Struct input) {
+        // state
+        final State state = this.schemaState.computeIfAbsent(inputSchema, 
schema -> {
+            final SchemaBuilder builder = SchemaBuilder.struct();
+            if (!Strings.isNullOrEmpty(schema.getName())) {
+                builder.name(schema.getName());
+            }
+            if (schema.isOptional()) {
+                builder.optional();
+            }
+            final Map<String, String> columnMapping = new LinkedHashMap<>();
+            for (Field field : schema.getFields()) {
+                final String newFieldName = 
this.config.from.to(this.config.to, field.getName());
+                LOGGER.trace("processStruct() - Mapped '{}' to '{}'", 
field.getName(), newFieldName);
+                columnMapping.put(field.getName(), newFieldName);
+                builder.field(newFieldName, field.getSchema());
+            }
+            return new State(columnMapping, builder.build());
+        });
+
+        final Struct outputStruct = new Struct(state.schema);
+        for (Map.Entry<String, String> kvp : state.columnMapping.entrySet()) {
+            final Object value = input.get(kvp.getKey());
+            outputStruct.put(kvp.getValue(), value);
+        }
+        return new SchemaAndValue(state.schema, outputStruct);
     }
-  }
 
+    /**
+     * transform key
+     */
+    public static class Key extends ChangeCase<ConnectRecord> {
+        @Override
+        public ConnectRecord doTransform(ConnectRecord r) {
+            final SchemaAndValue transformed = process(r, r.getKeySchema(), 
r.getKey());
+            ConnectRecord record = new ConnectRecord(
+                r.getPosition().getPartition(),
+                r.getPosition().getOffset(),
+                r.getTimestamp(),
+                transformed.schema(),
+                transformed.value(),
+                r.getSchema(),
+                r.getData()
+            );
+            record.setExtensions(r.getExtensions());
+            return record;
+        }
+    }
 
-  /**
-   * transform value
-   */
-  public static class Value extends ChangeCase<ConnectRecord> {
-    @Override
-    public ConnectRecord doTransform(ConnectRecord r) {
-      final SchemaAndValue transformed = process(r, r.getSchema(), 
r.getData());
-      ConnectRecord record = new ConnectRecord(
-              r.getPosition().getPartition(),
-              r.getPosition().getOffset(),
-              r.getTimestamp(),
-              r.getKeySchema(),
-              r.getKey(),
-              transformed.schema(),
-              transformed.value()
-      );
-      record.setExtensions(r.getExtensions());
-      return record;
+    /**
+     * transform value
+     */
+    public static class Value extends ChangeCase<ConnectRecord> {
+        @Override
+        public ConnectRecord doTransform(ConnectRecord r) {
+            final SchemaAndValue transformed = process(r, r.getSchema(), 
r.getData());
+            ConnectRecord record = new ConnectRecord(
+                r.getPosition().getPartition(),
+                r.getPosition().getOffset(),
+                r.getTimestamp(),
+                r.getKeySchema(),
+                r.getKey(),
+                transformed.schema(),
+                transformed.value()
+            );
+            record.setExtensions(r.getExtensions());
+            return record;
+        }
     }
-  }
 }
diff --git a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCaseConfig.java b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCaseConfig.java
index fa9a5c99..805310b1 100644
--- a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCaseConfig.java
+++ b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ChangeCaseConfig.java
@@ -16,39 +16,40 @@
  */
 package org.apache.rocketmq.connect.transforms;
 
-
 import com.google.common.base.CaseFormat;
 import io.openmessaging.KeyValue;
 
 public class ChangeCaseConfig {
-  public final CaseFormat from;
-  public final CaseFormat to;
+    public final CaseFormat from;
+    public final CaseFormat to;
 
-  public static final String FROM_CONFIG = "from";
-  static final String FROM_DOC = "The format to move from ";
-  public static final String TO_CONFIG = "to";
-  static final String TO_DOC = "";
+    public static final String FROM_CONFIG = "from";
+    static final String FROM_DOC = "The format to move from ";
+    public static final String TO_CONFIG = "to";
+    static final String TO_DOC = "";
 
-  public ChangeCaseConfig(KeyValue config) {
-    String fromConfig = config.getString(FROM_CONFIG);
-    this.from = CaseFormat.valueOf(fromConfig);
-    String toConfig = config.getString(TO_CONFIG);
-    this.to = CaseFormat.valueOf(toConfig);
-  }
+    public ChangeCaseConfig(KeyValue config) {
+        String fromConfig = config.getString(FROM_CONFIG);
+        this.from = CaseFormat.valueOf(fromConfig);
+        String toConfig = config.getString(TO_CONFIG);
+        this.to = CaseFormat.valueOf(toConfig);
+    }
 
-  /**
-   * from
-   * @return
-   */
-  public CaseFormat from(){
-    return this.from;
-  }
+    /**
+     * from
+     *
+     * @return
+     */
+    public CaseFormat from() {
+        return this.from;
+    }
 
-  /**
-   * to
-   * @return
-   */
-  public CaseFormat to(){
-    return this.to;
-  }
+    /**
+     * to
+     *
+     * @return
+     */
+    public CaseFormat to() {
+        return this.to;
+    }
 }
diff --git a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedField.java b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedField.java
index 122951d0..fc8bcfa8 100644
--- a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedField.java
+++ b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedField.java
@@ -32,91 +32,91 @@ import java.util.Map;
 
 /**
  * extract nested field
+ *
  * @param <R>
  */
 public abstract class ExtractNestedField<R extends ConnectRecord> extends 
BaseTransformation<R> {
-  private static final Logger log = 
LoggerFactory.getLogger(ExtractNestedField.class);
-  private ExtractNestedFieldConfig config;
-  Map<Schema, Schema> schemaCache;
-  @Override
-  public void start(KeyValue keyValue) {
-    this.config = new ExtractNestedFieldConfig(keyValue);
-    this.schemaCache = new HashMap<>();
-  }
-
-
-  @Override
-  protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct 
input) {
-    final Struct innerStruct = input.getStruct(this.config.outerFieldName);
-    final Schema outputSchema = this.schemaCache.computeIfAbsent(inputSchema, 
s -> {
+    private static final Logger log = 
LoggerFactory.getLogger(ExtractNestedField.class);
+    private ExtractNestedFieldConfig config;
+    Map<Schema, Schema> schemaCache;
 
-      final Field innerField = 
innerStruct.schema().getField(this.config.innerFieldName);
-      final SchemaBuilder builder = SchemaBuilder.struct();
-      if (!Strings.isNullOrEmpty(inputSchema.getName())) {
-        builder.name(inputSchema.getName());
-      }
-      if (inputSchema.isOptional()) {
-        builder.optional();
-      }
-      for (Field inputField : inputSchema.getFields()) {
-        builder.field(inputField.getName(), inputField.getSchema());
-      }
-      builder.field(this.config.innerFieldName, innerField.getSchema());
-      return builder.build();
-    });
-    final Struct outputStruct = new Struct(outputSchema);
-    for (Field inputField : inputSchema.getFields()) {
-      final Object value = input.get(inputField);
-      outputStruct.put(inputField.getName(), value);
+    @Override
+    public void start(KeyValue keyValue) {
+        this.config = new ExtractNestedFieldConfig(keyValue);
+        this.schemaCache = new HashMap<>();
     }
-    final Object innerFieldValue = innerStruct.get(this.config.innerFieldName);
-    outputStruct.put(this.config.innerFieldName, innerFieldValue);
 
-    return new SchemaAndValue(outputSchema, outputStruct);
+    @Override
+    protected SchemaAndValue processStruct(R record, Schema inputSchema, 
Struct input) {
+        final Struct innerStruct = input.getStruct(this.config.outerFieldName);
+        final Schema outputSchema = 
this.schemaCache.computeIfAbsent(inputSchema, s -> {
 
-  }
+            final Field innerField = 
innerStruct.schema().getField(this.config.innerFieldName);
+            final SchemaBuilder builder = SchemaBuilder.struct();
+            if (!Strings.isNullOrEmpty(inputSchema.getName())) {
+                builder.name(inputSchema.getName());
+            }
+            if (inputSchema.isOptional()) {
+                builder.optional();
+            }
+            for (Field inputField : inputSchema.getFields()) {
+                builder.field(inputField.getName(), inputField.getSchema());
+            }
+            builder.field(this.config.innerFieldName, innerField.getSchema());
+            return builder.build();
+        });
+        final Struct outputStruct = new Struct(outputSchema);
+        for (Field inputField : inputSchema.getFields()) {
+            final Object value = input.get(inputField);
+            outputStruct.put(inputField.getName(), value);
+        }
+        final Object innerFieldValue = 
innerStruct.get(this.config.innerFieldName);
+        outputStruct.put(this.config.innerFieldName, innerFieldValue);
+
+        return new SchemaAndValue(outputSchema, outputStruct);
 
-  /**
-   * transform key
-   */
-  public static class Key extends ExtractNestedField<ConnectRecord> {
-    @Override
-    public ConnectRecord doTransform(ConnectRecord r) {
-      final SchemaAndValue transformed = process(r, r.getKeySchema(), 
r.getKey());
-      ConnectRecord record = new ConnectRecord(
-              r.getPosition().getPartition(),
-              r.getPosition().getOffset(),
-              r.getTimestamp(),
-              transformed.schema(),
-              transformed.value(),
-              r.getSchema(),
-              r.getData()
-      );
-      record.setExtensions(r.getExtensions());
-      return record;
     }
-  }
 
+    /**
+     * transform key
+     */
+    public static class Key extends ExtractNestedField<ConnectRecord> {
+        @Override
+        public ConnectRecord doTransform(ConnectRecord r) {
+            final SchemaAndValue transformed = process(r, r.getKeySchema(), 
r.getKey());
+            ConnectRecord record = new ConnectRecord(
+                r.getPosition().getPartition(),
+                r.getPosition().getOffset(),
+                r.getTimestamp(),
+                transformed.schema(),
+                transformed.value(),
+                r.getSchema(),
+                r.getData()
+            );
+            record.setExtensions(r.getExtensions());
+            return record;
+        }
+    }
 
-  /**
-   * transform value
-   */
-  public static class Value extends ExtractNestedField<ConnectRecord> {
-    @Override
-    public ConnectRecord doTransform(ConnectRecord r) {
-      final SchemaAndValue transformed = process(r, r.getSchema(), 
r.getData());
-      ConnectRecord record = new ConnectRecord(
-              r.getPosition().getPartition(),
-              r.getPosition().getOffset(),
-              r.getTimestamp(),
-              r.getKeySchema(),
-              r.getKey(),
-              transformed.schema(),
-              transformed.value()
-      );
-      record.setExtensions(r.getExtensions());
-      return record;
+    /**
+     * transform value
+     */
+    public static class Value extends ExtractNestedField<ConnectRecord> {
+        @Override
+        public ConnectRecord doTransform(ConnectRecord r) {
+            final SchemaAndValue transformed = process(r, r.getSchema(), 
r.getData());
+            ConnectRecord record = new ConnectRecord(
+                r.getPosition().getPartition(),
+                r.getPosition().getOffset(),
+                r.getTimestamp(),
+                r.getKeySchema(),
+                r.getKey(),
+                transformed.schema(),
+                transformed.value()
+            );
+            record.setExtensions(r.getExtensions());
+            return record;
+        }
     }
-  }
 
 }
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedFieldConfig.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedFieldConfig.java
index 9cd2f233..1c689a2a 100644
--- 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedFieldConfig.java
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/ExtractNestedFieldConfig.java
@@ -18,41 +18,39 @@ package org.apache.rocketmq.connect.transforms;
 
 import io.openmessaging.KeyValue;
 
-
 /**
  * extract nested field config
  */
-public class ExtractNestedFieldConfig{
-  public final String outerFieldName;
-  public final String innerFieldName;
-  public final String outputFieldName;
-
-  public ExtractNestedFieldConfig(KeyValue config) {
-    this.outerFieldName = config.getString(OUTER_FIELD_NAME_CONF);
-    this.innerFieldName = config.getString(INNER_FIELD_NAME_CONF);
-    this.outputFieldName = config.getString(OUTPUT_FIELD_NAME_CONF);
-  }
-
-  public static final String OUTER_FIELD_NAME_CONF = "input.outer.field.name";
-  static final String OUTER_FIELD_NAME_DOC = "The field on the parent struct 
containing the child struct. " +
-      "For example if you wanted the extract `address.state` you would use 
`address`.";
-  public static final String INNER_FIELD_NAME_CONF = "input.inner.field.name";
-  static final String INNER_FIELD_NAME_DOC = "The field on the child struct 
containing the field to be extracted. " +
-      "For example if you wanted the extract `address.state` you would use 
`state`.";
-  public static final String OUTPUT_FIELD_NAME_CONF = "output.field.name";
-  static final String OUTPUT_FIELD_NAME_DOC = "The field to place the 
extracted value into.";
-
-
-  public String outerFieldName(){
-    return this.outerFieldName;
-  }
-
-  public String innerFieldName(){
-    return this.innerFieldName;
-  }
-
-  public String outputFieldName(){
-    return this.outerFieldName;
-  }
+public class ExtractNestedFieldConfig {
+    public final String outerFieldName;
+    public final String innerFieldName;
+    public final String outputFieldName;
+
+    public ExtractNestedFieldConfig(KeyValue config) {
+        this.outerFieldName = config.getString(OUTER_FIELD_NAME_CONF);
+        this.innerFieldName = config.getString(INNER_FIELD_NAME_CONF);
+        this.outputFieldName = config.getString(OUTPUT_FIELD_NAME_CONF);
+    }
+
+    public static final String OUTER_FIELD_NAME_CONF = 
"input.outer.field.name";
+    static final String OUTER_FIELD_NAME_DOC = "The field on the parent struct 
containing the child struct. " +
+        "For example if you wanted the extract `address.state` you would use 
`address`.";
+    public static final String INNER_FIELD_NAME_CONF = 
"input.inner.field.name";
+    static final String INNER_FIELD_NAME_DOC = "The field on the child struct 
containing the field to be extracted. " +
+        "For example if you wanted the extract `address.state` you would use 
`state`.";
+    public static final String OUTPUT_FIELD_NAME_CONF = "output.field.name";
+    static final String OUTPUT_FIELD_NAME_DOC = "The field to place the 
extracted value into.";
+
+    public String outerFieldName() {
+        return this.outerFieldName;
+    }
+
+    public String innerFieldName() {
+        return this.innerFieldName;
+    }
+
+    /**
+     * Returns the name of the field that receives the extracted value.
+     *
+     * @return the value configured under {@link #OUTPUT_FIELD_NAME_CONF}
+     */
+    public String outputFieldName() {
+        return this.outputFieldName;
+    }
 
 }
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilter.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilter.java
index e2a71fff..c219535d 100644
--- 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilter.java
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilter.java
@@ -31,102 +31,104 @@ import java.util.regex.Pattern;
 
 /**
  * pattern filter
+ *
  * @param <R>
  */
 public abstract class PatternFilter<R extends ConnectRecord> extends 
BaseTransformation<R> {
 
-  private static final Logger log = 
LoggerFactory.getLogger(PatternFilter.class);
-  public Pattern pattern;
-  public Set<String> fields;
+    private static final Logger log = 
LoggerFactory.getLogger(PatternFilter.class);
+    public Pattern pattern;
+    public Set<String> fields;
 
-  private PatternFilterConfig config;
-  @Override
-  public void start(KeyValue config) {
-    this.config = new PatternFilterConfig(config);
-    this.pattern = this.config.pattern();
-    this.fields =  this.config.fields();
-  }
+    private PatternFilterConfig config;
 
+    @Override
+    public void start(KeyValue config) {
+        this.config = new PatternFilterConfig(config);
+        this.pattern = this.config.pattern();
+        this.fields = this.config.fields();
+    }
 
-  R filter(R record, Struct struct) {
-    for (Field field : struct.schema().getFields()) {
-      if (!this.fields.contains(field.getName()) || 
field.getSchema().getFieldType() != FieldType.STRING) {
-        continue;
-      }
-      String input = struct.getString(field.getName());
-      if (null != input) {
-        if (this.pattern.matcher(input).matches()) {
-          return null;
+    R filter(R record, Struct struct) {
+        for (Field field : struct.schema().getFields()) {
+            if (!this.fields.contains(field.getName()) || 
field.getSchema().getFieldType() != FieldType.STRING) {
+                continue;
+            }
+            String input = struct.getString(field.getName());
+            if (null != input) {
+                if (this.pattern.matcher(input).matches()) {
+                    return null;
+                }
+            }
         }
-      }
+        return record;
     }
-    return record;
-  }
 
-  /**
-   * filter map
-   * @param record
-   * @param map
-   * @return
-   */
-  R filter(R record, Map map) {
-    for (Object field : map.keySet()) {
-      if (!this.fields.contains(field)) {
-        continue;
-      }
-      Object value = map.get(field);
-      if (value instanceof String) {
-        String input = (String) value;
-        if (this.pattern.matcher(input).matches()) {
-          return null;
+    /**
+     * filter map
+     *
+     * @param record
+     * @param map
+     * @return
+     */
+    R filter(R record, Map map) {
+        for (Object field : map.keySet()) {
+            if (!this.fields.contains(field)) {
+                continue;
+            }
+            Object value = map.get(field);
+            if (value instanceof String) {
+                String input = (String) value;
+                if (this.pattern.matcher(input).matches()) {
+                    return null;
+                }
+            }
         }
-      }
+        return record;
     }
-    return record;
-  }
 
-
-  R filter(R record, final boolean key) {
-    final SchemaAndValue input = key ?
-        new SchemaAndValue(record.getKeySchema(), record.getKey()) :
-        new SchemaAndValue(record.getSchema(), record.getData());
-    final R result;
-    if (input.schema() != null) {
-      if (FieldType.STRUCT == input.schema().getFieldType()) {
-        result = filter(record, (Struct) input.value());
-      } else if (FieldType.MAP == input.schema().getFieldType()) {
-        result = filter(record, (Map) input.value());
-      } else {
-        result = record;
-      }
-    } else if (input.value() instanceof Map) {
-      result = filter(record, (Map) input.value());
-    } else {
-      result = record;
+    R filter(R record, final boolean key) {
+        final SchemaAndValue input = key ?
+            new SchemaAndValue(record.getKeySchema(), record.getKey()) :
+            new SchemaAndValue(record.getSchema(), record.getData());
+        final R result;
+        if (input.schema() != null) {
+            if (FieldType.STRUCT == input.schema().getFieldType()) {
+                result = filter(record, (Struct) input.value());
+            } else if (FieldType.MAP == input.schema().getFieldType()) {
+                result = filter(record, (Map) input.value());
+            } else {
+                result = record;
+            }
+        } else if (input.value() instanceof Map) {
+            result = filter(record, (Map) input.value());
+        } else {
+            result = record;
+        }
+        return result;
     }
-    return result;
-  }
 
-
-  /**
-   * filter key
-   * @param <R>
-   */
-  public static class Key<R extends ConnectRecord> extends PatternFilter<R> {
-    @Override
-    public R doTransform(R r) {
-      return filter(r, true);
+    /**
+     * filter key
+     *
+     * @param <R>
+     */
+    public static class Key<R extends ConnectRecord> extends PatternFilter<R> {
+        @Override
+        public R doTransform(R r) {
+            return filter(r, true);
+        }
     }
-  }
 
-  /**
-   * filter value
-   * @param <R>
-   */
-  public static class Value<R extends ConnectRecord> extends PatternFilter<R> {
-    @Override
-    public R doTransform(R r) {
-      return filter(r, false);
+    /**
+     * filter value
+     *
+     * @param <R>
+     */
+    public static class Value<R extends ConnectRecord> extends 
PatternFilter<R> {
+        @Override
+        public R doTransform(R r) {
+            return filter(r, false);
+        }
     }
-  }
 }
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilterConfig.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilterConfig.java
index 543ddf7d..00367b0c 100644
--- 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilterConfig.java
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternFilterConfig.java
@@ -26,38 +26,37 @@ import java.util.Set;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
-
 /**
  * pattern filter config
  */
-public class PatternFilterConfig{
-
-  public static final String PATTERN_CONFIG = "pattern";
-  public static final String PATTERN_DOC = "The regex to test the message 
with. ";
-
-  public static final String FIELD_CONFIG = "fields";
-  public static final String FIELD_DOC = "The fields to transform.";
-
-
-  private final Pattern pattern;
-  private final Set<String> fields;
-  public PatternFilterConfig(KeyValue config){
-    ExtendKeyValue extendKeyValue = new ExtendKeyValue(config);
-    String pattern = extendKeyValue.getString(PATTERN_CONFIG);
-    try {
-      this.pattern = Pattern.compile(pattern);
-    } catch (PatternSyntaxException var4) {
-      throw new ConnectException(String.format("Could not compile regex 
'%s'.", pattern));
+public class PatternFilterConfig {
+
+    public static final String PATTERN_CONFIG = "pattern";
+    public static final String PATTERN_DOC = "The regex to test the message 
with. ";
+
+    public static final String FIELD_CONFIG = "fields";
+    public static final String FIELD_DOC = "The fields to transform.";
+
+    private final Pattern pattern;
+    private final Set<String> fields;
+
+    /**
+     * Builds the filter settings from the transform configuration.
+     *
+     * @param config configuration holding the {@link #PATTERN_CONFIG} regex and the
+     *               {@link #FIELD_CONFIG} list of field names
+     * @throws ConnectException if the configured regex does not compile
+     */
+    public PatternFilterConfig(KeyValue config) {
+        ExtendKeyValue extendKeyValue = new ExtendKeyValue(config);
+        String pattern = extendKeyValue.getString(PATTERN_CONFIG);
+        try {
+            this.pattern = Pattern.compile(pattern);
+        } catch (PatternSyntaxException e) {
+            // Chain the syntax error so its description and error index stay available.
+            throw new ConnectException(String.format("Could not compile regex '%s'.", pattern), e);
+        }
+        List<String> fields = extendKeyValue.getList(FIELD_CONFIG);
+        this.fields = new HashSet<>(fields);
     }
-    List<String> fields = extendKeyValue.getList(FIELD_CONFIG);
-    this.fields = new HashSet<>(fields);
-  }
 
-  public Pattern pattern() {
-    return this.pattern;
-  }
+    public Pattern pattern() {
+        return this.pattern;
+    }
 
-  public Set<String> fields() {
-    return this.fields;
-  }
+    public Set<String> fields() {
+        return this.fields;
+    }
 }
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRename.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRename.java
index d27e3300..a36cb314 100644
--- 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRename.java
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRename.java
@@ -33,114 +33,116 @@ import java.util.regex.Matcher;
 
 /**
  * pattern rename
+ *
  * @param <R>
  */
 public abstract class PatternRename<R extends ConnectRecord> extends 
BaseTransformation<R> {
-  private static final Logger log = 
LoggerFactory.getLogger(PatternRename.class);
-  PatternRenameConfig config;
-  @Override
-  public void start(KeyValue keyValue) {
-    config = new PatternRenameConfig(keyValue);
-  }
+    private static final Logger log = 
LoggerFactory.getLogger(PatternRename.class);
+    PatternRenameConfig config;
 
-  @Override
-  protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct 
inputStruct) {
-    final SchemaBuilder outputSchemaBuilder = SchemaBuilder.struct();
-    outputSchemaBuilder.name(inputSchema.getName());
-    outputSchemaBuilder.doc(inputSchema.getDoc());
-    if (null != inputSchema.getDefaultValue()) {
-      outputSchemaBuilder.defaultValue(inputSchema.getDefaultValue());
-    }
-    if (null != inputSchema.getParameters() && 
!inputSchema.getParameters().isEmpty()) {
-      outputSchemaBuilder.parameters(inputSchema.getParameters());
-    }
-    if (inputSchema.isOptional()) {
-      outputSchemaBuilder.optional();
-    }
-    Map<String, String> fieldMappings = new 
HashMap<>(inputSchema.getFields().size());
-    for (final Field inputField : inputSchema.getFields()) {
-      log.trace("process() - Processing field '{}'", inputField.getName());
-      final Matcher fieldMatcher = 
this.config.pattern.matcher(inputField.getName());
-      final String outputFieldName;
-      if (fieldMatcher.find()) {
-        // replace
-        outputFieldName = fieldMatcher.replaceAll(this.config.replacement);
-      } else {
-        outputFieldName = inputField.getName();
-      }
-      log.trace("process() - Mapping field '{}' to '{}'", 
inputField.getName(), outputFieldName);
-      fieldMappings.put(inputField.getName(), outputFieldName);
-      outputSchemaBuilder.field(outputFieldName, inputField.getSchema());
-    }
-    final Schema outputSchema = outputSchemaBuilder.build();
-    final Struct outputStruct = new Struct(outputSchema);
-    for (Map.Entry<String, String> entry : fieldMappings.entrySet()) {
-      final String inputField = entry.getKey(), outputField = entry.getValue();
-      log.trace("process() - Copying '{}' to '{}'", inputField, outputField);
-      final Object value = inputStruct.get(inputField);
-      outputStruct.put(outputField, value);
+    @Override
+    public void start(KeyValue keyValue) {
+        config = new PatternRenameConfig(keyValue);
     }
-    return new SchemaAndValue(outputSchema, outputStruct);
-  }
 
-  @Override
-  protected SchemaAndValue processMap(R record, Map<String, Object> input) {
-    final Map<String, Object> outputMap = new LinkedHashMap<>(input.size());
-    for (final String inputFieldName : input.keySet()) {
-      log.trace("process() - Processing field '{}'", inputFieldName);
-      final Matcher fieldMatcher = this.config.pattern.matcher(inputFieldName);
-      final String outputFieldName;
-      // replace
-      if (fieldMatcher.find()) {
-        outputFieldName = fieldMatcher.replaceAll(this.config.replacement);
-      } else {
-        outputFieldName = inputFieldName;
-      }
-      final Object value = input.get(inputFieldName);
-      outputMap.put(outputFieldName, value);
+    @Override
+    protected SchemaAndValue processStruct(R record, Schema inputSchema, 
Struct inputStruct) {
+        final SchemaBuilder outputSchemaBuilder = SchemaBuilder.struct();
+        outputSchemaBuilder.name(inputSchema.getName());
+        outputSchemaBuilder.doc(inputSchema.getDoc());
+        if (null != inputSchema.getDefaultValue()) {
+            outputSchemaBuilder.defaultValue(inputSchema.getDefaultValue());
+        }
+        if (null != inputSchema.getParameters() && 
!inputSchema.getParameters().isEmpty()) {
+            outputSchemaBuilder.parameters(inputSchema.getParameters());
+        }
+        if (inputSchema.isOptional()) {
+            outputSchemaBuilder.optional();
+        }
+        Map<String, String> fieldMappings = new 
HashMap<>(inputSchema.getFields().size());
+        for (final Field inputField : inputSchema.getFields()) {
+            log.trace("process() - Processing field '{}'", 
inputField.getName());
+            final Matcher fieldMatcher = 
this.config.pattern.matcher(inputField.getName());
+            final String outputFieldName;
+            if (fieldMatcher.find()) {
+                // replace
+                outputFieldName = 
fieldMatcher.replaceAll(this.config.replacement);
+            } else {
+                outputFieldName = inputField.getName();
+            }
+            log.trace("process() - Mapping field '{}' to '{}'", 
inputField.getName(), outputFieldName);
+            fieldMappings.put(inputField.getName(), outputFieldName);
+            outputSchemaBuilder.field(outputFieldName, inputField.getSchema());
+        }
+        final Schema outputSchema = outputSchemaBuilder.build();
+        final Struct outputStruct = new Struct(outputSchema);
+        for (Map.Entry<String, String> entry : fieldMappings.entrySet()) {
+            final String inputField = entry.getKey(), outputField = 
entry.getValue();
+            log.trace("process() - Copying '{}' to '{}'", inputField, 
outputField);
+            final Object value = inputStruct.get(inputField);
+            outputStruct.put(outputField, value);
+        }
+        return new SchemaAndValue(outputSchema, outputStruct);
     }
-    return new SchemaAndValue(null, outputMap);
-  }
 
-  /**
-   * transform key
-   */
-  public static class Key extends PatternRename<ConnectRecord> {
     @Override
-    public ConnectRecord doTransform(ConnectRecord r) {
-      final SchemaAndValue transformed = process(r, r.getKeySchema(), 
r.getKey());
-      ConnectRecord record = new ConnectRecord(
-              r.getPosition().getPartition(),
-              r.getPosition().getOffset(),
-              r.getTimestamp(),
-              transformed.schema(),
-              transformed.value(),
-              r.getSchema(),
-              r.getData()
-      );
-      record.setExtensions(r.getExtensions());
-      return record;
+    protected SchemaAndValue processMap(R record, Map<String, Object> input) {
+        final Map<String, Object> outputMap = new 
LinkedHashMap<>(input.size());
+        for (final String inputFieldName : input.keySet()) {
+            log.trace("process() - Processing field '{}'", inputFieldName);
+            final Matcher fieldMatcher = 
this.config.pattern.matcher(inputFieldName);
+            final String outputFieldName;
+            // replace
+            if (fieldMatcher.find()) {
+                outputFieldName = 
fieldMatcher.replaceAll(this.config.replacement);
+            } else {
+                outputFieldName = inputFieldName;
+            }
+            final Object value = input.get(inputFieldName);
+            outputMap.put(outputFieldName, value);
+        }
+        return new SchemaAndValue(null, outputMap);
     }
-  }
 
-  /**
-   * transform value
-   */
-  public static class Value extends PatternRename<ConnectRecord> {
-    @Override
-    public ConnectRecord doTransform(ConnectRecord r) {
-      final SchemaAndValue transformed = process(r, r.getSchema(), 
r.getData());
-      ConnectRecord record = new ConnectRecord(
-              r.getPosition().getPartition(),
-              r.getPosition().getOffset(),
-              r.getTimestamp(),
-              r.getKeySchema(),
-              r.getKey(),
-              transformed.schema(),
-              transformed.value()
-      );
-      record.setExtensions(r.getExtensions());
-      return record;
+    /**
+     * transform key
+     */
+    public static class Key extends PatternRename<ConnectRecord> {
+        @Override
+        public ConnectRecord doTransform(ConnectRecord r) {
+            final SchemaAndValue transformed = process(r, r.getKeySchema(), 
r.getKey());
+            ConnectRecord record = new ConnectRecord(
+                r.getPosition().getPartition(),
+                r.getPosition().getOffset(),
+                r.getTimestamp(),
+                transformed.schema(),
+                transformed.value(),
+                r.getSchema(),
+                r.getData()
+            );
+            record.setExtensions(r.getExtensions());
+            return record;
+        }
+    }
+
+    /**
+     * transform value
+     */
+    public static class Value extends PatternRename<ConnectRecord> {
+        @Override
+        public ConnectRecord doTransform(ConnectRecord r) {
+            final SchemaAndValue transformed = process(r, r.getSchema(), 
r.getData());
+            ConnectRecord record = new ConnectRecord(
+                r.getPosition().getPartition(),
+                r.getPosition().getOffset(),
+                r.getTimestamp(),
+                r.getKeySchema(),
+                r.getKey(),
+                transformed.schema(),
+                transformed.value()
+            );
+            record.setExtensions(r.getExtensions());
+            return record;
+        }
     }
-  }
 }
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRenameConfig.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRenameConfig.java
index b7c0e9ef..52ddf0ad 100644
--- 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRenameConfig.java
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/PatternRenameConfig.java
@@ -30,53 +30,52 @@ import java.util.regex.Pattern;
  */
 public class PatternRenameConfig {
 
-  public static final String FIELD_PATTERN_CONF = "field.pattern";
-  static final String FIELD_PATTERN_DOC = "";
+    public static final String FIELD_PATTERN_CONF = "field.pattern";
+    static final String FIELD_PATTERN_DOC = "";
 
-  public static final String FIELD_PATTERN_FLAGS_CONF = "field.pattern.flags";
-  static final String FIELD_PATTERN_FLAGS_DOC = "";
+    public static final String FIELD_PATTERN_FLAGS_CONF = 
"field.pattern.flags";
+    static final String FIELD_PATTERN_FLAGS_DOC = "";
 
-  public static final String FIELD_REPLACEMENT_CONF = "field.replacement";
-  static final String FIELD_REPLACEMENT_DOC = "";
+    public static final String FIELD_REPLACEMENT_CONF = "field.replacement";
+    static final String FIELD_REPLACEMENT_DOC = "";
 
-  static final Map<String, Integer> FLAG_VALUES;
+    static final Map<String, Integer> FLAG_VALUES;
 
-  static {
-    Map<String, Integer> map = new HashMap<>();
-    map.put("UNICODE_CHARACTER_CLASS", Pattern.UNICODE_CHARACTER_CLASS);
-    map.put("CANON_EQ", Pattern.CANON_EQ);
-    map.put("UNICODE_CASE", Pattern.UNICODE_CASE);
-    map.put("DOTALL", Pattern.DOTALL);
-    map.put("LITERAL", Pattern.LITERAL);
-    map.put("MULTILINE", Pattern.MULTILINE);
-    map.put("COMMENTS", Pattern.COMMENTS);
-    map.put("CASE_INSENSITIVE", Pattern.CASE_INSENSITIVE);
-    map.put("UNIX_LINES", Pattern.UNIX_LINES);
-    FLAG_VALUES = ImmutableMap.copyOf(map);
-  }
+    static {
+        Map<String, Integer> map = new HashMap<>();
+        map.put("UNICODE_CHARACTER_CLASS", Pattern.UNICODE_CHARACTER_CLASS);
+        map.put("CANON_EQ", Pattern.CANON_EQ);
+        map.put("UNICODE_CASE", Pattern.UNICODE_CASE);
+        map.put("DOTALL", Pattern.DOTALL);
+        map.put("LITERAL", Pattern.LITERAL);
+        map.put("MULTILINE", Pattern.MULTILINE);
+        map.put("COMMENTS", Pattern.COMMENTS);
+        map.put("CASE_INSENSITIVE", Pattern.CASE_INSENSITIVE);
+        map.put("UNIX_LINES", Pattern.UNIX_LINES);
+        FLAG_VALUES = ImmutableMap.copyOf(map);
+    }
 
-  public final Pattern pattern;
-  public final String replacement;
+    public final Pattern pattern;
+    public final String replacement;
 
-  public PatternRenameConfig(KeyValue config) {
-    ExtendKeyValue extendConfig = new ExtendKeyValue(config);
-    final String pattern = extendConfig.getString(FIELD_PATTERN_CONF);
-    final List<String> flagList = 
extendConfig.getList(FIELD_PATTERN_FLAGS_CONF);
-    int patternFlags = 0;
-    for (final String f : flagList) {
-      final int flag = FLAG_VALUES.get(f);
-      patternFlags = patternFlags | flag;
+    /**
+     * Builds the rename settings from the transform configuration.
+     *
+     * @param config configuration holding {@link #FIELD_PATTERN_CONF},
+     *               {@link #FIELD_PATTERN_FLAGS_CONF} and {@link #FIELD_REPLACEMENT_CONF}
+     * @throws IllegalArgumentException if a configured flag is not a key of {@link #FLAG_VALUES}
+     */
+    public PatternRenameConfig(KeyValue config) {
+        ExtendKeyValue extendConfig = new ExtendKeyValue(config);
+        final String pattern = extendConfig.getString(FIELD_PATTERN_CONF);
+        final List<String> flagList = extendConfig.getList(FIELD_PATTERN_FLAGS_CONF);
+        int patternFlags = 0;
+        // NOTE(review): getList may or may not return null for a missing key; a null list is treated as "no flags".
+        if (flagList != null) {
+            for (final String f : flagList) {
+                // Look the flag up as an Integer: unboxing a missing entry would throw a bare NPE.
+                final Integer flag = FLAG_VALUES.get(f);
+                if (flag == null) {
+                    throw new IllegalArgumentException(String.format(
+                        "Unknown value '%s' for '%s'; supported flags are %s.",
+                        f, FIELD_PATTERN_FLAGS_CONF, FLAG_VALUES.keySet()));
+                }
+                patternFlags |= flag;
+            }
+        }
+        this.pattern = Pattern.compile(pattern, patternFlags);
+        this.replacement = config.getString(FIELD_REPLACEMENT_CONF);
     }
-    this.pattern = Pattern.compile(pattern, patternFlags);
-    this.replacement = config.getString(FIELD_REPLACEMENT_CONF);
-  }
-
 
-  public Pattern pattern() {
-    return pattern;
-  }
+    public Pattern pattern() {
+        return pattern;
+    }
 
-  public String replacement() {
-    return replacement;
-  }
+    public String replacement() {
+        return replacement;
+    }
 }
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/RegexRouter.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/RegexRouter.java
index ff4b94fb..c2020250 100644
--- 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/RegexRouter.java
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/RegexRouter.java
@@ -27,14 +27,15 @@ import java.util.regex.Pattern;
 
 /**
  * regex router
+ *
  * @param <R>
  */
 public abstract class RegexRouter<R extends ConnectRecord> extends 
BaseTransformation<R> {
     private static final Logger LOG = 
LoggerFactory.getLogger(RegexRouter.class);
     public static final String TOPIC = "topic";
     public static final String OVERVIEW_DOC = "Update the record topic using 
the configured regular expression and replacement string."
-            + "<p/>Under the hood, the regex is compiled to a 
<code>java.util.regex.Pattern</code>. "
-            + "If the pattern matches the input topic, 
<code>java.util.regex.Matcher#replaceFirst()</code> is used with the 
replacement string to obtain the new topic.";
+        + "<p/>Under the hood, the regex is compiled to a 
<code>java.util.regex.Pattern</code>. "
+        + "If the pattern matches the input topic, 
<code>java.util.regex.Matcher#replaceFirst()</code> is used with the 
replacement string to obtain the new topic.";
 
     private interface ConfigName {
         String REGEX = "regex";
@@ -52,7 +53,7 @@ public abstract class RegexRouter<R extends ConnectRecord> 
extends BaseTransform
 
     @Override
     public R doTransform(R record) {
-        Map<String, Object>  partitionMap = (Map<String, 
Object>)record.getPosition().getPartition().getPartition();
+        Map<String, Object> partitionMap = (Map<String, Object>) 
record.getPosition().getPartition().getPartition();
         if (null == partitionMap || !partitionMap.containsKey(TOPIC)) {
             LOG.warn("PartitionMap get topic is null , lack of topic config");
             return record;
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecision.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecision.java
index 3ba40c6f..d31ce913 100644
--- 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecision.java
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecision.java
@@ -36,151 +36,152 @@ import java.util.stream.Collectors;
 
 /**
  * set maximum precision
+ *
  * @param <R>
  */
 public abstract class SetMaximumPrecision<R extends ConnectRecord> extends 
BaseTransformation<R> {
-  private static final Logger log = 
LoggerFactory.getLogger(SetMaximumPrecision.class);
+    private static final Logger log = 
LoggerFactory.getLogger(SetMaximumPrecision.class);
 
-  SetMaximumPrecisionConfig config;
+    SetMaximumPrecisionConfig config;
 
-  @Override
-  public void start(KeyValue keyValue) {
-    config = new SetMaximumPrecisionConfig(keyValue);
-  }
+    @Override
+    public void start(KeyValue keyValue) {
+        config = new SetMaximumPrecisionConfig(keyValue);
+    }
 
-  static final State NOOP = new State(true, null, null);
+    static final State NOOP = new State(true, null, null);
 
-  static class State {
-    public final boolean noop;
-    public final Schema outputSchema;
-    public final Set<String> decimalFields;
+    static class State {
+        public final boolean noop;
+        public final Schema outputSchema;
+        public final Set<String> decimalFields;
 
-    State(boolean noop, Schema outputSchema, Set<String> decimalFields) {
-      this.noop = noop;
-      this.outputSchema = outputSchema;
-      this.decimalFields = decimalFields;
+        State(boolean noop, Schema outputSchema, Set<String> decimalFields) {
+            this.noop = noop;
+            this.outputSchema = outputSchema;
+            this.decimalFields = decimalFields;
+        }
     }
-  }
 
-  Map<Schema, State> schemaLookup = new HashMap<>();
+    Map<Schema, State> schemaLookup = new HashMap<>();
+
+    public static final String CONNECT_AVRO_DECIMAL_PRECISION_PROP = 
"connect.decimal.precision";
+
+    State state(Schema inputSchema) {
+        return this.schemaLookup.computeIfAbsent(inputSchema, new 
Function<Schema, State>() {
+            @Override
+            public State apply(Schema schema) {
+                Set<String> decimalFields = inputSchema.getFields().stream()
+                    .filter(f -> 
Decimal.LOGICAL_NAME.equals(f.getSchema().getName()))
+                    .filter(f -> 
Integer.parseInt(f.getSchema().getParameters().getOrDefault(CONNECT_AVRO_DECIMAL_PRECISION_PROP,
 "64")) > config.maxPrecision())
+                    .map(Field::getName)
+                    .collect(Collectors.toSet());
+                State result;
+
+                if (decimalFields.size() == 0) {
+                    result = NOOP;
+                } else {
+                    log.trace("state() - processing schema '{}'", 
schema.getName());
+                    SchemaBuilder builder = SchemaBuilder.struct()
+                        .name(inputSchema.getName())
+                        .doc(inputSchema.getDoc())
+                        .version(inputSchema.getVersion());
+                    if (null != inputSchema.getParameters() && 
!inputSchema.getParameters().isEmpty()) {
+                        builder.parameters(inputSchema.getParameters());
+                    }
+
+                    for (Field field : inputSchema.getFields()) {
+                        log.trace("state() - processing field '{}'", 
field.getName());
+                        if (decimalFields.contains(field.getName())) {
+                            Map<String, String> parameters = new 
LinkedHashMap<>();
+                            if (null != field.getSchema().getParameters() && 
!field.getSchema().getParameters().isEmpty()) {
+                                
parameters.putAll(field.getSchema().getParameters());
+                            }
+                            
parameters.put(CONNECT_AVRO_DECIMAL_PRECISION_PROP, 
Integer.toString(config.maxPrecision()));
+                            int scale = 
Integer.parseInt(parameters.get(Decimal.SCALE_FIELD));
+                            SchemaBuilder fieldBuilder = Decimal.builder(scale)
+                                .parameters(parameters)
+                                .doc(field.getSchema().getDoc())
+                                .version(field.getSchema().getVersion());
+                            if (field.getSchema().isOptional()) {
+                                fieldBuilder.optional();
+                            }
+                            Schema fieldSchema = fieldBuilder.build();
+                            builder.field(field.getName(), fieldSchema);
+                        } else {
+                            log.trace("state() - copying field '{}' to new 
schema.", field.getName());
+                            builder.field(field.getName(), field.getSchema());
+                        }
+                    }
+
+                    Schema outputSchema = builder.build();
+                    result = new State(false, outputSchema, decimalFields);
+                }
+
+                return result;
+            }
+        });
 
-  public static final String CONNECT_AVRO_DECIMAL_PRECISION_PROP = 
"connect.decimal.precision";
+    }
 
-  State state(Schema inputSchema) {
-    return this.schemaLookup.computeIfAbsent(inputSchema, new Function<Schema, 
State>() {
-      @Override
-      public State apply(Schema schema) {
-        Set<String> decimalFields = inputSchema.getFields().stream()
-            .filter(f -> Decimal.LOGICAL_NAME.equals(f.getSchema().getName()))
-            .filter(f -> 
Integer.parseInt(f.getSchema().getParameters().getOrDefault(CONNECT_AVRO_DECIMAL_PRECISION_PROP,
 "64")) > config.maxPrecision())
-            .map(Field::getName)
-            .collect(Collectors.toSet());
-        State result;
+    @Override
+    protected SchemaAndValue processStruct(R record, Schema inputSchema, 
Struct input) {
+        State state = state(inputSchema);
+        SchemaAndValue result;
 
-        if (decimalFields.size() == 0) {
-          result = NOOP;
+        if (state.noop) {
+            result = new SchemaAndValue(inputSchema, input);
         } else {
-          log.trace("state() - processing schema '{}'", schema.getName());
-          SchemaBuilder builder = SchemaBuilder.struct()
-              .name(inputSchema.getName())
-              .doc(inputSchema.getDoc())
-              .version(inputSchema.getVersion());
-          if (null != inputSchema.getParameters() && 
!inputSchema.getParameters().isEmpty()) {
-            builder.parameters(inputSchema.getParameters());
-          }
-
-          for (Field field : inputSchema.getFields()) {
-            log.trace("state() - processing field '{}'", field.getName());
-            if (decimalFields.contains(field.getName())) {
-              Map<String, String> parameters = new LinkedHashMap<>();
-              if (null != field.getSchema().getParameters() && 
!field.getSchema().getParameters().isEmpty()) {
-                parameters.putAll(field.getSchema().getParameters());
-              }
-              parameters.put(CONNECT_AVRO_DECIMAL_PRECISION_PROP, 
Integer.toString(config.maxPrecision()));
-              int scale = 
Integer.parseInt(parameters.get(Decimal.SCALE_FIELD));
-              SchemaBuilder fieldBuilder = Decimal.builder(scale)
-                  .parameters(parameters)
-                  .doc(field.getSchema().getDoc())
-                  .version(field.getSchema().getVersion());
-              if (field.getSchema().isOptional()) {
-                fieldBuilder.optional();
-              }
-              Schema fieldSchema = fieldBuilder.build();
-              builder.field(field.getName(), fieldSchema);
-            } else {
-              log.trace("state() - copying field '{}' to new schema.", 
field.getName());
-              builder.field(field.getName(), field.getSchema());
+            Struct struct = new Struct(state.outputSchema);
+            for (Field field : inputSchema.getFields()) {
+                struct.put(field.getName(), input.get(field.getName()));
             }
-          }
-
-          Schema outputSchema = builder.build();
-          result = new State(false, outputSchema, decimalFields);
+            result = new SchemaAndValue(state.outputSchema, struct);
         }
-
         return result;
-      }
-    });
-
-  }
-
-  @Override
-  protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct 
input) {
-    State state = state(inputSchema);
-    SchemaAndValue result;
-
-    if (state.noop) {
-      result = new SchemaAndValue(inputSchema, input);
-    } else {
-      Struct struct = new Struct(state.outputSchema);
-      for (Field field : inputSchema.getFields()) {
-        struct.put(field.getName(), input.get(field.getName()));
-      }
-      result = new SchemaAndValue(state.outputSchema, struct);
     }
-    return result;
-  }
 
-  /**
-   * transform key
-   */
-  public static class Key extends SetMaximumPrecision<ConnectRecord> {
-    @Override
-    public ConnectRecord doTransform(ConnectRecord r) {
-      SchemaAndValue transformed = this.process(r, r.getKeySchema(), 
r.getKey());
-      ConnectRecord record = new ConnectRecord(
-              r.getPosition().getPartition(),
-              r.getPosition().getOffset(),
-              r.getTimestamp(),
-              transformed.schema(),
-              transformed.value(),
-              r.getSchema(),
-              r.getData()
-      );
-      record.setExtensions(r.getExtensions());
-      return record;
+    /**
+     * transform key
+     */
+    public static class Key extends SetMaximumPrecision<ConnectRecord> {
+        @Override
+        public ConnectRecord doTransform(ConnectRecord r) {
+            SchemaAndValue transformed = this.process(r, r.getKeySchema(), 
r.getKey());
+            ConnectRecord record = new ConnectRecord(
+                r.getPosition().getPartition(),
+                r.getPosition().getOffset(),
+                r.getTimestamp(),
+                transformed.schema(),
+                transformed.value(),
+                r.getSchema(),
+                r.getData()
+            );
+            record.setExtensions(r.getExtensions());
+            return record;
+        }
     }
-  }
 
-  /**
-   * transform value
-   */
-  public static class Value extends SetMaximumPrecision<ConnectRecord> {
-    @Override
-    public ConnectRecord doTransform(ConnectRecord r) {
-      SchemaAndValue transformed = this.process(r, r.getSchema(), r.getData());
-      ConnectRecord record = new ConnectRecord(
-              r.getPosition().getPartition(),
-              r.getPosition().getOffset(),
-              r.getTimestamp(),
-              r.getKeySchema(),
-              r.getKey(),
-              transformed.schema(),
-              transformed.value()
-      );
-      record.setExtensions(r.getExtensions());
-      return record;
+    /**
+     * transform value
+     */
+    public static class Value extends SetMaximumPrecision<ConnectRecord> {
+        @Override
+        public ConnectRecord doTransform(ConnectRecord r) {
+            SchemaAndValue transformed = this.process(r, r.getSchema(), 
r.getData());
+            ConnectRecord record = new ConnectRecord(
+                r.getPosition().getPartition(),
+                r.getPosition().getOffset(),
+                r.getTimestamp(),
+                r.getKeySchema(),
+                r.getKey(),
+                transformed.schema(),
+                transformed.value()
+            );
+            record.setExtensions(r.getExtensions());
+            return record;
+        }
     }
-  }
 }
 
 
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecisionConfig.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecisionConfig.java
index 9a801c39..f8902283 100644
--- 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecisionConfig.java
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetMaximumPrecisionConfig.java
@@ -22,16 +22,16 @@ import io.openmessaging.KeyValue;
  * set maximum precision config
  */
 public class SetMaximumPrecisionConfig {
-  public static final String MAX_PRECISION_CONFIG = "precision.max";
-  static final String MAX_PRECISION_DOC = "The maximum precision allowed.";
+    public static final String MAX_PRECISION_CONFIG = "precision.max";
+    static final String MAX_PRECISION_DOC = "The maximum precision allowed.";
 
-  private final int maxPrecision;
+    private final int maxPrecision;
 
-  public SetMaximumPrecisionConfig(KeyValue config) {
-    this.maxPrecision = config.getInt(MAX_PRECISION_CONFIG);
-  }
+    public SetMaximumPrecisionConfig(KeyValue config) {
+        this.maxPrecision = config.getInt(MAX_PRECISION_CONFIG);
+    }
 
-  public int maxPrecision(){
-    return maxPrecision;
-  }
+    public int maxPrecision() {
+        return maxPrecision;
+    }
 }
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetNull.java 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetNull.java
index 54ed09d4..aad03b95 100644
--- 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetNull.java
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/SetNull.java
@@ -23,50 +23,49 @@ import org.slf4j.LoggerFactory;
 
 /**
  * set null
+ *
  * @param <R>
  */
 public abstract class SetNull<R extends ConnectRecord> extends 
BaseTransformation<R> {
-  private static final Logger log = LoggerFactory.getLogger(SetNull.class);
-
-
-  PatternRenameConfig config;
+    private static final Logger log = LoggerFactory.getLogger(SetNull.class);
 
-  @Override
-  public void start(KeyValue keyValue) {
-    config = new PatternRenameConfig(keyValue);
-  }
+    PatternRenameConfig config;
 
-  /**
-   * transform key
-   */
-  public static class Key extends SetNull<ConnectRecord> {
     @Override
-    public ConnectRecord doTransform(ConnectRecord r) {
-      return new ConnectRecord(
-              r.getPosition().getPartition(),
-              r.getPosition().getOffset(),
-              r.getTimestamp(),
-              null,
-              null,
-              r.getSchema(),
-              r.getData()
-      );
+    public void start(KeyValue keyValue) {
+        config = new PatternRenameConfig(keyValue);
     }
-  }
 
+    /**
+     * transform key
+     */
+    public static class Key extends SetNull<ConnectRecord> {
+        @Override
+        public ConnectRecord doTransform(ConnectRecord r) {
+            return new ConnectRecord(
+                r.getPosition().getPartition(),
+                r.getPosition().getOffset(),
+                r.getTimestamp(),
+                null,
+                null,
+                r.getSchema(),
+                r.getData()
+            );
+        }
+    }
 
-  public static class Value extends SetNull<ConnectRecord> {
-    @Override
-    public ConnectRecord doTransform(ConnectRecord r) {
-      return new ConnectRecord(
-              r.getPosition().getPartition(),
-              r.getPosition().getOffset(),
-              r.getTimestamp(),
-              r.getKeySchema(),
-              r.getKey(),
-              r.getSchema(),
-              r.getData()
-      );
+    public static class Value extends SetNull<ConnectRecord> {
+        @Override
+        public ConnectRecord doTransform(ConnectRecord r) {
+            return new ConnectRecord(
+                r.getPosition().getPartition(),
+                r.getPosition().getOffset(),
+                r.getTimestamp(),
+                r.getKeySchema(),
+                r.getKey(),
+                r.getSchema(),
+                r.getData()
+            );
+        }
     }
-  }
 }
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/ExtendKeyValue.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/ExtendKeyValue.java
index 9297718f..48ae4ef4 100644
--- 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/ExtendKeyValue.java
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/ExtendKeyValue.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.rocketmq.connect.transforms.util;
 
 import io.openmessaging.KeyValue;
@@ -14,9 +31,9 @@ import java.util.regex.Pattern;
 public class ExtendKeyValue implements KeyValue {
     private static final Pattern COMMA_WITH_WHITESPACE = 
Pattern.compile("\\s*,\\s*");
 
-
     private KeyValue config;
-    public ExtendKeyValue(KeyValue config){
+
+    public ExtendKeyValue(KeyValue config) {
         this.config = config;
     }
 
@@ -92,11 +109,12 @@ public class ExtendKeyValue implements KeyValue {
 
     /**
      * get list
+     *
      * @param s
      * @return
      */
-    public List getList(String s){
-        if (!this.config.containsKey(s)){
+    public List getList(String s) {
+        if (!this.config.containsKey(s)) {
             return new ArrayList();
         }
         String config = this.config.getString(s).trim();
@@ -105,6 +123,7 @@ public class ExtendKeyValue implements KeyValue {
 
     /**
      * get list by class
+     *
      * @param s
      * @param clazz
      * @param <T>
@@ -113,7 +132,7 @@ public class ExtendKeyValue implements KeyValue {
     public <T> List<T> getList(String s, Class<T> clazz) {
         List configs = getList(s);
         List<T> castConfigs = new ArrayList<>();
-        configs.forEach(config ->{
+        configs.forEach(config -> {
             castConfigs.add(clazz.cast(config));
         });
         return castConfigs;
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaHelper.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaHelper.java
index 57c38c95..6a88b0c9 100644
--- 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaHelper.java
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaHelper.java
@@ -52,7 +52,7 @@ public class SchemaHelper {
             if (!(input instanceof BigDecimal)) {
                 throw new 
UnsupportedOperationException(String.format("Unsupported Type: %s", 
input.getClass()));
             }
-            builder = Decimal.builder(((BigDecimal)input).scale());
+            builder = Decimal.builder(((BigDecimal) input).scale());
         }
 
         return builder.optional();
diff --git 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaUtil.java
 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaUtil.java
index 2317a095..c53a7346 100644
--- 
a/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaUtil.java
+++ 
b/transforms/src/main/java/org/apache/rocketmq/connect/transforms/util/SchemaUtil.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.rocketmq.connect.transforms.util;
 
 import io.openmessaging.connector.api.data.Schema;
@@ -7,23 +24,23 @@ import io.openmessaging.connector.api.data.SchemaBuilder;
  * schema util
  */
 public class SchemaUtil {
-    public static Schema INT8_SCHEMA = SchemaBuilder.int8().build();
-    public static Schema INT16_SCHEMA = SchemaBuilder.int16().build();
-    public static Schema INT32_SCHEMA = SchemaBuilder.int32().build();
-    public static Schema INT64_SCHEMA = SchemaBuilder.int64().build();
-    public static Schema FLOAT32_SCHEMA = SchemaBuilder.float32().build();
-    public static Schema FLOAT64_SCHEMA = SchemaBuilder.float64().build();
-    public static Schema BOOLEAN_SCHEMA = SchemaBuilder.bool().build();
-    public static Schema STRING_SCHEMA = SchemaBuilder.string().build();
-    public static Schema BYTES_SCHEMA = SchemaBuilder.bytes().build();
+    public static final Schema INT8_SCHEMA = SchemaBuilder.int8().build();
+    public static final Schema INT16_SCHEMA = SchemaBuilder.int16().build();
+    public static final Schema INT32_SCHEMA = SchemaBuilder.int32().build();
+    public static final Schema INT64_SCHEMA = SchemaBuilder.int64().build();
+    public static final Schema FLOAT32_SCHEMA = 
SchemaBuilder.float32().build();
+    public static final Schema FLOAT64_SCHEMA = 
SchemaBuilder.float64().build();
+    public static final Schema BOOLEAN_SCHEMA = SchemaBuilder.bool().build();
+    public static final Schema STRING_SCHEMA = SchemaBuilder.string().build();
+    public static final Schema BYTES_SCHEMA = SchemaBuilder.bytes().build();
 
-    public static Schema OPTIONAL_INT8_SCHEMA = 
SchemaBuilder.int8().optional().build();
-    public static Schema OPTIONAL_INT16_SCHEMA = 
SchemaBuilder.int16().optional().build();
-    public static Schema OPTIONAL_INT32_SCHEMA = 
SchemaBuilder.int32().optional().build();
-    public static Schema OPTIONAL_INT64_SCHEMA = 
SchemaBuilder.int64().optional().build();
-    public static Schema OPTIONAL_FLOAT32_SCHEMA = 
SchemaBuilder.float32().optional().build();
-    public static Schema OPTIONAL_FLOAT64_SCHEMA = 
SchemaBuilder.float64().optional().build();
-    public static Schema OPTIONAL_BOOLEAN_SCHEMA = 
SchemaBuilder.bool().optional().build();
-    public static Schema OPTIONAL_STRING_SCHEMA = 
SchemaBuilder.string().optional().build();
-    public static Schema OPTIONAL_BYTES_SCHEMA = 
SchemaBuilder.bytes().optional().build();
+    public static final Schema OPTIONAL_INT8_SCHEMA = 
SchemaBuilder.int8().optional().build();
+    public static final Schema OPTIONAL_INT16_SCHEMA = 
SchemaBuilder.int16().optional().build();
+    public static final Schema OPTIONAL_INT32_SCHEMA = 
SchemaBuilder.int32().optional().build();
+    public static final Schema OPTIONAL_INT64_SCHEMA = 
SchemaBuilder.int64().optional().build();
+    public static final Schema OPTIONAL_FLOAT32_SCHEMA = 
SchemaBuilder.float32().optional().build();
+    public static final Schema OPTIONAL_FLOAT64_SCHEMA = 
SchemaBuilder.float64().optional().build();
+    public static final Schema OPTIONAL_BOOLEAN_SCHEMA = 
SchemaBuilder.bool().optional().build();
+    public static final Schema OPTIONAL_STRING_SCHEMA = 
SchemaBuilder.string().optional().build();
+    public static final Schema OPTIONAL_BYTES_SCHEMA = 
SchemaBuilder.bytes().optional().build();
 }

Reply via email to