This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bdacde4 Format code to pass check-style and make all tests can pass 
(#309)
7bdacde4 is described below

commit 7bdacde4af7c84870e5961cc625159f07bb3396d
Author: rongtong <[email protected]>
AuthorDate: Sun Sep 11 11:22:03 2022 +0800

    Format code to pass check-style and make all tests can pass (#309)
---
 .github/workflows/maven.yml                        |   2 +-
 .../connect/runtime/common/ConnectKeyValue.java    |   9 +-
 .../converter/record/json/JsonConverter.java       |  33 ++----
 .../runtime/serialization/JsonDeserializer.java    |   7 +-
 .../runtime/serialization/ListSerializer.java      |   3 -
 .../runtime/serialization/WrapperSerde.java        |  17 +++
 .../serialization/store/ConnectKeyValueSerde.java  |   4 +-
 .../store/ConnectKeyValueSerializer.java           |   3 -
 .../store/RecordOffsetDeserializer.java            |   1 -
 .../store/RecordPositionMapSerde.java              |   6 +-
 .../service/AbstractConfigManagementService.java   |   7 +-
 .../service/ConfigManagementServiceImpl.java       | 126 ++++++++++-----------
 .../service/PositionManagementServiceImpl.java     |  29 ++---
 .../service/StateManagementServiceImpl.java        |  71 ++++++------
 .../runtime/store/FileBaseKeyValueStore.java       |   2 +-
 .../runtime/utils/datasync/BrokerBasedLog.java     |  38 ++++---
 .../runtime/connectorwrapper/WorkerTest.java       |   2 +-
 17 files changed, 172 insertions(+), 188 deletions(-)

diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 5672ed5d..46cb15d6 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -24,4 +24,4 @@ jobs:
           distribution: 'temurin'
           cache: maven
       - name: Build with Maven
-        run: mvn -B package --file pom.xml
\ No newline at end of file
+        run: mvn -B clean install
\ No newline at end of file
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java
index 138495e4..f5612d37 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java
@@ -137,7 +137,6 @@ public class ConnectKeyValue implements KeyValue, 
Serializable, Cloneable {
         this.properties = properties;
     }
 
-
     /**
      * Gets all original settings with the given prefix.
      */
@@ -147,6 +146,7 @@ public class ConnectKeyValue implements KeyValue, 
Serializable, Cloneable {
 
     /**
      * Gets all original settings with the given prefix.
+     *
      * @param prefix the prefix to use as a filter
      * @param strip strip the prefix before adding to the output if set true
      * @return a Map containing the settings with the prefix
@@ -181,7 +181,7 @@ public class ConnectKeyValue implements KeyValue, 
Serializable, Cloneable {
         this.epoch = epoch;
     }
 
-    public ConnectKeyValue nextGeneration(){
+    public ConnectKeyValue nextGeneration() {
         this.setEpoch(System.currentTimeMillis());
         return this;
     }
@@ -200,7 +200,6 @@ public class ConnectKeyValue implements KeyValue, 
Serializable, Cloneable {
         return properties.hashCode();
     }
 
-
     @Override
     public Object clone() {
         try {
@@ -210,7 +209,6 @@ public class ConnectKeyValue implements KeyValue, 
Serializable, Cloneable {
         }
     }
 
-
     @Override
     public String toString() {
         return "ConnectKeyValue{" +
@@ -218,7 +216,4 @@ public class ConnectKeyValue implements KeyValue, 
Serializable, Cloneable {
             '}';
     }
 
-
-
-
 }
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java
index 7667bc8b..5094d405 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java
@@ -53,14 +53,12 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 
-
 /**
  * json converter for fastjson
  */
 public class JsonConverter implements RecordConverter {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
 
-
     private static final Map<FieldType, JsonToConnectTypeConverter> 
TO_CONNECT_CONVERTERS = new EnumMap<>(FieldType.class);
 
     {
@@ -161,8 +159,8 @@ public class JsonConverter implements RecordConverter {
                             throw new ConnectException("Found invalid map 
entry, expected length 2 but found :" + entryArray.toArray().length);
                         }
                         result.put(
-                                convertToConnect(keySchema, 
entryArray.toArray()[0]),
-                                convertToConnect(valueSchema, 
entryArray.toArray()[1]));
+                            convertToConnect(keySchema, 
entryArray.toArray()[0]),
+                            convertToConnect(valueSchema, 
entryArray.toArray()[1]));
                     }
                 }
                 return result;
@@ -281,7 +279,6 @@ public class JsonConverter implements RecordConverter {
         });
     }
 
-
     private JsonDeserializer deserializer = new JsonDeserializer();
     private JsonSerializer serializer = new JsonSerializer();
     public JsonConverterConfig converterConfig;
@@ -303,9 +300,9 @@ public class JsonConverter implements RecordConverter {
     /**
      * Convert a rocketmq Connect data object to a native object for 
serialization.
      *
-     * @param topic  the topic associated with the data
+     * @param topic the topic associated with the data
      * @param schema the schema for the value
-     * @param value  the value to convert
+     * @param value the value to convert
      * @return the serialized value
      */
     @Override
@@ -354,12 +351,11 @@ public class JsonConverter implements RecordConverter {
         Object jsonSchema = 
newJsonValue.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME);
         Schema schema = asConnectSchema(jsonSchema == null ? null : 
(JSONObject) jsonSchema);
         return new SchemaAndValue(
-                schema,
-                convertToConnect(schema, 
newJsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
+            schema,
+            convertToConnect(schema, 
newJsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
         );
     }
 
-
     /**
      * convert to json with envelope
      *
@@ -369,8 +365,8 @@ public class JsonConverter implements RecordConverter {
      */
     private JSONObject convertToJsonWithEnvelope(Schema schema, Object value) {
         return new JsonSchema.Envelope(
-                asJsonSchema(schema),
-                convertToJson(schema, value)
+            asJsonSchema(schema),
+            convertToJson(schema, value)
         ).toJsonNode();
     }
 
@@ -385,7 +381,6 @@ public class JsonConverter implements RecordConverter {
         return convertToJson(schema, value);
     }
 
-
     private interface JsonToConnectTypeConverter<Output> {
         Output convert(Schema schema, Object value);
     }
@@ -396,7 +391,6 @@ public class JsonConverter implements RecordConverter {
         Object toConnect(Schema schema, Object value);
     }
 
-
     /**
      * convert ConnectRecord schema to json schema
      *
@@ -506,7 +500,6 @@ public class JsonConverter implements RecordConverter {
         return jsonSchema;
     }
 
-
     /**
      * Convert this object, in the org.apache.kafka.connect.data format, into 
a JSON object, returning both the schema
      * and the converted object.
@@ -591,22 +584,22 @@ public class JsonConverter implements RecordConverter {
                     }
 
                     JSONArray resultArray = new JSONArray();
-                    Map<String , Object> resultMap = new HashMap<>();
+                    Map<String, Object> resultMap = new HashMap<>();
                     for (Map.Entry<?, ?> entry : map.entrySet()) {
                         Schema keySchema = schema == null ? null : 
schema.getKeySchema();
                         Schema valueSchema = schema == null ? null : 
schema.getValueSchema();
                         Object mapKey = convertToJson(keySchema, 
entry.getKey());
                         Object mapValue = convertToJson(valueSchema, 
entry.getValue());
-                        if (objectMode){
+                        if (objectMode) {
                             resultMap.put((String) mapKey, mapValue);
-                        }else {
+                        } else {
                             JSONArray entryArray = new JSONArray();
                             entryArray.add(0, mapKey);
                             entryArray.add(1, mapValue);
                             resultArray.add(entryArray);
                         }
                     }
-                    return objectMode? resultMap: resultArray;
+                    return objectMode ? resultMap : resultArray;
                 }
                 case STRUCT: {
                     Struct struct = (Struct) value;
@@ -627,7 +620,6 @@ public class JsonConverter implements RecordConverter {
         }
     }
 
-
     /**
      * convert json to schema if not empty
      *
@@ -713,7 +705,6 @@ public class JsonConverter implements RecordConverter {
                 throw new ConnectException("Unknown schema type: " + 
schemaType);
         }
 
-
         // optional
         Boolean isOptional = 
jsonSchema.getBoolean(JsonSchema.SCHEMA_OPTIONAL_FIELD_NAME);
         if (isOptional != null && isOptional) {
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/JsonDeserializer.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/JsonDeserializer.java
index 5fb0979b..cc46a494 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/JsonDeserializer.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/JsonDeserializer.java
@@ -27,8 +27,11 @@ import java.util.Objects;
  */
 public class JsonDeserializer implements Deserializer<Object> {
     private Class aClass;
-    public JsonDeserializer(){}
-    public JsonDeserializer(Class aClass){
+
+    public JsonDeserializer() {
+    }
+
+    public JsonDeserializer(Class aClass) {
         this.aClass = aClass;
     }
 
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/ListSerializer.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/ListSerializer.java
index 5cf909ea..7aec653f 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/ListSerializer.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/ListSerializer.java
@@ -18,13 +18,10 @@
 package org.apache.rocketmq.connect.runtime.serialization;
 
 import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-import io.openmessaging.connector.api.data.Converter;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.UnsupportedEncodingException;
 import java.util.List;
 
 /**
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/WrapperSerde.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/WrapperSerde.java
index f7a2dfd6..8c7b0290 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/WrapperSerde.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/WrapperSerde.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.rocketmq.connect.runtime.serialization;
 
 import java.util.Map;
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerde.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerde.java
index 8d75b5fd..ce86c2c5 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerde.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerde.java
@@ -14,7 +14,6 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-
 package org.apache.rocketmq.connect.runtime.serialization.store;
 
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
@@ -33,9 +32,10 @@ public class ConnectKeyValueSerde extends 
WrapperSerde<ConnectKeyValue> {
 
     /**
      * serializer and deserializer
+     *
      * @return
      */
-    public static ConnectKeyValueSerde serde(){
+    public static ConnectKeyValueSerde serde() {
         return new ConnectKeyValueSerde(new ConnectKeyValueSerializer(), new 
ConnectKeyValueDeserializer());
     }
 
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerializer.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerializer.java
index 93fc717d..7042f302 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerializer.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerializer.java
@@ -18,15 +18,12 @@
 package org.apache.rocketmq.connect.runtime.serialization.store;
 
 import com.alibaba.fastjson.JSON;
-import io.openmessaging.connector.api.data.Converter;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.serialization.Serializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.UnsupportedEncodingException;
-
 /**
  * Converter data between ConnAndTaskConfigs and byte[].
  */
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordOffsetDeserializer.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordOffsetDeserializer.java
index dad03345..ce09f518 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordOffsetDeserializer.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordOffsetDeserializer.java
@@ -18,7 +18,6 @@
 package org.apache.rocketmq.connect.runtime.serialization.store;
 
 import com.alibaba.fastjson.JSON;
-import io.openmessaging.connector.api.data.Converter;
 import io.openmessaging.connector.api.data.RecordOffset;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.serialization.Deserializer;
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerde.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerde.java
index c8585ed7..b239c2d5 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerde.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerde.java
@@ -29,15 +29,17 @@ import java.util.Map;
  * Byte Map to byte[].
  */
 public class RecordPositionMapSerde extends 
WrapperSerde<Map<ExtendRecordPartition, RecordOffset>> {
-    public RecordPositionMapSerde(Serializer<Map<ExtendRecordPartition, 
RecordOffset>> serializer, Deserializer<Map<ExtendRecordPartition, 
RecordOffset>> deserializer) {
+    public RecordPositionMapSerde(Serializer<Map<ExtendRecordPartition, 
RecordOffset>> serializer,
+        Deserializer<Map<ExtendRecordPartition, RecordOffset>> deserializer) {
         super(serializer, deserializer);
     }
 
     /**
      * serializer and deserializer
+     *
      * @return
      */
-    public static RecordPositionMapSerde serde(){
+    public static RecordPositionMapSerde serde() {
         return new RecordPositionMapSerde(new RecordPositionMapSerializer(), 
new RecordPositionMapDeserializer());
     }
 }
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
index f2325899..cbdcc45a 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
@@ -59,7 +59,6 @@ public abstract class AbstractConfigManagementService 
implements ConfigManagemen
      */
     protected KeyValueStore<String, ConnectKeyValue> connectorKeyValueStore;
 
-
     @Override
     public void recomputeTaskConfigs(String connectorName, ConnectKeyValue 
configs) {
         int maxTask = configs.getInt(ConnectorConfig.MAX_TASK, 
ConnectorConfig.TASKS_MAX_DEFAULT);
@@ -86,11 +85,11 @@ public abstract class AbstractConfigManagementService 
implements ConfigManagemen
             newKeyValue.put(ConnectorConfig.TASK_CLASS, 
connector.taskClass().getName());
 
             // source topic
-            if (configs.containsKey(SourceConnectorConfig.CONNECT_TOPICNAME)){
+            if (configs.containsKey(SourceConnectorConfig.CONNECT_TOPICNAME)) {
                 newKeyValue.put(SourceConnectorConfig.CONNECT_TOPICNAME, 
configs.getString(SourceConnectorConfig.CONNECT_TOPICNAME));
             }
             // sink consume topic
-            if (configs.containsKey(SinkConnectorConfig.CONNECT_TOPICNAMES)){
+            if (configs.containsKey(SinkConnectorConfig.CONNECT_TOPICNAMES)) {
                 newKeyValue.put(SinkConnectorConfig.CONNECT_TOPICNAMES, 
configs.getString(SinkConnectorConfig.CONNECT_TOPICNAMES));
             }
 
@@ -108,7 +107,6 @@ public abstract class AbstractConfigManagementService 
implements ConfigManagemen
 
     protected abstract void putTaskConfigs(String connectorName, 
List<ConnectKeyValue> configs);
 
-
     @NotNull
     protected Connector loadConnector(ConnectKeyValue configs) {
         String connectorClass = 
configs.getString(ConnectorConfig.CONNECTOR_CLASS);
@@ -118,7 +116,6 @@ public abstract class AbstractConfigManagementService 
implements ConfigManagemen
         return connector;
     }
 
-
     @Override
     public ClusterConfigState snapshot() {
         if (taskKeyValueStore == null && connectorKeyValueStore == null) {
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
index 2e85482e..4be6669a 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -67,27 +67,29 @@ public class ConfigManagementServiceImpl extends 
AbstractConfigManagementService
     public static final String START_SIGNAL = "start-signal";
 
     public static final String TARGET_STATE_PREFIX = "target-state-";
+
     public static String TARGET_STATE_KEY(String connectorName) {
         return TARGET_STATE_PREFIX + connectorName;
     }
 
-
     public static final String CONNECTOR_PREFIX = "connector-";
+
     public static String CONNECTOR_KEY(String connectorName) {
         return CONNECTOR_PREFIX + connectorName;
     }
 
     public static final String TASK_PREFIX = "task-";
+
     public static String TASK_KEY(ConnectorTaskId taskId) {
         return TASK_PREFIX + taskId.connector() + "-" + taskId.task();
     }
 
     public static final String DELETE_CONNECTOR_PREFIX = "delete-";
+
     public static String DELETE_CONNECTOR_KEY(String connectorName) {
         return DELETE_CONNECTOR_PREFIX + connectorName;
     }
 
-
     private static final String FIELD_STATE = "state";
     private static final String FIELD_EPOCH = "epoch";
     private static final String FIELD_PROPS = "properties";
@@ -96,49 +98,48 @@ public class ConfigManagementServiceImpl extends 
AbstractConfigManagementService
      * start signal
      */
     public static final Schema START_SIGNAL_V0 = SchemaBuilder.struct()
-            .field(START_SIGNAL, SchemaBuilder.string().build())
-            .build();
+        .field(START_SIGNAL, SchemaBuilder.string().build())
+        .build();
 
     /**
      * connector configuration
      */
     public static final Schema CONNECTOR_CONFIGURATION_V0 = 
SchemaBuilder.struct()
-            .field(FIELD_STATE, SchemaBuilder.string().build())
-            .field(FIELD_EPOCH, SchemaBuilder.int64().build())
-            .field(FIELD_PROPS,
-                    SchemaBuilder.map(
-                            SchemaBuilder.string().optional().build(),
-                            SchemaBuilder.string().optional().build()
-                    ).build())
-            .build();
+        .field(FIELD_STATE, SchemaBuilder.string().build())
+        .field(FIELD_EPOCH, SchemaBuilder.int64().build())
+        .field(FIELD_PROPS,
+            SchemaBuilder.map(
+                SchemaBuilder.string().optional().build(),
+                SchemaBuilder.string().optional().build()
+            ).build())
+        .build();
 
     /**
      * delete connector
      */
     public static final Schema CONNECTOR_DELETE_CONFIGURATION_V0 = 
SchemaBuilder.struct()
-            .field(FIELD_EPOCH, SchemaBuilder.int64().build())
-            .build();
+        .field(FIELD_EPOCH, SchemaBuilder.int64().build())
+        .build();
 
     /**
      * task configuration
      */
-    public static final Schema TASK_CONFIGURATION_V0 =SchemaBuilder.struct()
-            .field(FIELD_EPOCH, SchemaBuilder.int64().build())
-            .field(FIELD_PROPS,
-                    SchemaBuilder.map(
-                            SchemaBuilder.string().build(),
-                            SchemaBuilder.string().optional().build()
-                    ).build())
-            .build();
+    public static final Schema TASK_CONFIGURATION_V0 = SchemaBuilder.struct()
+        .field(FIELD_EPOCH, SchemaBuilder.int64().build())
+        .field(FIELD_PROPS,
+            SchemaBuilder.map(
+                SchemaBuilder.string().build(),
+                SchemaBuilder.string().optional().build()
+            ).build())
+        .build();
 
     /**
      * connector state
      */
     public static final Schema TARGET_STATE_V0 = SchemaBuilder.struct()
-            .field(FIELD_STATE, SchemaBuilder.string().build())
-            .field(FIELD_EPOCH, SchemaBuilder.int64().build())
-            .build();
-
+        .field(FIELD_STATE, SchemaBuilder.string().build())
+        .field(FIELD_EPOCH, SchemaBuilder.int64().build())
+        .build();
 
     /**
      * All listeners to trigger while config change.
@@ -157,7 +158,8 @@ public class ConfigManagementServiceImpl extends 
AbstractConfigManagementService
     // converter
     public RecordConverter converter;
 
-    public ConfigManagementServiceImpl() {}
+    public ConfigManagementServiceImpl() {
+    }
 
     @Override
     public void initialize(WorkerConfig workerConfig, RecordConverter 
converter, Plugin plugin) {
@@ -169,29 +171,28 @@ public class ConfigManagementServiceImpl extends 
AbstractConfigManagementService
 
         this.connectorConfigUpdateListener = new HashSet<>();
         this.dataSynchronizer = new BrokerBasedLog<>(workerConfig,
-                this.topic,
-                ConnectUtil.createGroupName(configManagePrefix, 
workerConfig.getWorkerId()),
-                new ConfigChangeCallback(),
-                Serdes.serdeFrom(String.class),
-                Serdes.serdeFrom(byte[].class)
+            this.topic,
+            ConnectUtil.createGroupName(configManagePrefix, 
workerConfig.getWorkerId()),
+            new ConfigChangeCallback(),
+            Serdes.serdeFrom(String.class),
+            Serdes.serdeFrom(byte[].class)
         );
 
         // store connector config
         this.connectorKeyValueStore = new FileBaseKeyValueStore<>(
-                
FilePathConfigUtil.getConnectorConfigPath(workerConfig.getStorePathRootDir()),
-                new Serdes.StringSerde(),
-                new JsonSerde(ConnectKeyValue.class));
+            
FilePathConfigUtil.getConnectorConfigPath(workerConfig.getStorePathRootDir()),
+            new Serdes.StringSerde(),
+            new JsonSerde(ConnectKeyValue.class));
 
         // store task config
         this.taskKeyValueStore = new FileBaseKeyValueStore<>(
-                
FilePathConfigUtil.getTaskConfigPath(workerConfig.getStorePathRootDir()),
-                new Serdes.StringSerde(),
-                new ListSerde(ConnectKeyValue.class));
+            
FilePathConfigUtil.getTaskConfigPath(workerConfig.getStorePathRootDir()),
+            new Serdes.StringSerde(),
+            new ListSerde(ConnectKeyValue.class));
 
         this.prepare(workerConfig);
     }
 
-
     /**
      * Preparation before startup
      *
@@ -216,7 +217,7 @@ public class ConfigManagementServiceImpl extends 
AbstractConfigManagementService
 
     private void sendStartSignal() {
         Struct struct = new Struct(START_SIGNAL_V0);
-        struct.put(START_SIGNAL,"start");
+        struct.put(START_SIGNAL, "start");
         dataSynchronizer.send(START_SIGNAL, converter.fromConnectData(topic, 
START_SIGNAL_V0, struct));
     }
 
@@ -233,7 +234,6 @@ public class ConfigManagementServiceImpl extends 
AbstractConfigManagementService
         return connectorKeyValueStore.getKVMap();
     }
 
-
     @Override
     public String putConnectorConfig(String connectorName, ConnectKeyValue 
configs) {
         // check request config
@@ -266,7 +266,7 @@ public class ConfigManagementServiceImpl extends 
AbstractConfigManagementService
         Struct connectConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
         connectConfig.put(FIELD_STATE, configs.getTargetState().name());
         connectConfig.put(FIELD_EPOCH, configs.getEpoch());
-        connectConfig.put(FIELD_PROPS , configs.getProperties());
+        connectConfig.put(FIELD_PROPS, configs.getProperties());
         byte[] config = converter.fromConnectData(topic, 
CONNECTOR_CONFIGURATION_V0, connectConfig);
         dataSynchronizer.send(CONNECTOR_KEY(connectorName), config);
         return connectorName;
@@ -377,38 +377,37 @@ public class ConfigManagementServiceImpl extends 
AbstractConfigManagementService
         configs.setConnectorConfigs(connectorKeyValueStore.getKVMap());
         connectorKeyValueStore.getKVMap().forEach((connectName, 
connectKeyValue) -> {
             Struct struct = new Struct(CONNECTOR_CONFIGURATION_V0)
-                    .put(FIELD_EPOCH, connectKeyValue.getEpoch())
-                    .put(FIELD_STATE, connectKeyValue.getTargetState().name())
-                    .put(FIELD_PROPS, connectKeyValue.getProperties());
+                .put(FIELD_EPOCH, connectKeyValue.getEpoch())
+                .put(FIELD_STATE, connectKeyValue.getTargetState().name())
+                .put(FIELD_PROPS, connectKeyValue.getProperties());
             byte[] body = converter.fromConnectData(topic, 
CONNECTOR_CONFIGURATION_V0, struct);
             dataSynchronizer.send(CONNECTOR_KEY(connectName), body);
         });
 
         taskKeyValueStore.getKVMap().forEach((connectName, taskConfigs) -> {
-            if (taskConfigs == null || taskConfigs.isEmpty()){
+            if (taskConfigs == null || taskConfigs.isEmpty()) {
                 return;
             }
             taskConfigs.forEach(taskConfig -> {
                 ConnectorTaskId taskId = new ConnectorTaskId(connectName, 
taskConfig.getInt(ConnectorConfig.TASK_ID));
                 Struct struct = new Struct(TASK_CONFIGURATION_V0)
-                        .put(FIELD_EPOCH, System.currentTimeMillis())
-                        .put(FIELD_PROPS, taskConfig.getProperties());
+                    .put(FIELD_EPOCH, System.currentTimeMillis())
+                    .put(FIELD_PROPS, taskConfig.getProperties());
                 byte[] body = converter.fromConnectData(topic, 
TASK_CONFIGURATION_V0, struct);
                 dataSynchronizer.send(TASK_KEY(taskId), body);
             });
         });
     }
 
-
     private class ConfigChangeCallback implements 
DataSynchronizerCallback<String, byte[]> {
         @Override
         public void onCompletion(Throwable error, String key, byte[] value) {
-            if (StringUtils.isEmpty(key)){
+            if (StringUtils.isEmpty(key)) {
                 log.error("Config change message is illegal, key is empty, the 
message will be skipped");
                 return;
             }
             SchemaAndValue schemaAndValue = converter.toConnectData(topic, 
value);
-            if (key.equals(START_SIGNAL)){
+            if (key.equals(START_SIGNAL)) {
                 // send message in full
                 triggerSendMessage();
                 // reblance
@@ -455,7 +454,7 @@ public class ConfigManagementServiceImpl extends 
AbstractConfigManagementService
         // validate
         ConnectKeyValue oldConfig = connectorKeyValueStore.get(connectorName);
         // config update
-        if ((Long)epoch > oldConfig.getEpoch()) {
+        if ((Long) epoch > oldConfig.getEpoch()) {
             // remove
             connectorKeyValueStore.remove(connectorName);
             taskKeyValueStore.remove(connectorName);
@@ -464,7 +463,6 @@ public class ConfigManagementServiceImpl extends 
AbstractConfigManagementService
         }
     }
 
-
     /**
      * process task config record
      *
@@ -502,17 +500,17 @@ public class ConfigManagementServiceImpl extends 
AbstractConfigManagementService
 
         Struct struct = (Struct) schemaAndValue.value();
         Object targetState = struct.get(FIELD_STATE);
-        if (!(targetState instanceof String)){
+        if (!(targetState instanceof String)) {
             // target state
             log.error("Invalid data for target state for connector '{}': 
'state' field should be a String but is {}",
-                    connectorName, className(targetState));
+                connectorName, className(targetState));
             return;
         }
         Object epoch = struct.get(FIELD_EPOCH);
         if (!(epoch instanceof Long)) {
             // epoch
             log.error("Invalid data for epoch for connector '{}': 'epoch' 
field should be a Long but is {}",
-                    connectorName, className(epoch));
+                connectorName, className(epoch));
             return;
         }
 
@@ -535,31 +533,31 @@ public class ConfigManagementServiceImpl extends 
AbstractConfigManagementService
      * @return
      */
     private boolean mergeConnectConfig(String connectName, SchemaAndValue 
schemaAndValue) {
-        Struct value = (Struct)schemaAndValue.value();
+        Struct value = (Struct) schemaAndValue.value();
         Object targetState = value.get(FIELD_STATE);
-        if (!(targetState instanceof String)){
+        if (!(targetState instanceof String)) {
             // target state
             log.error("Invalid data for target state for connector '{}': 
'state' field should be a String but is {}",
-                    connectName, className(targetState));
+                connectName, className(targetState));
             return false;
         }
         Object epoch = value.get(FIELD_EPOCH);
         if (!(epoch instanceof Long)) {
             // epoch
             log.error("Invalid data for epoch for connector '{}': 'state' 
field should be a long but is {}",
-                    connectName, className(epoch));
+                connectName, className(epoch));
             return false;
         }
         Object props = value.get(FIELD_PROPS);
-        if (!(props instanceof Map)){
+        if (!(props instanceof Map)) {
             // properties
             log.error("Invalid data for properties for connector '{}': 'state' 
field should be a Map but is {}",
-                    connectName, className(props));
+                connectName, className(props));
             return false;
         }
         // new configs
         ConnectKeyValue newConfig = new ConnectKeyValue();
-        newConfig.setEpoch((Long)epoch);
+        newConfig.setEpoch((Long) epoch);
         newConfig.setTargetState(TargetState.valueOf((String) targetState));
         newConfig.setProperties((Map<String, String>) props);
 
@@ -593,7 +591,6 @@ public class ConfigManagementServiceImpl extends 
AbstractConfigManagementService
         return StagingMode.DISTRIBUTED;
     }
 
-
     private ConnectorTaskId parseTaskId(String key) {
         String[] parts = key.split("-");
         if (parts.length < 3) {
@@ -609,7 +606,6 @@ public class ConfigManagementServiceImpl extends 
AbstractConfigManagementService
         }
     }
 
-
     private String className(Object o) {
         return o != null ? o.getClass().getName() : "null";
     }
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
index 0b704dec..986bbc04 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
@@ -96,22 +96,22 @@ public class PositionManagementServiceImpl implements 
PositionManagementService
         this.keyConverter.configure(new HashMap<>());
         this.valueConverter.configure(new HashMap<>());
         this.dataSynchronizer = new BrokerBasedLog(
-                workerConfig,
-                this.topic,
-                ConnectUtil.createGroupName(positionManagePrefix, 
workerConfig.getWorkerId()),
-                new PositionChangeCallback(),
-                Serdes.serdeFrom(ByteBuffer.class),
-                Serdes.serdeFrom(ByteBuffer.class)
+            workerConfig,
+            this.topic,
+            ConnectUtil.createGroupName(positionManagePrefix, 
workerConfig.getWorkerId()),
+            new PositionChangeCallback(),
+            Serdes.serdeFrom(ByteBuffer.class),
+            Serdes.serdeFrom(ByteBuffer.class)
         );
 
         this.positionStore = new 
FileBaseKeyValueStore<>(FilePathConfigUtil.getPositionPath(workerConfig.getStorePathRootDir()),
-                new RecordPartitionSerde(),
-                new RecordOffsetSerde());
+            new RecordPartitionSerde(),
+            new RecordOffsetSerde());
 
         this.positionUpdateListener = new HashSet<>();
         this.needSyncPartition = new ConcurrentSet<>();
         this.commitStarted = -1;
-        this.config =  workerConfig;
+        this.config = workerConfig;
         this.prepare(workerConfig);
     }
 
@@ -153,7 +153,6 @@ public class PositionManagementServiceImpl implements 
PositionManagementService
         positionStore.load();
     }
 
-
     @Override
     public Map<ExtendRecordPartition, RecordOffset> getPositionTable() {
         return positionStore.getKVMap();
@@ -196,9 +195,7 @@ public class PositionManagementServiceImpl implements 
PositionManagementService
         if (!increment) {
             Set<ExtendRecordPartition> allPartitions = new HashSet<>();
             allPartitions.addAll(positionStore.getKVMap().keySet());
-            allPartitions.forEach((partition) -> {
-                set(PositionChange.POSITION_CHANG_KEY, partition, 
positionStore.get(partition));
-            });
+            allPartitions.forEach(partition -> 
set(PositionChange.POSITION_CHANG_KEY, partition, 
positionStore.get(partition)));
         }
         //Incremental send
         if (increment) {
@@ -207,9 +204,7 @@ public class PositionManagementServiceImpl implements 
PositionManagementService
                 return;
             }
             Set<ExtendRecordPartition> partitionsTmp = new 
HashSet<>(needSyncPartition);
-            partitionsTmp.forEach((partition) -> {
-                set(PositionChange.POSITION_CHANG_KEY, partition, 
positionStore.get(partition));
-            });
+            partitionsTmp.forEach(partition -> 
set(PositionChange.POSITION_CHANG_KEY, partition, 
positionStore.get(partition)));
         }
         // end send offset
         if (increment) {
@@ -279,7 +274,6 @@ public class PositionManagementServiceImpl implements 
PositionManagementService
         dataSynchronizer.send(keyBuffer, valueBuffer);
     }
 
-
     private class PositionChangeCallback implements 
DataSynchronizerCallback<ByteBuffer, ByteBuffer> {
 
         @Override
@@ -331,7 +325,6 @@ public class PositionManagementServiceImpl implements 
PositionManagementService
         }
     }
 
-
     /**
      * Merge new received position info with local store.
      *
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java
index 3914d327..651b2319 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StateManagementServiceImpl.java
@@ -61,8 +61,6 @@ public class StateManagementServiceImpl implements 
StateManagementService {
 
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
 
-
-
     private final String statusManagePrefix = "StatusManage";
 
     public static final String START_SIGNAL = "start-signal";
@@ -74,18 +72,18 @@ public class StateManagementServiceImpl implements 
StateManagementService {
     public static final String WORKER_ID_KEY_NAME = "worker_id";
     public static final String GENERATION_KEY_NAME = "generation";
     private static final Schema STATUS_SCHEMA_V0 = SchemaBuilder.struct()
-            .field(STATE_KEY_NAME, SchemaBuilder.string().build())
-            .field(TRACE_KEY_NAME, SchemaBuilder.string().optional().build())
-            .field(WORKER_ID_KEY_NAME, SchemaBuilder.string().build())
-            .field(GENERATION_KEY_NAME, SchemaBuilder.int64().build())
-            .build();
+        .field(STATE_KEY_NAME, SchemaBuilder.string().build())
+        .field(TRACE_KEY_NAME, SchemaBuilder.string().optional().build())
+        .field(WORKER_ID_KEY_NAME, SchemaBuilder.string().build())
+        .field(GENERATION_KEY_NAME, SchemaBuilder.int64().build())
+        .build();
 
     /**
      * start signal
      */
     public static final Schema START_SIGNAL_V0 = SchemaBuilder.struct()
-            .field(START_SIGNAL, SchemaBuilder.string().build())
-            .build();
+        .field(START_SIGNAL, SchemaBuilder.string().build())
+        .build();
     /**
      * Synchronize config with other workers.
      */
@@ -100,6 +98,7 @@ public class StateManagementServiceImpl implements 
StateManagementService {
 
     private RecordConverter converter = new 
org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter();
     private String statusTopic;
+
     /**
      * Preparation before startup
      *
@@ -127,23 +126,23 @@ public class StateManagementServiceImpl implements 
StateManagementService {
         this.statusTopic = config.getConnectStatusTopic();
 
         this.dataSynchronizer = new BrokerBasedLog(config,
-                this.statusTopic,
-                ConnectUtil.createGroupName(statusManagePrefix, 
config.getWorkerId()),
-                new StatusChangeCallback(),
-                Serdes.serdeFrom(String.class),
-                Serdes.serdeFrom(byte[].class));
+            this.statusTopic,
+            ConnectUtil.createGroupName(statusManagePrefix, 
config.getWorkerId()),
+            new StatusChangeCallback(),
+            Serdes.serdeFrom(String.class),
+            Serdes.serdeFrom(byte[].class));
 
         /**connector status store*/
         this.connectorStatusStore = new FileBaseKeyValueStore<>(
-                
FilePathConfigUtil.getConnectorStatusConfigPath(config.getStorePathRootDir()),
-                new Serdes.StringSerde(),
-                new JsonSerde(ConnectorStatus.class));
+            
FilePathConfigUtil.getConnectorStatusConfigPath(config.getStorePathRootDir()),
+            new Serdes.StringSerde(),
+            new JsonSerde(ConnectorStatus.class));
 
         /**task status store*/
         this.taskStatusStore = new FileBaseKeyValueStore<>(
-                
FilePathConfigUtil.getTaskStatusConfigPath(config.getStorePathRootDir()),
-                new Serdes.StringSerde(),
-                new ListSerde(TaskStatus.class));
+            
FilePathConfigUtil.getTaskStatusConfigPath(config.getStorePathRootDir()),
+            new Serdes.StringSerde(),
+            new ListSerde(TaskStatus.class));
         // create topic
         this.prepare(config);
     }
@@ -161,7 +160,7 @@ public class StateManagementServiceImpl implements 
StateManagementService {
 
     private void startSignal() {
         Struct struct = new Struct(START_SIGNAL_V0);
-        struct.put(START_SIGNAL,START_SIGNAL);
+        struct.put(START_SIGNAL, START_SIGNAL);
         dataSynchronizer.send(START_SIGNAL, 
converter.fromConnectData(statusTopic, START_SIGNAL_V0, struct));
     }
 
@@ -206,6 +205,7 @@ public class StateManagementServiceImpl implements 
StateManagementService {
             });
         });
     }
+
     /**
      * pre persist
      */
@@ -254,7 +254,6 @@ public class StateManagementServiceImpl implements 
StateManagementService {
         sendConnectorStatus(status, true);
     }
 
-
     /**
      * Set the state of the connector to the given value.
      *
@@ -266,10 +265,9 @@ public class StateManagementServiceImpl implements 
StateManagementService {
     }
 
     /**
-     * Safely set the state of the task to the given value. What is
-     * considered "safe" depends on the implementation, but basically it
-     * means that the store can provide higher assurance that another worker
-     * hasn't concurrently written any conflicting data.
+     * Safely set the state of the task to the given value. What is considered 
"safe" depends on the implementation, but
+     * basically it means that the store can provide higher assurance that 
another worker hasn't concurrently written
+     * any conflicting data.
      *
      * @param status the status of the task
      */
@@ -294,11 +292,10 @@ public class StateManagementServiceImpl implements 
StateManagementService {
         send(key, status, entry, safeWrite);
     }
 
-
     private <V extends AbstractStatus<?>> void send(final String key,
-                                                    final V status,
-                                                    final 
ConnAndTaskStatus.CacheEntry<V> entry,
-                                                    final boolean safeWrite) {
+        final V status,
+        final ConnAndTaskStatus.CacheEntry<V> entry,
+        final boolean safeWrite) {
         synchronized (this) {
             if (safeWrite && !entry.canWrite(status)) {
                 return;
@@ -326,7 +323,6 @@ public class StateManagementServiceImpl implements 
StateManagementService {
         return converter.fromConnectData(this.statusTopic, STATUS_SCHEMA_V0, 
struct);
     }
 
-
     /**
      * Get the current state of the task.
      *
@@ -395,15 +391,14 @@ public class StateManagementServiceImpl implements 
StateManagementService {
         return StagingMode.DISTRIBUTED;
     }
 
-
     private class StatusChangeCallback implements 
DataSynchronizerCallback<String, byte[]> {
         @Override
         public void onCompletion(Throwable error, String key, byte[] value) {
-            if (StringUtils.isEmpty(key)){
+            if (StringUtils.isEmpty(key)) {
                 log.error("State change message is illegal, key is empty, the 
message will be skipped ");
                 return;
             }
-            if (key.equals(START_SIGNAL)){
+            if (key.equals(START_SIGNAL)) {
                 replicaTargetState();
             } else if (key.startsWith(CONNECTOR_STATUS_PREFIX)) {
                 readConnectorStatus(key, value);
@@ -437,7 +432,7 @@ public class StateManagementServiceImpl implements 
StateManagementService {
             log.trace("Received connector {} status update {}", connector, 
status);
             ConnAndTaskStatus.CacheEntry<ConnectorStatus> entry = 
connAndTaskStatus.getOrAdd(connector);
             if (entry.get() != null) {
-                if (status.getGeneration() > entry.get().getGeneration() ){
+                if (status.getGeneration() > entry.get().getGeneration()) {
                     entry.put(status);
                 }
             } else {
@@ -446,7 +441,6 @@ public class StateManagementServiceImpl implements 
StateManagementService {
         }
     }
 
-
     private String parseConnectorStatusKey(String key) {
         return key.substring(CONNECTOR_STATUS_PREFIX.length());
     }
@@ -463,7 +457,7 @@ public class StateManagementServiceImpl implements 
StateManagementService {
             String trace = (String) struct.get(TRACE_KEY_NAME);
             String workerUrl = (String) struct.get(WORKER_ID_KEY_NAME);
             Long generation = (Long) struct.get(GENERATION_KEY_NAME);
-            return new ConnectorStatus(connector, state,  workerUrl, 
generation, trace);
+            return new ConnectorStatus(connector, state, workerUrl, 
generation, trace);
         } catch (Exception e) {
             log.error("Failed to deserialize connector status", e);
             return null;
@@ -493,7 +487,7 @@ public class StateManagementServiceImpl implements 
StateManagementService {
             log.trace("Received task {} status update {}", id, status);
             ConnAndTaskStatus.CacheEntry<TaskStatus> entry = 
connAndTaskStatus.getOrAdd(id);
             if (entry.get() != null) {
-                if (status.getGeneration() > entry.get().getGeneration() ){
+                if (status.getGeneration() > entry.get().getGeneration()) {
                     entry.put(status);
                 }
             } else {
@@ -536,7 +530,6 @@ public class StateManagementServiceImpl implements 
StateManagementService {
         }
     }
 
-
     /**
      * remove connector
      *
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/FileBaseKeyValueStore.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/FileBaseKeyValueStore.java
index e932860b..74937499 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/FileBaseKeyValueStore.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/FileBaseKeyValueStore.java
@@ -54,7 +54,7 @@ public class FileBaseKeyValueStore<K, V> extends 
MemoryBasedKeyValueStore<K, V>
     public String encode() {
         Map<String, String> map = new HashMap<>();
         for (K key : data.keySet()) {
-            byte[] keyByte = serdeKey.serializer().serialize("",key);
+            byte[] keyByte = serdeKey.serializer().serialize("", key);
             byte[] valueByte = serdeValue.serializer().serialize("", 
data.get(key));
             map.put(Base64Util.base64Encode(keyByte), 
Base64Util.base64Encode(valueByte));
         }
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
index a5932a44..21f5f309 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
@@ -77,11 +77,11 @@ public class BrokerBasedLog<K, V> implements 
DataSynchronizer<K, V> {
     private Serde valueSerde;
 
     public BrokerBasedLog(WorkerConfig connectConfig,
-                          String topicName,
-                          String workId,
-                          DataSynchronizerCallback<K, V> 
dataSynchronizerCallback,
-                          Serde keySerde,
-                          Serde valueSerde) {
+        String topicName,
+        String workId,
+        DataSynchronizerCallback<K, V> dataSynchronizerCallback,
+        Serde keySerde,
+        Serde valueSerde) {
 
         this.topicName = topicName;
         this.keySerde = keySerde;
@@ -127,7 +127,7 @@ public class BrokerBasedLog<K, V> implements 
DataSynchronizer<K, V> {
     @Override
     public void send(K key, V value) {
         try {
-            Map.Entry<byte[],byte[]> encode = encode(key, value);
+            Map.Entry<byte[], byte[]> encode = encode(key, value);
             byte[] body = encode.getValue();
             if (body.length > MAX_MESSAGE_SIZE) {
                 log.error("Message size is greater than {} bytes, key: {}, 
value {}", MAX_MESSAGE_SIZE, key, value);
@@ -161,7 +161,7 @@ public class BrokerBasedLog<K, V> implements 
DataSynchronizer<K, V> {
     @Override
     public void send(K key, V value, Callback callback) {
         try {
-            Map.Entry<byte[],byte[]> encode = encode(key, value);
+            Map.Entry<byte[], byte[]> encode = encode(key, value);
             byte[] body = encode.getValue();
             if (body.length > MAX_MESSAGE_SIZE) {
                 log.error("Message size is greater than {} bytes, key: {}, 
value {}", MAX_MESSAGE_SIZE, key, value);
@@ -174,6 +174,7 @@ public class BrokerBasedLog<K, V> implements 
DataSynchronizer<K, V> {
                     log.info("Send async message OK, msgId: {},topic:{}", 
result.getMsgId(), topicName);
                     callback.onCompletion(null, value);
                 }
+
                 @Override public void onException(Throwable throwable) {
                     if (null != throwable) {
                         log.error("Send async message Failed, error: {}", 
throwable);
@@ -186,19 +187,20 @@ public class BrokerBasedLog<K, V> implements 
DataSynchronizer<K, V> {
         }
     }
 
-
-    private Map.Entry<byte[],byte[]> encode(K key, V value) {
+    private Map.Entry<byte[], byte[]> encode(K key, V value) {
         byte[] keySer = keySerde.serializer().serialize(topicName, key);
-        byte[] valueSer = valueSerde.serializer().serialize(topicName,value);
-        return new Map.Entry<byte[],byte[]>(){
+        byte[] valueSer = valueSerde.serializer().serialize(topicName, value);
+        return new Map.Entry<byte[], byte[]>() {
             @Override
             public byte[] getKey() {
                 return keySer;
             }
+
             @Override
             public byte[] getValue() {
                 return valueSer;
             }
+
             @Override
             public byte[] setValue(byte[] value) {
                 throw new UnsupportedOperationException();
@@ -207,17 +209,19 @@ public class BrokerBasedLog<K, V> implements 
DataSynchronizer<K, V> {
     }
 
     private Map.Entry<K, V> decode(byte[] key, byte[] value) {
-        K deKey = (K)keySerde.deserializer().deserialize(topicName, key);
-        V deValue = (V)valueSerde.deserializer().deserialize(topicName, value);
-        return new Map.Entry<K, V>(){
+        K deKey = (K) keySerde.deserializer().deserialize(topicName, key);
+        V deValue = (V) valueSerde.deserializer().deserialize(topicName, 
value);
+        return new Map.Entry<K, V>() {
             @Override
             public K getKey() {
                 return deKey;
             }
+
             @Override
             public V getValue() {
                 return deValue;
             }
+
             @Override
             public V setValue(V value) {
                 throw new UnsupportedOperationException();
@@ -225,15 +229,15 @@ public class BrokerBasedLog<K, V> implements 
DataSynchronizer<K, V> {
         };
     }
 
-
     class MessageListenerImpl implements MessageListenerConcurrently {
         @Override
-        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
rmqMsgList, ConsumeConcurrentlyContext context) {
+        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
rmqMsgList,
+            ConsumeConcurrentlyContext context) {
             for (MessageExt messageExt : rmqMsgList) {
                 log.info("Received one message: {}, topic is {}", 
messageExt.getMsgId() + "\n", topicName);
                 try {
                     String key = messageExt.getKeys();
-                    Map.Entry<K,V> entry = decode(StringUtils.isEmpty(key) ? 
null : Base64Util.base64Decode(key), messageExt.getBody());
+                    Map.Entry<K, V> entry = decode(StringUtils.isEmpty(key) ? 
null : Base64Util.base64Decode(key), messageExt.getBody());
                     dataSynchronizerCallback.onCompletion(null, 
entry.getKey(), entry.getValue());
                 } catch (Exception e) {
                     log.error("Decode message data error. message: {}, error 
info: {}", messageExt, e);
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
index 78e4f582..e017f565 100644
--- 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
@@ -129,7 +129,7 @@ public class WorkerTest {
         connectConfig.setStorePathRootDir(System.getProperty("user.home") + 
File.separator + "testConnectorStore");
         connectConfig.setNamesrvAddr("localhost:9876");
         stateManagementService = new StateManagementServiceImpl();
-        stateManagementService.initialize(connectConfig);
+        stateManagementService.initialize(connectConfig, new TestConverter());
         worker = new Worker(connectConfig, positionManagementService, 
configManagementService, plugin, connectController, stateManagementService);
 
         Set<WorkerConnector> workingConnectors = new HashSet<>();

Reply via email to