This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new fc1b8e1  [ISSUE #120] Standalone dev (#142)
fc1b8e1 is described below

commit fc1b8e1abd9d2d8a215fec664307bada3c50589a
Author: xiaoyi <[email protected]>
AuthorDate: Tue May 24 14:26:38 2022 +0800

    [ISSUE #120] Standalone dev (#142)
    
    * init standalone
    
    * fixed
    
    * complete standalone mode
    
    * test standalone mode
    
    * check code
    
    * fixed
    
    * Ensure the uniqueness of offset keys for different connectors #143
    
    Co-authored-by: “sunxiaojian” <“[email protected]”>
---
 connectors/rocketmq-connect-hudi/README.md         |   2 +-
 .../connect/jdbc/connector/JdbcSourceConfig.java   |   7 +-
 .../jdbc/source/offset/SourceOffsetCompute.java    |   4 +-
 .../{run_worker.sh => connect-distributed.sh}      |   2 +-
 .../{run_worker.sh => connect-standalone.sh}       |   2 +-
 .../connect/runtime/ConnectController.java         | 256 ---------------------
 ...Startup.java => DistributedConnectStartup.java} |  50 +++-
 ...tStartup.java => StandaloneConnectStartup.java} |  64 ++++--
 .../connect/runtime/connectorwrapper/Worker.java   |   9 +-
 .../runtime/connectorwrapper/WorkerDirectTask.java |   8 +-
 .../runtime/connectorwrapper/WorkerSourceTask.java |  14 +-
 .../controller/AbstractConnectController.java      | 176 ++++++++++++++
 .../runtime/controller/ConnectController.java      |  35 +++
 .../controller/distributed/DistributedConfig.java  |  26 +++
 .../distributed/DistributedConnectController.java  | 118 ++++++++++
 .../controller/standalone/StandaloneConfig.java    |  26 +++
 .../standalone/StandaloneConnectController.java    |  65 ++++++
 .../converter/RecordPartitionConverter.java        |  10 +-
 .../converter/RecordPositionMapConverter.java      |  14 +-
 .../rocketmq/connect/runtime/rest/RestHandler.java |   9 +-
 .../runtime/service/ClusterManagementService.java  |  11 +
 .../runtime/service/ConfigManagementService.java   |  13 ++
 .../runtime/service/DefaultConnectorContext.java   |   7 +-
 .../service/OffsetManagementServiceImpl.java       |  57 ++---
 .../runtime/service/PositionManagementService.java |  25 +-
 .../service/PositionManagementServiceImpl.java     |  59 ++---
 .../connect/runtime/service/RebalanceImpl.java     |  18 +-
 .../memory/FileOffsetManagementServiceImpl.java    | 204 ++++++++++++++++
 .../memory/FilePositionManagementServiceImpl.java  | 201 ++++++++++++++++
 .../memory/MemoryClusterManagementServiceImpl.java |  97 ++++++++
 .../memory/MemoryConfigManagementServiceImpl.java  | 254 ++++++++++++++++++++
 .../service/memory/StandaloneRebalanceService.java |  90 ++++++++
 .../DefaultAllocateConnAndTaskStrategy.java        |   2 +-
 .../runtime/store/ExtendRecordPartition.java       |  63 +++++
 .../runtime/store/PositionStorageReaderImpl.java   |  15 +-
 ...eReaderImpl.java => PositionStorageWriter.java} |  31 +--
 .../{connect.conf => connect-distributed.conf}     |   0
 .../{connect.conf => connect-standalone.conf}      |   4 +-
 .../runtime/connectorwrapper/WorkerTest.java       |   4 +-
 .../testimpl/TestPositionManageServiceImpl.java    |  11 +-
 .../connect/runtime/rest/RestHandlerTest.java      |   4 +-
 .../service/PositionManagementServiceImplTest.java |  33 +--
 42 files changed, 1654 insertions(+), 446 deletions(-)

diff --git a/connectors/rocketmq-connect-hudi/README.md b/connectors/rocketmq-connect-hudi/README.md
index cadc364..05fdfb9 100644
--- a/connectors/rocketmq-connect-hudi/README.md
+++ b/connectors/rocketmq-connect-hudi/README.md
@@ -72,6 +72,6 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-hudi-connector-name}/
 * **spark-submit 启动任务**
 将connect-runtime打包后通过spark-submit提交任务
 ```
-nohup sh spark-submit  --class 
org.apache.rocketmq.connect.runtime.ConnectStartup --conf 
"spark.driver.extraJavaOptions=-Dlogback.configurationFile=logback.xml" --files 
/xxx/conf/connect.conf,/xxx/conf/log4j.properties  --packages 
org.apache.hudi:hudi-spark3-bundle_2.12:0.8.0,org.apache.spark:spark-avro_2.12:3.0.1,org.apache.hudi:hudi-java-client:0.8.0,org.apache.parquet:parquet-avro:1.10.1,org.apache.avro:avro:1.10.2,com.alibaba:fastjson:1.2.51,org.reflections:reflections:0.9.11,org.apa
 [...]
+nohup sh spark-submit  --class 
org.apache.rocketmq.connect.runtime.DistributedConnectStartup --conf 
"spark.driver.extraJavaOptions=-Dlogback.configurationFile=logback.xml" --files 
/xxx/conf/connect.conf,/xxx/conf/log4j.properties  --packages 
org.apache.hudi:hudi-spark3-bundle_2.12:0.8.0,org.apache.spark:spark-avro_2.12:3.0.1,org.apache.hudi:hudi-java-client:0.8.0,org.apache.parquet:parquet-avro:1.10.1,org.apache.avro:avro:1.10.2,com.alibaba:fastjson:1.2.51,org.reflections:reflections:0.9
 [...]
 ```
 后续操作参考rocketmq-connect-hudi启动步骤
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConfig.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConfig.java
index 3c34c5e..20f3aa7 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConfig.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConfig.java
@@ -207,7 +207,7 @@ public class JdbcSourceConfig extends AbstractConfig {
     private String incrementingColumnName;
     private List<String> timestampColumnNames;
     private long timestampDelayIntervalMs;
-    private Long timestampInitial;
+    private Long timestampInitial = TIMESTAMP_INITIAL_DEFAULT;
     private Set<String> tableWhitelist;
     private Set<String> tableBlacklist;
     private String schemaPattern;
@@ -231,7 +231,10 @@ public class JdbcSourceConfig extends AbstractConfig {
         this.incrementingColumnName = 
config.getString(INCREMENTING_COLUMN_NAME_CONFIG);
         this.timestampColumnNames = getList(config, 
TIMESTAMP_COLUMN_NAME_CONFIG);
         timestampDelayIntervalMs = 
config.getLong(TIMESTAMP_DELAY_INTERVAL_MS_CONFIG);
-        this.timestampInitial = config.containsKey(TIMESTAMP_INITIAL_CONFIG) ? 
config.getLong(TIMESTAMP_INITIAL_CONFIG) : TIMESTAMP_INITIAL_DEFAULT;
+//        
this.timestampInitial=config.getLong(TIMESTAMP_INITIAL_CONFIG,TIMESTAMP_INITIAL_DEFAULT);
+        if (config.containsKey(TIMESTAMP_INITIAL_CONFIG)){
+            this.timestampInitial=config.getLong(TIMESTAMP_INITIAL_CONFIG);
+        }
         this.tableWhitelist = new HashSet<>(getList(config, 
TABLE_WHITELIST_CONFIG));
         this.tableBlacklist = new HashSet<>(getList(config, 
TABLE_BLACKLIST_CONFIG));
         this.schemaPattern = config.getString(SCHEMA_PATTERN_CONFIG);
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/offset/SourceOffsetCompute.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/offset/SourceOffsetCompute.java
index 6b5411e..0ae2f2c 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/offset/SourceOffsetCompute.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/offset/SourceOffsetCompute.java
@@ -166,7 +166,7 @@ public class SourceOffsetCompute {
                     throw new ConnectException("Unexpected query mode: " + 
queryMode);
             }
             Map<String, Object> offset = null;
-            if (offsets != null && tablePartitionsToCheck != null) {
+            if (offsets != null && tablePartitionsToCheck != null && 
offsets.containsKey(tablePartitionsToCheck)) {
                 offset = (Map<String, Object>) 
offsets.get(tablePartitionsToCheck).getOffset();
             }
             offset = computeInitialOffset(
@@ -271,7 +271,7 @@ public class SourceOffsetCompute {
                 log.info("No offsets found for '{}', so using configured 
timestamp {}", tableOrQuery,
                         timestampInitial);
             } else {
-                if (queryMode != Querier.QueryMode.TABLE) {
+                if (queryMode != Querier.QueryMode.TABLE || timestampColumns 
== null || timestampColumns.isEmpty()) {
                     return initialPartitionOffset;
                 }
                 try {
diff --git a/rocketmq-connect-runtime/run_worker.sh b/rocketmq-connect-runtime/connect-distributed.sh
similarity index 91%
copy from rocketmq-connect-runtime/run_worker.sh
copy to rocketmq-connect-runtime/connect-distributed.sh
index d87a4bf..2901a07 100755
--- a/rocketmq-connect-runtime/run_worker.sh
+++ b/rocketmq-connect-runtime/connect-distributed.sh
@@ -14,4 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 echo "run rumtime worker"
-cd target/distribution/ && java -cp .:./conf/:./lib/* 
org.apache.rocketmq.connect.runtime.ConnectStartup -c conf/connect.conf
\ No newline at end of file
+cd target/distribution/ && java -cp .:./conf/:./lib/* 
org.apache.rocketmq.connect.runtime.DistributedConnectStartup -c 
conf/connect-distributed.conf
\ No newline at end of file
diff --git a/rocketmq-connect-runtime/run_worker.sh b/rocketmq-connect-runtime/connect-standalone.sh
similarity index 91%
rename from rocketmq-connect-runtime/run_worker.sh
rename to rocketmq-connect-runtime/connect-standalone.sh
index d87a4bf..57daac4 100755
--- a/rocketmq-connect-runtime/run_worker.sh
+++ b/rocketmq-connect-runtime/connect-standalone.sh
@@ -14,4 +14,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 echo "run rumtime worker"
-cd target/distribution/ && java -cp .:./conf/:./lib/* 
org.apache.rocketmq.connect.runtime.ConnectStartup -c conf/connect.conf
\ No newline at end of file
+cd target/distribution/ && java -cp .:./conf/:./lib/* 
org.apache.rocketmq.connect.runtime.StandaloneConnectStartup -c 
conf/connect-standalone.conf
\ No newline at end of file
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java
deleted file mode 100644
index 1c59287..0000000
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.connect.runtime;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
-import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
-import org.apache.rocketmq.connect.runtime.rest.RestHandler;
-import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
-import 
org.apache.rocketmq.connect.runtime.service.ClusterManagementServiceImpl;
-import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
-import org.apache.rocketmq.connect.runtime.service.ConfigManagementServiceImpl;
-import org.apache.rocketmq.connect.runtime.service.OffsetManagementServiceImpl;
-import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
-import 
org.apache.rocketmq.connect.runtime.service.PositionManagementServiceImpl;
-import org.apache.rocketmq.connect.runtime.service.RebalanceImpl;
-import org.apache.rocketmq.connect.runtime.service.RebalanceService;
-import 
org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
-import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
-import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
-import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Connect controller to access and control all resource in runtime.
- */
-public class ConnectController {
-
-    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
-
-    /**
-     * Configuration of current runtime.
-     */
-    private final ConnectConfig connectConfig;
-
-    /**
-     * All the configurations of current running connectors and tasks in 
cluster.
-     */
-    private final ConfigManagementService configManagementService;
-
-    /**
-     * Position management of source tasks.
-     */
-    private final PositionManagementService positionManagementService;
-
-    /**
-     * Offset management of sink tasks.
-     */
-    private final PositionManagementService offsetManagementService;
-
-    /**
-     * Manage the online info of the cluster.
-     */
-    private final ClusterManagementService clusterManagementService;
-
-    /**
-     * A worker to schedule all connectors and tasks assigned to current 
process.
-     */
-    private final Worker worker;
-
-    /**
-     * A REST handler, interacting with user.
-     */
-    private final RestHandler restHandler;
-
-    /**
-     * Assign all connectors and tasks to all alive process in the cluster.
-     */
-    private final RebalanceImpl rebalanceImpl;
-
-    /**
-     * A scheduled task to rebalance all connectors and tasks in the cluster.
-     */
-    private final RebalanceService rebalanceService;
-
-    /**
-     * Thread pool to run schedule task.
-     */
-    private ScheduledExecutorService scheduledExecutorService;
-
-    private final Plugin plugin;
-
-    private ConnectStatsManager connectStatsManager;
-
-    private final ConnectStatsService connectStatsService;
-
-    public ConnectController(
-        ConnectConfig connectConfig) {
-
-        List<String> pluginPaths = new ArrayList<>(16);
-        if (StringUtils.isNotEmpty(connectConfig.getPluginPaths())) {
-            String[] strArr = connectConfig.getPluginPaths().split(",");
-            for (String path : strArr) {
-                if (StringUtils.isNotEmpty(path)) {
-                    pluginPaths.add(path);
-                }
-            }
-        }
-        plugin = new Plugin(pluginPaths);
-        plugin.initPlugin();
-        this.connectStatsManager = new ConnectStatsManager(connectConfig);
-        this.connectStatsService = new ConnectStatsService();
-        this.connectConfig = connectConfig;
-        this.clusterManagementService = new 
ClusterManagementServiceImpl(connectConfig);
-        this.configManagementService = new 
ConfigManagementServiceImpl(connectConfig, plugin);
-        this.positionManagementService = new 
PositionManagementServiceImpl(connectConfig);
-        this.offsetManagementService = new 
OffsetManagementServiceImpl(connectConfig);
-        this.worker = new Worker(connectConfig, positionManagementService, 
configManagementService, plugin, this);
-        AllocateConnAndTaskStrategy strategy = 
ConnectUtil.initAllocateConnAndTaskStrategy(connectConfig);
-        this.rebalanceImpl = new RebalanceImpl(worker, 
configManagementService, clusterManagementService, strategy, this);
-        this.restHandler = new RestHandler(this);
-        this.rebalanceService = new RebalanceService(rebalanceImpl, 
configManagementService, clusterManagementService);
-    }
-
-    public void initialize() {
-        this.scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor((Runnable r) -> new Thread(r, 
"ConnectScheduledThread"));
-    }
-
-    public void start() {
-
-        clusterManagementService.start();
-        configManagementService.start();
-        positionManagementService.start();
-        offsetManagementService.start();
-        worker.start();
-        rebalanceService.start();
-        connectStatsService.start();
-
-        // Persist configurations of current connectors and tasks.
-        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
-
-            try {
-                ConnectController.this.configManagementService.persist();
-            } catch (Exception e) {
-                log.error("schedule persist config error.", e);
-            }
-        }, 1000, this.connectConfig.getConfigPersistInterval(), 
TimeUnit.MILLISECONDS);
-
-        // Persist position information of source tasks.
-        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
-
-            try {
-                ConnectController.this.positionManagementService.persist();
-            } catch (Exception e) {
-                log.error("schedule persist position error.", e);
-            }
-        }, 1000, this.connectConfig.getPositionPersistInterval(), 
TimeUnit.MILLISECONDS);
-
-        // Persist offset information of sink tasks.
-        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
-
-            try {
-                ConnectController.this.offsetManagementService.persist();
-            } catch (Exception e) {
-                log.error("schedule persist offset error.", e);
-            }
-        }, 1000, this.connectConfig.getOffsetPersistInterval(), 
TimeUnit.MILLISECONDS);
-    }
-
-    public void shutdown() {
-
-        if (worker != null) {
-            worker.stop();
-        }
-
-        if (configManagementService != null) {
-            configManagementService.stop();
-        }
-
-        if (positionManagementService != null) {
-            positionManagementService.stop();
-        }
-
-        if (offsetManagementService != null) {
-            offsetManagementService.stop();
-        }
-
-        if (clusterManagementService != null) {
-            clusterManagementService.stop();
-        }
-
-        this.scheduledExecutorService.shutdown();
-        try {
-            this.scheduledExecutorService.awaitTermination(5000, 
TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            log.error("shutdown scheduledExecutorService error.", e);
-        }
-
-        if (rebalanceService != null) {
-            rebalanceService.stop();
-        }
-    }
-
-    public ConnectConfig getConnectConfig() {
-        return connectConfig;
-    }
-
-    public ConfigManagementService getConfigManagementService() {
-        return configManagementService;
-    }
-
-    public PositionManagementService getPositionManagementService() {
-        return positionManagementService;
-    }
-
-    public ClusterManagementService getClusterManagementService() {
-        return clusterManagementService;
-    }
-
-    public Worker getWorker() {
-        return worker;
-    }
-
-    public RestHandler getRestHandler() {
-        return restHandler;
-    }
-
-    public RebalanceImpl getRebalanceImpl() {
-        return rebalanceImpl;
-    }
-
-    public ConnectStatsManager getConnectStatsManager() {
-        return connectStatsManager;
-    }
-
-    public void setConnectStatsManager(ConnectStatsManager 
connectStatsManager) {
-        this.connectStatsManager = connectStatsManager;
-    }
-
-    public ConnectStatsService getConnectStatsService() {
-        return connectStatsService;
-    }
-}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectStartup.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
similarity index 69%
copy from rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectStartup.java
copy to rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
index c31b7b8..dc38a86 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectStartup.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
@@ -20,15 +20,25 @@ package org.apache.rocketmq.connect.runtime;
 import java.io.BufferedInputStream;
 import java.io.FileInputStream;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import 
org.apache.rocketmq.connect.runtime.controller.distributed.DistributedConfig;
+import 
org.apache.rocketmq.connect.runtime.controller.distributed.DistributedConnectController;
+import 
org.apache.rocketmq.connect.runtime.service.ClusterManagementServiceImpl;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementServiceImpl;
+import org.apache.rocketmq.connect.runtime.service.OffsetManagementServiceImpl;
+import 
org.apache.rocketmq.connect.runtime.service.PositionManagementServiceImpl;
 import org.apache.rocketmq.connect.runtime.utils.FileAndPropertyUtil;
+import org.apache.rocketmq.connect.runtime.utils.Plugin;
 import org.apache.rocketmq.connect.runtime.utils.ServerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,7 +46,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Startup class of the runtime worker.
  */
-public class ConnectStartup {
+public class DistributedConnectStartup {
 
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
 
@@ -50,7 +60,7 @@ public class ConnectStartup {
         start(createConnectController(args));
     }
 
-    private static void start(ConnectController controller) {
+    private static void start(DistributedConnectController controller) {
 
         try {
             controller.start();
@@ -69,37 +79,53 @@ public class ConnectStartup {
      * @param args
      * @return
      */
-    private static ConnectController createConnectController(String[] args) {
+    private static DistributedConnectController 
createConnectController(String[] args) {
 
         try {
 
             // Build the command line options.
             Options options = ServerUtil.buildCommandlineOptions(new 
Options());
             commandLine = ServerUtil.parseCmdLine("connect", args, 
buildCommandlineOptions(options),
-                new PosixParser());
+                    new PosixParser());
             if (null == commandLine) {
                 System.exit(-1);
             }
 
             // Load configs from command line.
-            ConnectConfig connectConfig = new ConnectConfig();
+            DistributedConfig connectConfig = new DistributedConfig();
             if (commandLine.hasOption('c')) {
-                String file = commandLine.getOptionValue('c');
+                String file = commandLine.getOptionValue('c').trim();
                 if (file != null) {
                     configFile = file;
                     InputStream in = new BufferedInputStream(new 
FileInputStream(file));
                     properties = new Properties();
                     properties.load(in);
-
                     FileAndPropertyUtil.properties2Object(properties, 
connectConfig);
-
                     in.close();
                 }
             }
 
+            List<String> pluginPaths = new ArrayList<>(16);
+            if (StringUtils.isNotEmpty(connectConfig.getPluginPaths())) {
+                String[] strArr = connectConfig.getPluginPaths().split(",");
+                for (String path : strArr) {
+                    if (StringUtils.isNotEmpty(path)) {
+                        pluginPaths.add(path);
+                    }
+                }
+            }
+            Plugin plugin = new Plugin(pluginPaths);
+            plugin.initPlugin();
+
             // Create controller and initialize.
-            ConnectController controller = new 
ConnectController(connectConfig);
-            controller.initialize();
+
+            DistributedConnectController controller = new 
DistributedConnectController(
+                    plugin,
+                    connectConfig,
+                    new ClusterManagementServiceImpl(connectConfig),
+                    new ConfigManagementServiceImpl(connectConfig, plugin),
+                    new PositionManagementServiceImpl(connectConfig),
+                    new OffsetManagementServiceImpl(connectConfig));
             // Invoked when shutdown.
             Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                 private volatile boolean hasShutdown = false;
@@ -133,7 +159,7 @@ public class ConnectStartup {
         Option opt = new Option("c", "configFile", true, "connect config 
properties file");
         opt.setRequired(false);
         options.addOption(opt);
-
         return options;
+
     }
 }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectStartup.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
similarity index 68%
rename from rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectStartup.java
rename to rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
index c31b7b8..747e806 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectStartup.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
@@ -17,26 +17,36 @@
 
 package org.apache.rocketmq.connect.runtime;
 
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import 
org.apache.rocketmq.connect.runtime.controller.standalone.StandaloneConfig;
+import 
org.apache.rocketmq.connect.runtime.controller.standalone.StandaloneConnectController;
+import 
org.apache.rocketmq.connect.runtime.service.memory.MemoryClusterManagementServiceImpl;
+import 
org.apache.rocketmq.connect.runtime.service.memory.MemoryConfigManagementServiceImpl;
+import 
org.apache.rocketmq.connect.runtime.service.memory.FileOffsetManagementServiceImpl;
+import 
org.apache.rocketmq.connect.runtime.service.memory.FilePositionManagementServiceImpl;
 import org.apache.rocketmq.connect.runtime.utils.FileAndPropertyUtil;
+import org.apache.rocketmq.connect.runtime.utils.Plugin;
 import org.apache.rocketmq.connect.runtime.utils.ServerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * Startup class of the runtime worker.
  */
-public class ConnectStartup {
+public class StandaloneConnectStartup {
 
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
 
@@ -50,11 +60,11 @@ public class ConnectStartup {
         start(createConnectController(args));
     }
 
-    private static void start(ConnectController controller) {
+    private static void start(StandaloneConnectController controller) {
 
         try {
             controller.start();
-            String tip = "The worker [" + 
controller.getClusterManagementService().getCurrentWorker() + "] boot success.";
+            String tip = "The standalone worker boot success.";
             log.info(tip);
             System.out.printf("%s%n", tip);
         } catch (Throwable e) {
@@ -69,37 +79,51 @@ public class ConnectStartup {
      * @param args
      * @return
      */
-    private static ConnectController createConnectController(String[] args) {
-
+    private static StandaloneConnectController 
createConnectController(String[] args) {
         try {
-
             // Build the command line options.
             Options options = ServerUtil.buildCommandlineOptions(new 
Options());
             commandLine = ServerUtil.parseCmdLine("connect", args, 
buildCommandlineOptions(options),
-                new PosixParser());
+                    new PosixParser());
             if (null == commandLine) {
                 System.exit(-1);
             }
 
             // Load configs from command line.
-            ConnectConfig connectConfig = new ConnectConfig();
+            StandaloneConfig connectConfig = new StandaloneConfig();
             if (commandLine.hasOption('c')) {
-                String file = commandLine.getOptionValue('c');
+                String file = commandLine.getOptionValue('c').trim();
                 if (file != null) {
                     configFile = file;
                     InputStream in = new BufferedInputStream(new 
FileInputStream(file));
                     properties = new Properties();
                     properties.load(in);
-
                     FileAndPropertyUtil.properties2Object(properties, 
connectConfig);
-
                     in.close();
                 }
             }
 
+            List<String> pluginPaths = new ArrayList<>(16);
+            if (StringUtils.isNotEmpty(connectConfig.getPluginPaths())) {
+                String[] strArr = connectConfig.getPluginPaths().split(",");
+                for (String path : strArr) {
+                    if (StringUtils.isNotEmpty(path)) {
+                        pluginPaths.add(path);
+                    }
+                }
+            }
+            Plugin plugin = new Plugin(pluginPaths);
+            plugin.initPlugin();
+
             // Create controller and initialize.
-            ConnectController controller = new 
ConnectController(connectConfig);
-            controller.initialize();
+
+            StandaloneConnectController controller = new 
StandaloneConnectController(
+                    plugin,
+                    connectConfig,
+                    new MemoryClusterManagementServiceImpl(connectConfig),
+                    new MemoryConfigManagementServiceImpl(connectConfig, 
plugin),
+                    new FilePositionManagementServiceImpl(connectConfig),
+                    new FileOffsetManagementServiceImpl(connectConfig));
             // Invoked when shutdown.
             Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                 private volatile boolean hasShutdown = false;
@@ -133,7 +157,7 @@ public class ConnectStartup {
         Option opt = new Option("c", "configFile", true, "connect config 
properties file");
         opt.setRequired(false);
         options.addOption(opt);
-
         return options;
+
     }
 }
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index 9d48143..48bb04d 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -44,7 +44,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.connect.runtime.ConnectController;
+import 
org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
@@ -131,8 +131,9 @@ public class Worker {
     private final ConnectStatsService connectStatsService;
 
     public Worker(ConnectConfig connectConfig,
-        PositionManagementService positionManagementService, 
ConfigManagementService configManagementService,
-        Plugin plugin, ConnectController connectController) {
+                  PositionManagementService positionManagementService,
+                  ConfigManagementService configManagementService,
+                  Plugin plugin, AbstractConnectController connectController) {
         this.connectConfig = connectConfig;
         this.taskExecutor = Executors.newCachedThreadPool(new 
DefaultThreadFactory("task-Worker-Executor-"));
         this.positionManagementService = positionManagementService;
@@ -159,7 +160,7 @@ public class Worker {
      * @throws Exception
      */
     public synchronized void startConnectors(Map<String, ConnectKeyValue> 
connectorConfigs,
-        ConnectController connectController) throws Exception {
+        AbstractConnectController connectController) throws Exception {
         Set<WorkerConnector> stoppedConnector = new HashSet<>();
         for (WorkerConnector workerConnector : workingConnectors) {
             try {
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
index d6f3ed7..2a0e6c7 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
@@ -39,6 +39,7 @@ import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
 import org.apache.rocketmq.connect.runtime.store.PositionStorageReaderImpl;
+import org.apache.rocketmq.connect.runtime.store.PositionStorageWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,6 +79,8 @@ public class WorkerDirectTask implements WorkerTask {
 
     private final OffsetStorageReader positionStorageReader;
 
+    private final PositionStorageWriter positionStorageWriter;
+
     private final AtomicReference<WorkerState> workerState;
 
     public WorkerDirectTask(String connectorName,
@@ -91,7 +94,8 @@ public class WorkerDirectTask implements WorkerTask {
         this.sinkTask = sinkTask;
         this.taskConfig = taskConfig;
         this.positionManagementService = positionManagementService;
-        this.positionStorageReader = new 
PositionStorageReaderImpl(positionManagementService);
+        this.positionStorageReader = new 
PositionStorageReaderImpl(connectorName, positionManagementService);
+        this.positionStorageWriter = new PositionStorageWriter(connectorName, 
positionManagementService);
         this.state = new AtomicReference<>(WorkerTaskState.NEW);
         this.workerState = workerState;
     }
@@ -141,7 +145,7 @@ public class WorkerDirectTask implements WorkerTask {
             sinkTask.put(sinkDataEntries);
             try {
                 if (!MapUtils.isEmpty(map)) {
-                    map.forEach(positionManagementService::putPosition);
+                    map.forEach(positionStorageWriter::putPosition);
                 }
             } catch (Exception e) {
                 log.error("Source task save position info failed.", e);
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 4c7eee8..8e62ca3 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -54,6 +54,7 @@ import 
org.apache.rocketmq.connect.runtime.service.PositionManagementService;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
 import org.apache.rocketmq.connect.runtime.store.PositionStorageReaderImpl;
+import org.apache.rocketmq.connect.runtime.store.PositionStorageWriter;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,13 +88,18 @@ public class WorkerSourceTask implements WorkerTask {
      */
     private AtomicReference<WorkerTaskState> state;
 
-    private final PositionManagementService positionManagementService;
+
 
     /**
      * Used to read the position of source data source.
      */
     private OffsetStorageReader offsetStorageReader;
 
+    /**
+     * Used to write the position of source data source.
+     */
+    private PositionStorageWriter positionStorageWriter;
+
     /**
      * A RocketMQ producer to send message to dest MQ.
      */
@@ -137,8 +143,8 @@ public class WorkerSourceTask implements WorkerTask {
         this.connectorName = connectorName;
         this.sourceTask = sourceTask;
         this.taskConfig = taskConfig;
-        this.positionManagementService = positionManagementService;
-        this.offsetStorageReader = new 
PositionStorageReaderImpl(positionManagementService);
+        this.offsetStorageReader = new 
PositionStorageReaderImpl(connectorName, positionManagementService);
+        this.positionStorageWriter = new PositionStorageWriter(connectorName, 
positionManagementService);
         this.producer = producer;
         this.recordConverter = recordConverter;
         this.state = new AtomicReference<>(WorkerTaskState.NEW);
@@ -322,7 +328,7 @@ public class WorkerSourceTask implements WorkerTask {
                             if (null != partition && null != position) {
                                 Map<String, String> offsetMap = (Map<String, 
String>) offset.getOffset();
                                 
offsetMap.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, 
String.valueOf(sourceDataEntry.getTimestamp()));
-                                
positionManagementService.putPosition(partition, offset);
+                                positionStorageWriter.putPosition(partition, 
offset);
                             }
 
                         } catch (Exception e) {
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
new file mode 100644
index 0000000..73c259d
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.controller;
+
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
+import org.apache.rocketmq.connect.runtime.rest.RestHandler;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
+import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * connect controller
+ */
+public abstract class AbstractConnectController implements ConnectController {
+
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
+
+    /**
+     * Configuration of current runtime.
+     */
+    protected  final ConnectConfig connectConfig;
+
+    /**
+     * All the configurations of current running connectors and tasks in 
cluster.
+     */
+    protected final ConfigManagementService configManagementService;
+
+    /**
+     * Position management of source tasks.
+     */
+    protected final PositionManagementService positionManagementService;
+
+    /**
+     * Offset management of sink tasks.
+     */
+    protected final PositionManagementService offsetManagementService;
+
+    /**
+     * Manage the online info of the cluster.
+     */
+    protected final ClusterManagementService clusterManagementService;
+
+    /**
+     * A worker to schedule all connectors and tasks assigned to current 
process.
+     */
+    protected final Worker worker;
+
+    /**
+     * A REST handler, interacting with user.
+     */
+    protected final RestHandler restHandler;
+
+
+
+    protected final Plugin plugin;
+
+    protected final ConnectStatsManager connectStatsManager;
+
+    protected final ConnectStatsService connectStatsService;
+
+    /**
+     * init connect controller
+     * @param connectConfig
+     */
+    public AbstractConnectController(
+            Plugin plugin,
+            ConnectConfig connectConfig,
+            ClusterManagementService clusterManagementService,
+            ConfigManagementService configManagementService,
+            PositionManagementService positionManagementService,
+            PositionManagementService offsetManagementService
+    ) {
+        // set config
+        this.connectConfig = connectConfig;
+        // set plugin
+        this.plugin = plugin;
+        // set metrics
+        this.connectStatsManager = new ConnectStatsManager(connectConfig);
+        this.connectStatsService = new ConnectStatsService();
+
+        this.clusterManagementService = clusterManagementService;
+        this.configManagementService = configManagementService;
+        this.positionManagementService = positionManagementService;
+        this.offsetManagementService = offsetManagementService;
+        this.worker = new Worker(connectConfig, positionManagementService, 
configManagementService, plugin, this);
+        this.restHandler = new RestHandler(this);
+    }
+
+
+    @Override
+    public void start() {
+        clusterManagementService.start();
+        configManagementService.start();
+        positionManagementService.start();
+        offsetManagementService.start();
+        worker.start();
+        connectStatsService.start();
+    }
+
+    @Override
+    public void shutdown() {
+
+        if (worker != null) {
+            worker.stop();
+        }
+
+        if (configManagementService != null) {
+            configManagementService.stop();
+        }
+
+        if (positionManagementService != null) {
+            positionManagementService.stop();
+        }
+
+        if (offsetManagementService != null) {
+            offsetManagementService.stop();
+        }
+
+        if (clusterManagementService != null) {
+            clusterManagementService.stop();
+        }
+
+    }
+
+    public ConnectConfig getConnectConfig() {
+        return connectConfig;
+    }
+
+    public ConfigManagementService getConfigManagementService() {
+        return configManagementService;
+    }
+
+    public PositionManagementService getPositionManagementService() {
+        return positionManagementService;
+    }
+
+    public ClusterManagementService getClusterManagementService() {
+        return clusterManagementService;
+    }
+
+    public Worker getWorker() {
+        return worker;
+    }
+
+    public ConnectStatsManager getConnectStatsManager() {
+        return connectStatsManager;
+    }
+
+    public ConnectStatsService getConnectStatsService() {
+        return connectStatsService;
+    }
+
+}
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/ConnectController.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/ConnectController.java
new file mode 100644
index 0000000..1b2049d
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/ConnectController.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.controller;
+
+/**
+ * connect controller
+ */
+public interface ConnectController {
+
+    /**
+     * start controller
+     */
+    void start();
+
+    /**
+     * shutdown controller
+     */
+    void shutdown();
+
+}
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConfig.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConfig.java
new file mode 100644
index 0000000..4784b21
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConfig.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.controller.distributed;
+
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+
+/**
+ * distributed worker config
+ */
+public class DistributedConfig extends ConnectConfig {
+}
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
new file mode 100644
index 0000000..0dd048a
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.controller.distributed;
+
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import 
org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.RebalanceImpl;
+import org.apache.rocketmq.connect.runtime.service.RebalanceService;
+import 
org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
+import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
+import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Connect controller to access and control all resource in runtime.
+ */
+public class DistributedConnectController extends AbstractConnectController {
+
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
+
+    private final RebalanceImpl rebalanceImpl;
+
+    /**
+     * Thread pool to run schedule task.
+     */
+    protected ScheduledExecutorService scheduledExecutorService;
+
+    /**
+     * A scheduled task to rebalance all connectors and tasks in the cluster.
+     */
+    private final RebalanceService rebalanceService;
+
+    public DistributedConnectController(Plugin plugin,
+                                         DistributedConfig connectConfig,
+                                         ClusterManagementService 
clusterManagementService,
+                                         ConfigManagementService 
configManagementService,
+                                         PositionManagementService 
positionManagementService,
+                                         PositionManagementService 
offsetManagementService) {
+
+        super(plugin, connectConfig, clusterManagementService, 
configManagementService, positionManagementService, offsetManagementService);
+        AllocateConnAndTaskStrategy strategy = 
ConnectUtil.initAllocateConnAndTaskStrategy(connectConfig);
+        this.rebalanceImpl = new RebalanceImpl(worker, 
configManagementService, clusterManagementService, strategy, this);
+        this.rebalanceService = new RebalanceService(rebalanceImpl, 
configManagementService, clusterManagementService);
+    }
+
+    @Override
+    public void start() {
+        this.scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor((Runnable r) -> new Thread(r, 
"ConnectScheduledThread"));
+        super.start();
+        rebalanceService.start();
+        // Persist configurations of current connectors and tasks.
+        scheduledExecutorService.scheduleAtFixedRate(() -> {
+            try {
+                this.configManagementService.persist();
+            } catch (Exception e) {
+                log.error("schedule persist config error.", e);
+            }
+        }, 1000, this.connectConfig.getConfigPersistInterval(), 
TimeUnit.MILLISECONDS);
+
+        // Persist position information of source tasks.
+        scheduledExecutorService.scheduleAtFixedRate(() -> {
+
+            try {
+                this.positionManagementService.persist();
+            } catch (Exception e) {
+                log.error("schedule persist position error.", e);
+            }
+
+        }, 1000, this.connectConfig.getPositionPersistInterval(), 
TimeUnit.MILLISECONDS);
+
+        // Persist offset information of sink tasks.
+        scheduledExecutorService.scheduleAtFixedRate(() -> {
+            try {
+                this.offsetManagementService.persist();
+            } catch (Exception e) {
+                log.error("schedule persist offset error.", e);
+            }
+        }, 1000, this.connectConfig.getOffsetPersistInterval(), 
TimeUnit.MILLISECONDS);
+
+    }
+
+    @Override
+    public void shutdown() {
+        super.shutdown();
+        if (rebalanceService != null) {
+            rebalanceService.stop();
+        }
+        scheduledExecutorService.shutdown();
+        try {
+            scheduledExecutorService.awaitTermination(5000, 
TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            log.error("shutdown scheduledExecutorService error.", e);
+        }
+    }
+}
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConfig.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConfig.java
new file mode 100644
index 0000000..967c72e
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConfig.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.controller.standalone;
+
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+
+/**
+ *  worker standalone config
+ *
+ */
+public class StandaloneConfig extends ConnectConfig  {
+}
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectController.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectController.java
new file mode 100644
index 0000000..45b6061
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectController.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.controller.standalone;
+
+import 
org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.RebalanceImpl;
+import 
org.apache.rocketmq.connect.runtime.service.memory.StandaloneRebalanceService;
+import 
org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
+import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
+import org.apache.rocketmq.connect.runtime.utils.Plugin;
+
+
+/**
+ * Connect controller to access and control all resource in runtime.
+ */
+public class StandaloneConnectController extends AbstractConnectController {
+
+    private final RebalanceImpl rebalanceImpl;
+    private final StandaloneRebalanceService rebalanceService;
+    public StandaloneConnectController(Plugin plugin,
+                                       StandaloneConfig connectConfig,
+                                       ClusterManagementService 
clusterManagementService,
+                                       ConfigManagementService 
configManagementService,
+                                       PositionManagementService 
positionManagementService,
+                                       PositionManagementService 
offsetManagementService) {
+
+        super(plugin, connectConfig, clusterManagementService, 
configManagementService, positionManagementService, offsetManagementService);
+        AllocateConnAndTaskStrategy strategy = 
ConnectUtil.initAllocateConnAndTaskStrategy(connectConfig);
+        this.rebalanceImpl = new RebalanceImpl(worker, 
configManagementService, clusterManagementService, strategy, this);
+        this.rebalanceService = new StandaloneRebalanceService(rebalanceImpl, 
configManagementService, clusterManagementService);
+
+    }
+
+    @Override
+    public void start() {
+        super.start();
+        rebalanceService.start();
+    }
+
+    @Override
+    public void shutdown() {
+        super.shutdown();
+        if (rebalanceService != null) {
+            rebalanceService.stop();
+        }
+    }
+}
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/RecordPartitionConverter.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/RecordPartitionConverter.java
index 625d14a..0ed4960 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/RecordPartitionConverter.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/RecordPartitionConverter.java
@@ -19,21 +19,21 @@ package org.apache.rocketmq.connect.runtime.converter;
 
 import com.alibaba.fastjson.JSON;
 import io.openmessaging.connector.api.data.Converter;
-import io.openmessaging.connector.api.data.RecordPartition;
 import java.io.UnsupportedEncodingException;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * ByteBuffer converter.
  */
-public class RecordPartitionConverter implements Converter<RecordPartition> {
+public class RecordPartitionConverter implements 
Converter<ExtendRecordPartition> {
 
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
 
     @Override
-    public byte[] objectToByte(RecordPartition recordPartition) {
+    public byte[] objectToByte(ExtendRecordPartition recordPartition) {
         try {
             String json = JSON.toJSONString(recordPartition);
             return json.getBytes("UTF-8");
@@ -44,10 +44,10 @@ public class RecordPartitionConverter implements 
Converter<RecordPartition> {
     }
 
     @Override
-    public RecordPartition byteToObject(byte[] bytes) {
+    public ExtendRecordPartition byteToObject(byte[] bytes) {
         try {
             String text = new String(bytes, "UTF-8");
-            RecordPartition res = JSON.parseObject(text, 
RecordPartition.class);
+            ExtendRecordPartition res = JSON.parseObject(text, 
ExtendRecordPartition.class);
             return res;
         } catch (UnsupportedEncodingException e) {
             log.error("JsonConverter#byteToObject failed", e);
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/RecordPositionMapConverter.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/RecordPositionMapConverter.java
index f0a6fc8..f0a3bde 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/RecordPositionMapConverter.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/RecordPositionMapConverter.java
@@ -20,28 +20,28 @@ package org.apache.rocketmq.connect.runtime.converter;
 import com.alibaba.fastjson.JSON;
 import io.openmessaging.connector.api.data.Converter;
 import io.openmessaging.connector.api.data.RecordOffset;
-import io.openmessaging.connector.api.data.RecordPartition;
 import java.io.UnsupportedEncodingException;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Byte Map to byte[].
  */
-public class RecordPositionMapConverter implements 
Converter<Map<RecordPartition, RecordOffset>> {
+public class RecordPositionMapConverter implements 
Converter<Map<ExtendRecordPartition, RecordOffset>> {
 
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
 
     @Override
-    public byte[] objectToByte(Map<RecordPartition, RecordOffset> map) {
+    public byte[] objectToByte(Map<ExtendRecordPartition, RecordOffset> map) {
 
         try {
             Map<String, String> resultMap = new HashMap<>();
 
-            for (Map.Entry<RecordPartition, RecordOffset> entry : 
map.entrySet()) {
+            for (Map.Entry<ExtendRecordPartition, RecordOffset> entry : 
map.entrySet()) {
                 String jsonKey = JSON.toJSONString(entry.getKey());
                 jsonKey.getBytes("UTF-8");
                 String jsonValue = JSON.toJSONString(entry.getValue());
@@ -56,14 +56,14 @@ public class RecordPositionMapConverter implements 
Converter<Map<RecordPartition
     }
 
     @Override
-    public Map<RecordPartition, RecordOffset> byteToObject(byte[] bytes) {
+    public Map<ExtendRecordPartition, RecordOffset> byteToObject(byte[] bytes) 
{
 
-        Map<RecordPartition, RecordOffset> resultMap = new HashMap<>();
+        Map<ExtendRecordPartition, RecordOffset> resultMap = new HashMap<>();
         try {
             String rawString = new String(bytes, "UTF-8");
             Map<String, String> map = JSON.parseObject(rawString, Map.class);
             for (String key : map.keySet()) {
-                RecordPartition recordPartition = JSON.parseObject(key, 
RecordPartition.class);
+                ExtendRecordPartition recordPartition = JSON.parseObject(key, 
ExtendRecordPartition.class);
                 RecordOffset recordOffset = JSON.parseObject(map.get(key), 
RecordOffset.class);
                 resultMap.put(recordPartition, recordOffset);
             }
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
index b2a3a5b..58e4e7e 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
@@ -25,7 +25,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.rocketmq.connect.runtime.ConnectController;
+
+import 
org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerConnector;
@@ -40,13 +41,13 @@ public class RestHandler {
 
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
 
-    private final ConnectController connectController;
+    private final AbstractConnectController connectController;
 
     private static final String CONNECTOR_CONFIGS = "connectorConfigs";
 
     private static final String TASK_CONFIGS = "taskConfigs";
 
-    public RestHandler(ConnectController connectController) {
+    public RestHandler(AbstractConnectController connectController) {
         this.connectController = connectController;
         Javalin app = Javalin.create();
         app.enableCaseSensitiveUrls();
@@ -74,9 +75,7 @@ public class RestHandler {
 
 
     private void getAllocatedConnectors(Context context) {
-
         Set<WorkerConnector> workerConnectors = 
connectController.getWorker().getWorkingConnectors();
-        Set<Runnable> workerTasks = 
connectController.getWorker().getWorkingTasks();
         Map<String, ConnectKeyValue> connectors = new HashMap<>();
         for (WorkerConnector workerConnector : workerConnectors) {
             connectors.put(workerConnector.getConnectorName(), 
workerConnector.getKeyValue());
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
index 0b0a1fe..9a07d56 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
@@ -17,6 +17,8 @@
 
 package org.apache.rocketmq.connect.runtime.service;
 
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+
 import java.util.List;
 
 /**
@@ -36,6 +38,15 @@ public interface ClusterManagementService {
      */
     void stop();
 
+    /**
+     * Configure this service with the given config; the default implementation is a no-op.
+     *
+     * @param config can be DistributedConfig or StandaloneConfig
+     */
+    default void configure(ConnectConfig config) {
+
+    }
+
     /**
      * Check if Cluster Store Topic exists.
      *
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java
index 59c697b..5d6d6c9 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java
@@ -18,9 +18,12 @@
 package org.apache.rocketmq.connect.runtime.service;
 
 import io.openmessaging.connector.api.component.connector.Connector;
+
 import java.util.List;
 import java.util.Map;
+
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 import org.apache.rocketmq.connect.runtime.utils.Plugin;
 
 /**
@@ -29,6 +32,7 @@ import org.apache.rocketmq.connect.runtime.utils.Plugin;
  */
 public interface ConfigManagementService {
 
+
     /**
      * Start the config manager.
      */
@@ -39,6 +43,15 @@ public interface ConfigManagementService {
      */
     void stop();
 
+    /**
+     * Configure this service with the given config; the default implementation is a no-op.
+     *
+     * @param config can be DistributedConfig or StandaloneConfig
+     */
+    default void configure(ConnectConfig config) {
+
+    }
+
     /**
      * Get all connector configs from the cluster filter out DELETE set to 1
      *
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/DefaultConnectorContext.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/DefaultConnectorContext.java
index 8dabdfe..d6254cd 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/DefaultConnectorContext.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/DefaultConnectorContext.java
@@ -20,7 +20,8 @@ package org.apache.rocketmq.connect.runtime.service;
 import io.openmessaging.connector.api.component.connector.Connector;
 import io.openmessaging.connector.api.component.connector.ConnectorContext;
 import java.util.Set;
-import org.apache.rocketmq.connect.runtime.ConnectController;
+
+import 
org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
@@ -32,11 +33,11 @@ public class DefaultConnectorContext implements 
ConnectorContext {
 
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
 
-    private final ConnectController controller;
+    private final AbstractConnectController controller;
 
     private final String connectorName;
 
-    public DefaultConnectorContext(String connectorName, ConnectController 
connectController) {
+    public DefaultConnectorContext(String connectorName, 
AbstractConnectController connectController) {
         this.controller = connectController;
         this.connectorName = connectorName;
     }
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
index 792b24b..49860be 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
@@ -19,12 +19,13 @@ package org.apache.rocketmq.connect.runtime.service;
 
 import io.netty.util.internal.ConcurrentSet;
 import io.openmessaging.connector.api.data.RecordOffset;
-import io.openmessaging.connector.api.data.RecordPartition;
+
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
@@ -32,6 +33,7 @@ import 
org.apache.rocketmq.connect.runtime.converter.JsonConverter;
 import org.apache.rocketmq.connect.runtime.converter.RecordOffsetConverter;
 import org.apache.rocketmq.connect.runtime.converter.RecordPartitionConverter;
 import 
org.apache.rocketmq.connect.runtime.converter.RecordPositionMapConverter;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
 import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
 import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
 import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
@@ -48,18 +50,18 @@ public class OffsetManagementServiceImpl implements 
PositionManagementService {
     /**
      * Current offset info in store.
      */
-    private KeyValueStore<RecordPartition, RecordOffset> offsetStore;
+    private KeyValueStore<ExtendRecordPartition, RecordOffset> offsetStore;
 
 
     /**
      * The updated partition of the task in the current instance.
      */
-    private Set<RecordPartition> needSyncPartition;
+    private Set<ExtendRecordPartition> needSyncPartition;
 
     /**
      * Synchronize data with other workers.
      */
-    private DataSynchronizer<String, Map<RecordPartition, RecordOffset>> 
dataSynchronizer;
+    private DataSynchronizer<String, Map<ExtendRecordPartition, RecordOffset>> 
dataSynchronizer;
 
     private final String offsetManagePrefix = "OffsetManage";
 
@@ -71,14 +73,14 @@ public class OffsetManagementServiceImpl implements 
PositionManagementService {
     public OffsetManagementServiceImpl(ConnectConfig connectConfig) {
 
         this.offsetStore = new 
FileBaseKeyValueStore<>(FilePathConfigUtil.getOffsetPath(connectConfig.getStorePathRootDir()),
-            new RecordPartitionConverter(),
-            new RecordOffsetConverter());
+                new RecordPartitionConverter(),
+                new RecordOffsetConverter());
         this.dataSynchronizer = new BrokerBasedLog(connectConfig,
-            connectConfig.getOffsetStoreTopic(),
-            ConnectUtil.createGroupName(offsetManagePrefix, 
connectConfig.getWorkerId()),
-            new OffsetChangeCallback(),
-            new JsonConverter(),
-            new RecordPositionMapConverter());
+                connectConfig.getOffsetStoreTopic(),
+                ConnectUtil.createGroupName(offsetManagePrefix, 
connectConfig.getWorkerId()),
+                new OffsetChangeCallback(),
+                new JsonConverter(),
+                new RecordPositionMapConverter());
         this.offsetUpdateListener = new HashSet<>();
         this.needSyncPartition = new ConcurrentSet<>();
         this.prepare(connectConfig);
@@ -120,7 +122,8 @@ public class OffsetManagementServiceImpl implements 
PositionManagementService {
         offsetStore.persist();
     }
 
-    @Override public void load() {
+    @Override
+    public void load() {
         offsetStore.load();
     }
 
@@ -131,38 +134,38 @@ public class OffsetManagementServiceImpl implements 
PositionManagementService {
     }
 
     @Override
-    public Map<RecordPartition, RecordOffset> getPositionTable() {
+    public Map<ExtendRecordPartition, RecordOffset> getPositionTable() {
 
         return offsetStore.getKVMap();
     }
 
     @Override
-    public RecordOffset getPosition(RecordPartition partition) {
+    public RecordOffset getPosition(ExtendRecordPartition partition) {
 
         return offsetStore.get(partition);
     }
 
     @Override
-    public void putPosition(Map<RecordPartition, RecordOffset> offsets) {
+    public void putPosition(Map<ExtendRecordPartition, RecordOffset> offsets) {
 
         offsetStore.putAll(offsets);
         needSyncPartition.addAll(offsets.keySet());
     }
 
     @Override
-    public void putPosition(RecordPartition partition, RecordOffset position) {
+    public void putPosition(ExtendRecordPartition partition, RecordOffset 
position) {
 
         offsetStore.put(partition, position);
         needSyncPartition.add(partition);
     }
 
     @Override
-    public void removePosition(List<RecordPartition> offsets) {
+    public void removePosition(List<ExtendRecordPartition> offsets) {
 
         if (null == offsets) {
             return;
         }
-        for (RecordPartition offset : offsets) {
+        for (ExtendRecordPartition offset : offsets) {
             needSyncPartition.remove(offset);
             offsetStore.remove(offset);
         }
@@ -182,11 +185,11 @@ public class OffsetManagementServiceImpl implements 
PositionManagementService {
 
     private void sendNeedSynchronizeOffset() {
 
-        Set<RecordPartition> needSyncPartitionTmp = needSyncPartition;
+        Set<ExtendRecordPartition> needSyncPartitionTmp = needSyncPartition;
         needSyncPartition = new ConcurrentSet<>();
-        Map<RecordPartition, RecordOffset> needSyncOffset = 
offsetStore.getKVMap().entrySet().stream()
-            .filter(entry -> needSyncPartitionTmp.contains(entry.getKey()))
-            .collect(Collectors.toMap(entry -> entry.getKey(), entry -> 
entry.getValue()));
+        Map<ExtendRecordPartition, RecordOffset> needSyncOffset = 
offsetStore.getKVMap().entrySet().stream()
+                .filter(entry -> needSyncPartitionTmp.contains(entry.getKey()))
+                .collect(Collectors.toMap(entry -> entry.getKey(), entry -> 
entry.getValue()));
 
         dataSynchronizer.send(OffsetChangeEnum.OFFSET_CHANG_KEY.name(), 
needSyncOffset);
     }
@@ -196,10 +199,10 @@ public class OffsetManagementServiceImpl implements 
PositionManagementService {
         dataSynchronizer.send(OffsetChangeEnum.OFFSET_CHANG_KEY.name(), 
offsetStore.getKVMap());
     }
 
-    private class OffsetChangeCallback implements 
DataSynchronizerCallback<String, Map<RecordPartition, RecordOffset>> {
+    private class OffsetChangeCallback implements 
DataSynchronizerCallback<String, Map<ExtendRecordPartition, RecordOffset>> {
 
         @Override
-        public void onCompletion(Throwable error, String key, 
Map<RecordPartition, RecordOffset> result) {
+        public void onCompletion(Throwable error, String key, 
Map<ExtendRecordPartition, RecordOffset> result) {
 
             boolean changed = false;
             switch (OffsetChangeEnum.valueOf(key)) {
@@ -232,16 +235,16 @@ public class OffsetManagementServiceImpl implements 
PositionManagementService {
      * @param result
      * @return
      */
-    private boolean mergeOffsetInfo(Map<RecordPartition, RecordOffset> result) 
{
+    private boolean mergeOffsetInfo(Map<ExtendRecordPartition, RecordOffset> 
result) {
 
         boolean changed = false;
         if (null == result || 0 == result.size()) {
             return changed;
         }
 
-        for (Map.Entry<RecordPartition, RecordOffset> newEntry : 
result.entrySet()) {
+        for (Map.Entry<ExtendRecordPartition, RecordOffset> newEntry : 
result.entrySet()) {
             boolean find = false;
-            for (Map.Entry<RecordPartition, RecordOffset> existedEntry : 
offsetStore.getKVMap().entrySet()) {
+            for (Map.Entry<ExtendRecordPartition, RecordOffset> existedEntry : 
offsetStore.getKVMap().entrySet()) {
                 if (newEntry.getKey().equals(existedEntry.getKey())) {
                     find = true;
                     if (!newEntry.getValue().equals(existedEntry.getValue())) {
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementService.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementService.java
index 8f01605..bd3faa7 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementService.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementService.java
@@ -19,7 +19,9 @@
 package org.apache.rocketmq.connect.runtime.service;
 
 import io.openmessaging.connector.api.data.RecordOffset;
-import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
+
 import java.util.List;
 import java.util.Map;
 
@@ -38,6 +40,15 @@ public interface PositionManagementService {
      */
     void stop();
 
+    /**
+     * Configure this service with the given config; the default implementation is a no-op.
+     *
+     * @param config can be DistributedConfig or StandaloneConfig
+     */
+    default void configure(ConnectConfig config) {
+
+    }
+
     /**
      * Persist position info in a persist store.
      */
@@ -50,7 +61,7 @@ public interface PositionManagementService {
 
     /**
      * Synchronize to other nodes.
-     * */
+     */
     void synchronize();
 
     /**
@@ -58,23 +69,23 @@ public interface PositionManagementService {
      *
      * @return
      */
-    Map<RecordPartition, RecordOffset> getPositionTable();
+    Map<ExtendRecordPartition, RecordOffset> getPositionTable();
 
-    RecordOffset getPosition(RecordPartition partition);
+    RecordOffset getPosition(ExtendRecordPartition partition);
 
     /**
      * Put a position info.
      */
-    void putPosition(Map<RecordPartition, RecordOffset> positions);
+    void putPosition(Map<ExtendRecordPartition, RecordOffset> positions);
 
-    void putPosition(RecordPartition partition, RecordOffset position);
+    void putPosition(ExtendRecordPartition partition, RecordOffset position);
 
     /**
      * Remove a position info.
      *
      * @param partitions
      */
-    void removePosition(List<RecordPartition> partitions);
+    void removePosition(List<ExtendRecordPartition> partitions);
 
     /**
      * Register a listener.
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
index 2c2c217..893809e 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
@@ -19,12 +19,13 @@ package org.apache.rocketmq.connect.runtime.service;
 
 import io.netty.util.internal.ConcurrentSet;
 import io.openmessaging.connector.api.data.RecordOffset;
-import io.openmessaging.connector.api.data.RecordPartition;
+
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
@@ -32,6 +33,7 @@ import 
org.apache.rocketmq.connect.runtime.converter.JsonConverter;
 import org.apache.rocketmq.connect.runtime.converter.RecordOffsetConverter;
 import org.apache.rocketmq.connect.runtime.converter.RecordPartitionConverter;
 import 
org.apache.rocketmq.connect.runtime.converter.RecordPositionMapConverter;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
 import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
 import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
 import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
@@ -48,17 +50,17 @@ public class PositionManagementServiceImpl implements 
PositionManagementService
     /**
      * Current position info in store.
      */
-    private KeyValueStore<RecordPartition, RecordOffset> positionStore;
+    private KeyValueStore<ExtendRecordPartition, RecordOffset> positionStore;
 
     /**
      * The updated partition of the task in the current instance.
-     * */
-    private Set<RecordPartition> needSyncPartition;
+     */
+    private Set<ExtendRecordPartition> needSyncPartition;
 
     /**
      * Synchronize data with other workers.
      */
-    private DataSynchronizer<String, Map<RecordPartition, RecordOffset>> 
dataSynchronizer;
+    private DataSynchronizer<String, Map<ExtendRecordPartition, RecordOffset>> 
dataSynchronizer;
 
     /**
      * Listeners.
@@ -70,14 +72,14 @@ public class PositionManagementServiceImpl implements 
PositionManagementService
     public PositionManagementServiceImpl(ConnectConfig connectConfig) {
 
         this.positionStore = new 
FileBaseKeyValueStore<>(FilePathConfigUtil.getPositionPath(connectConfig.getStorePathRootDir()),
-            new RecordPartitionConverter(),
-            new RecordOffsetConverter());
+                new RecordPartitionConverter(),
+                new RecordOffsetConverter());
         this.dataSynchronizer = new BrokerBasedLog(connectConfig,
-            connectConfig.getPositionStoreTopic(),
-            ConnectUtil.createGroupName(positionManagePrefix, 
connectConfig.getWorkerId()),
-            new PositionChangeCallback(),
-            new JsonConverter(),
-            new RecordPositionMapConverter());
+                connectConfig.getPositionStoreTopic(),
+                ConnectUtil.createGroupName(positionManagePrefix, 
connectConfig.getWorkerId()),
+                new PositionChangeCallback(),
+                new JsonConverter(),
+                new RecordPositionMapConverter());
         this.positionUpdateListener = new HashSet<>();
         this.needSyncPartition = new ConcurrentSet<>();
         this.prepare(connectConfig);
@@ -119,7 +121,8 @@ public class PositionManagementServiceImpl implements 
PositionManagementService
         positionStore.persist();
     }
 
-    @Override public void load() {
+    @Override
+    public void load() {
         positionStore.load();
     }
 
@@ -130,39 +133,39 @@ public class PositionManagementServiceImpl implements 
PositionManagementService
     }
 
     @Override
-    public Map<RecordPartition, RecordOffset> getPositionTable() {
+    public Map<ExtendRecordPartition, RecordOffset> getPositionTable() {
 
         return positionStore.getKVMap();
     }
 
     @Override
-    public RecordOffset getPosition(RecordPartition partition) {
+    public RecordOffset getPosition(ExtendRecordPartition partition) {
 
         return positionStore.get(partition);
     }
 
     @Override
-    public void putPosition(Map<RecordPartition, RecordOffset> positions) {
+    public void putPosition(Map<ExtendRecordPartition, RecordOffset> 
positions) {
 
         positionStore.putAll(positions);
         needSyncPartition.addAll(positions.keySet());
     }
 
     @Override
-    public void putPosition(RecordPartition partition, RecordOffset position) {
+    public void putPosition(ExtendRecordPartition partition, RecordOffset 
position) {
 
         positionStore.put(partition, position);
         needSyncPartition.add(partition);
     }
 
     @Override
-    public void removePosition(List<RecordPartition> partitions) {
+    public void removePosition(List<ExtendRecordPartition> partitions) {
 
         if (null == partitions) {
             return;
         }
 
-        for (RecordPartition partition : partitions) {
+        for (ExtendRecordPartition partition : partitions) {
             needSyncPartition.remove(partition);
             positionStore.remove(partition);
         }
@@ -182,11 +185,11 @@ public class PositionManagementServiceImpl implements 
PositionManagementService
 
     private void sendNeedSynchronizePosition() {
 
-        Set<RecordPartition> needSyncPartitionTmp = needSyncPartition;
+        Set<ExtendRecordPartition> needSyncPartitionTmp = needSyncPartition;
         needSyncPartition = new ConcurrentSet<>();
-        Map<RecordPartition, RecordOffset> needSyncPosition = 
positionStore.getKVMap().entrySet().stream()
-            .filter(entry -> needSyncPartitionTmp.contains(entry.getKey()))
-            .collect(Collectors.toMap(entry -> entry.getKey(), entry -> 
entry.getValue()));
+        Map<ExtendRecordPartition, RecordOffset> needSyncPosition = 
positionStore.getKVMap().entrySet().stream()
+                .filter(entry -> needSyncPartitionTmp.contains(entry.getKey()))
+                .collect(Collectors.toMap(entry -> entry.getKey(), entry -> 
entry.getValue()));
 
         dataSynchronizer.send(PositionChangeEnum.POSITION_CHANG_KEY.name(), 
needSyncPosition);
     }
@@ -196,10 +199,10 @@ public class PositionManagementServiceImpl implements 
PositionManagementService
         dataSynchronizer.send(PositionChangeEnum.POSITION_CHANG_KEY.name(), 
positionStore.getKVMap());
     }
 
-    private class PositionChangeCallback implements 
DataSynchronizerCallback<String, Map<RecordPartition, RecordOffset>> {
+    private class PositionChangeCallback implements 
DataSynchronizerCallback<String, Map<ExtendRecordPartition, RecordOffset>> {
 
         @Override
-        public void onCompletion(Throwable error, String key, 
Map<RecordPartition, RecordOffset> result) {
+        public void onCompletion(Throwable error, String key, 
Map<ExtendRecordPartition, RecordOffset> result) {
 
             boolean changed = false;
             switch (PositionChangeEnum.valueOf(key)) {
@@ -232,16 +235,16 @@ public class PositionManagementServiceImpl implements 
PositionManagementService
      * @param result
      * @return
      */
-    private boolean mergePositionInfo(Map<RecordPartition, RecordOffset> 
result) {
+    private boolean mergePositionInfo(Map<ExtendRecordPartition, RecordOffset> 
result) {
 
         boolean changed = false;
         if (null == result || 0 == result.size()) {
             return changed;
         }
 
-        for (Map.Entry<RecordPartition, RecordOffset> newEntry : 
result.entrySet()) {
+        for (Map.Entry<ExtendRecordPartition, RecordOffset> newEntry : 
result.entrySet()) {
             boolean find = false;
-            for (Map.Entry<RecordPartition, RecordOffset> existedEntry : 
positionStore.getKVMap().entrySet()) {
+            for (Map.Entry<ExtendRecordPartition, RecordOffset> existedEntry : 
positionStore.getKVMap().entrySet()) {
                 if (newEntry.getKey().equals(existedEntry.getKey())) {
                     find = true;
                     if (!newEntry.getValue().equals(existedEntry.getValue())) {
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
index 8b1ed6d..7d8cbad 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
@@ -19,13 +19,15 @@ package org.apache.rocketmq.connect.runtime.service;
 
 import java.util.List;
 import java.util.Map;
+
 import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.connect.runtime.ConnectController;
+import 
org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
 import org.apache.rocketmq.connect.runtime.common.ConnAndTaskConfigs;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
+import 
org.apache.rocketmq.connect.runtime.service.memory.MemoryClusterManagementServiceImpl;
 import 
org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
 import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
 import org.slf4j.Logger;
@@ -58,10 +60,10 @@ public class RebalanceImpl {
      */
     private AllocateConnAndTaskStrategy allocateConnAndTaskStrategy;
 
-    private final ConnectController connectController;
+    private final AbstractConnectController connectController;
 
-    public RebalanceImpl(Worker worker, ConfigManagementService 
configManagementService,
-        ClusterManagementService clusterManagementService, 
AllocateConnAndTaskStrategy strategy, ConnectController connectController) {
+    public RebalanceImpl(Worker worker,
+                         ConfigManagementService configManagementService, 
ClusterManagementService clusterManagementService, AllocateConnAndTaskStrategy 
strategy, AbstractConnectController connectController) {
 
         this.worker = worker;
         this.configManagementService = configManagementService;
@@ -86,8 +88,14 @@ public class RebalanceImpl {
     public void doRebalance() {
         List<String> curAliveWorkers = 
clusterManagementService.getAllAliveWorkers();
         if (curAliveWorkers != null) {
-            log.info("Current Alive workers : " + curAliveWorkers.size());
+            if (clusterManagementService instanceof 
ClusterManagementServiceImpl) {
+                log.info("Current Alive workers : " + curAliveWorkers.size());
+            } else if (clusterManagementService instanceof 
MemoryClusterManagementServiceImpl) {
+                log.info("Current alive worker : " + 
curAliveWorkers.iterator().next());
+            }
         }
+
+
         Map<String, ConnectKeyValue> curConnectorConfigs = 
configManagementService.getConnectorConfigs();
         log.info("Current ConnectorConfigs : " + curConnectorConfigs);
         Map<String, List<ConnectKeyValue>> curTaskConfigs = 
configManagementService.getTaskConfigs();
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FileOffsetManagementServiceImpl.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FileOffsetManagementServiceImpl.java
new file mode 100644
index 0000000..f445d58
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FileOffsetManagementServiceImpl.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.service.memory;
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.converter.RecordOffsetConverter;
+import org.apache.rocketmq.connect.runtime.converter.RecordPartitionConverter;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
+import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
+import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
+import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
+import 
org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * File-backed offset management service.
+ *
+ * <p>Offsets are held in a {@link FileBaseKeyValueStore} whose location is derived from
+ * {@code ConnectConfig#getStorePathRootDir()}. Every mutation updates the store, notifies the
+ * registered listener (if any) and then queues a {@code persist()} of the store on a
+ * single-threaded executor.
+ */
+public class FileOffsetManagementServiceImpl implements PositionManagementService {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
+
+    /**
+     * Runs the queued persist tasks; created in {@link #start()} and released in {@link #stop()}.
+     */
+    protected ExecutorService executor;
+
+    /**
+     * Current offset info in store.
+     */
+    private final KeyValueStore<ExtendRecordPartition, RecordOffset> offsetStore;
+
+    /**
+     * Listener notified on every offset change; {@code null} until one is registered.
+     */
+    private PositionUpdateListener offsetUpdateListener;
+
+    /**
+     * Creates the service with a store located at the offset path below the configured root dir.
+     *
+     * @param connectConfig supplies the store root directory
+     */
+    public FileOffsetManagementServiceImpl(ConnectConfig connectConfig) {
+        this.offsetStore = new FileBaseKeyValueStore<>(
+                FilePathConfigUtil.getOffsetPath(connectConfig.getStorePathRootDir()),
+                new RecordPartitionConverter(),
+                new RecordOffsetConverter());
+    }
+
+    /**
+     * Creates the single-threaded persist executor and loads the store.
+     */
+    @Override
+    public void start() {
+        executor = Executors.newFixedThreadPool(1, ThreadUtils.newThreadFactory(
+                this.getClass().getSimpleName() + "-%d", false));
+        offsetStore.load();
+    }
+
+    /**
+     * Persists the store and shuts the executor down, waiting up to 30 seconds for queued tasks.
+     *
+     * @throws ConnectException if tasks were still queued when the executor was forced to stop
+     */
+    @Override
+    public void stop() {
+        offsetStore.persist();
+        if (executor == null) {
+            return;
+        }
+        executor.shutdown();
+        // Best effort wait for queued persist tasks (and the callers' callbacks) to complete.
+        try {
+            executor.awaitTermination(30, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+        if (!executor.shutdownNow().isEmpty()) {
+            // The message previously named MemoryOffsetManagementServiceImpl, which is not this class.
+            throw new ConnectException("Failed to stop FileOffsetManagementServiceImpl. Exiting without cleanly "
+                    + "shutting down pending tasks and/or callbacks.");
+        }
+        executor = null;
+    }
+
+    @Override
+    public void persist() {
+        offsetStore.persist();
+    }
+
+    @Override
+    public void load() {
+        offsetStore.load();
+    }
+
+    /**
+     * Intentionally empty in this implementation.
+     */
+    @Override
+    public void synchronize() {
+    }
+
+    @Override
+    public Map<ExtendRecordPartition, RecordOffset> getPositionTable() {
+        return offsetStore.getKVMap();
+    }
+
+    @Override
+    public RecordOffset getPosition(ExtendRecordPartition partition) {
+        return offsetStore.get(partition);
+    }
+
+    /**
+     * Stores all given offsets, then notifies the listener and queues a persist.
+     *
+     * @param offsets partition-to-offset entries to store
+     */
+    @Override
+    public void putPosition(Map<ExtendRecordPartition, RecordOffset> offsets) {
+        offsetStore.putAll(offsets);
+        this.triggerListener(loggingCallback("Successed to persist offsets to storage: {}", offsets));
+    }
+
+    /**
+     * Stores one offset, then notifies the listener and queues a persist.
+     *
+     * @param partition key of the entry
+     * @param position  offset to store for {@code partition}
+     */
+    @Override
+    public void putPosition(ExtendRecordPartition partition, RecordOffset position) {
+        offsetStore.put(partition, position);
+        this.triggerListener(
+                loggingCallback("Successed to persist offsets to storage: {}, {}", partition, position));
+    }
+
+    /**
+     * Removes the given partitions from the store, then notifies the listener and queues a persist.
+     *
+     * @param offsets partitions to remove; a {@code null} list is a no-op
+     */
+    @Override
+    public void removePosition(List<ExtendRecordPartition> offsets) {
+        if (null == offsets) {
+            return;
+        }
+        for (ExtendRecordPartition offset : offsets) {
+            offsetStore.remove(offset);
+        }
+        this.triggerListener(loggingCallback("Successed to persist offsets to storage: {}", offsets));
+    }
+
+    @Override
+    public void registerListener(PositionUpdateListener listener) {
+        this.offsetUpdateListener = listener;
+    }
+
+    /**
+     * Builds the completion callback shared by all mutating methods: logs the failure at error
+     * level, otherwise logs {@code successFormat} at trace level.
+     *
+     * @param successFormat SLF4J format used on success
+     * @param successArgs   arguments for {@code successFormat}
+     * @return the logging callback
+     */
+    private DataSynchronizerCallback<Void, Void> loggingCallback(final String successFormat,
+                                                                 final Object... successArgs) {
+        return new DataSynchronizerCallback<Void, Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void key, Void result) {
+                if (error != null) {
+                    // A trailing Throwable is logged as the exception, so no "{}" placeholder here.
+                    log.error("Failed to persist offsets to storage", error);
+                } else {
+                    log.trace(successFormat, successArgs);
+                }
+            }
+        };
+    }
+
+    /**
+     * Notifies the listener (if registered) and queues a persist of the store on the executor.
+     *
+     * @param callback invoked once after the persist attempt, with the failure or {@code null};
+     *                 may itself be {@code null}
+     * @return future of the queued persist task
+     */
+    private Future<Void> triggerListener(final DataSynchronizerCallback<Void, Void> callback) {
+        if (offsetUpdateListener != null) {
+            offsetUpdateListener.onPositionUpdate();
+        }
+
+        return executor.submit(new Callable<Void>() {
+            @Override
+            public Void call() {
+                Throwable failure = null;
+                try {
+                    offsetStore.persist();
+                } catch (Exception error) {
+                    failure = error;
+                }
+                // Invoked outside the try so a null callback cannot NPE in the failure path and a
+                // callback that throws is not called a second time with its own exception.
+                if (callback != null) {
+                    callback.onCompletion(failure, null, null);
+                }
+                return null;
+            }
+        });
+    }
+
+}
+
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
new file mode 100644
index 0000000..c0ebb2c
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.service.memory;
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.converter.RecordOffsetConverter;
+import org.apache.rocketmq.connect.runtime.converter.RecordPartitionConverter;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
+import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
+import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
+import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
+import 
org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * File-backed position management service.
+ *
+ * <p>Positions are held in a {@link FileBaseKeyValueStore} whose location is derived from
+ * {@code ConnectConfig#getStorePathRootDir()}. Every mutation updates the store, notifies the
+ * registered listener (if any) and then queues a {@code persist()} of the store on a
+ * single-threaded executor.
+ */
+public class FilePositionManagementServiceImpl implements PositionManagementService {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
+
+    /**
+     * Runs the queued persist tasks; created in {@link #start()} and released in {@link #stop()}.
+     */
+    protected ExecutorService executor;
+    /**
+     * Current position info in store.
+     */
+    private final KeyValueStore<ExtendRecordPartition, RecordOffset> positionStore;
+    /**
+     * Listener notified on every position change; {@code null} until one is registered.
+     */
+    private PositionUpdateListener positionUpdateListener;
+
+
+    /**
+     * Creates the service with a store located at the position path below the configured root dir.
+     *
+     * @param connectConfig supplies the store root directory
+     */
+    public FilePositionManagementServiceImpl(ConnectConfig connectConfig) {
+        this.positionStore = new FileBaseKeyValueStore<>(
+                FilePathConfigUtil.getPositionPath(connectConfig.getStorePathRootDir()),
+                new RecordPartitionConverter(),
+                new RecordOffsetConverter());
+    }
+
+
+    /**
+     * Creates the single-threaded persist executor and loads the store.
+     */
+    @Override
+    public void start() {
+        executor = Executors.newFixedThreadPool(1, ThreadUtils.newThreadFactory(
+                this.getClass().getSimpleName() + "-%d", false));
+        positionStore.load();
+    }
+
+    /**
+     * Persists the store and shuts the executor down, waiting up to 30 seconds for queued tasks.
+     *
+     * @throws ConnectException if tasks were still queued when the executor was forced to stop
+     */
+    @Override
+    public void stop() {
+        positionStore.persist();
+        if (executor == null) {
+            return;
+        }
+        executor.shutdown();
+        // Best effort wait for queued persist tasks (and the callers' callbacks) to complete.
+        try {
+            executor.awaitTermination(30, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+        if (!executor.shutdownNow().isEmpty()) {
+            // The message previously named MemoryOffsetManagementServiceImpl, which is not this class.
+            throw new ConnectException("Failed to stop FilePositionManagementServiceImpl. Exiting without cleanly "
+                    + "shutting down pending tasks and/or callbacks.");
+        }
+        executor = null;
+    }
+
+    @Override
+    public void persist() {
+        positionStore.persist();
+    }
+
+    @Override
+    public void load() {
+        positionStore.load();
+    }
+
+    /**
+     * Intentionally empty in this implementation.
+     */
+    @Override
+    public void synchronize() {
+    }
+
+    @Override
+    public Map<ExtendRecordPartition, RecordOffset> getPositionTable() {
+        return positionStore.getKVMap();
+    }
+
+    @Override
+    public RecordOffset getPosition(ExtendRecordPartition partition) {
+        return positionStore.get(partition);
+    }
+
+    /**
+     * Stores all given positions, then notifies the listener and queues a persist.
+     *
+     * @param positions partition-to-position entries to store
+     */
+    @Override
+    public void putPosition(Map<ExtendRecordPartition, RecordOffset> positions) {
+        positionStore.putAll(positions);
+        this.triggerListener(loggingCallback("Successed to persist positions to storage: {} ", positions));
+    }
+
+    /**
+     * Stores one position, then notifies the listener and queues a persist.
+     *
+     * @param partition key of the entry
+     * @param position  position to store for {@code partition}
+     */
+    @Override
+    public void putPosition(ExtendRecordPartition partition, RecordOffset position) {
+        positionStore.put(partition, position);
+        this.triggerListener(
+                loggingCallback("Successed to persist positions to storage: {} , {} ", partition, position));
+    }
+
+    /**
+     * Removes the given partitions from the store, then notifies the listener and queues a persist.
+     *
+     * @param partitions partitions to remove; a {@code null} list is a no-op
+     */
+    @Override
+    public void removePosition(List<ExtendRecordPartition> partitions) {
+        if (null == partitions) {
+            return;
+        }
+        for (ExtendRecordPartition partition : partitions) {
+            positionStore.remove(partition);
+        }
+        this.triggerListener(loggingCallback("Successed to persist positions to storage: {}", partitions));
+    }
+
+    @Override
+    public void registerListener(PositionUpdateListener listener) {
+        this.positionUpdateListener = listener;
+    }
+
+    /**
+     * Builds the completion callback shared by all mutating methods: logs the failure at error
+     * level, otherwise logs {@code successFormat} at trace level.
+     *
+     * @param successFormat SLF4J format used on success
+     * @param successArgs   arguments for {@code successFormat}
+     * @return the logging callback
+     */
+    private DataSynchronizerCallback<Void, Void> loggingCallback(final String successFormat,
+                                                                 final Object... successArgs) {
+        return new DataSynchronizerCallback<Void, Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void key, Void result) {
+                if (error != null) {
+                    // A trailing Throwable is logged as the exception, so no "{}" placeholder here.
+                    log.error("Failed to persist positions to storage", error);
+                } else {
+                    log.trace(successFormat, successArgs);
+                }
+            }
+        };
+    }
+
+    /**
+     * Notifies the listener (if registered) and queues a persist of the store on the executor.
+     *
+     * @param callback invoked once after the persist attempt, with the failure or {@code null};
+     *                 may itself be {@code null}
+     * @return future of the queued persist task
+     */
+    private Future<Void> triggerListener(final DataSynchronizerCallback<Void, Void> callback) {
+        if (this.positionUpdateListener != null) {
+            positionUpdateListener.onPositionUpdate();
+        }
+
+        return executor.submit(new Callable<Void>() {
+            @Override
+            public Void call() {
+                Throwable failure = null;
+                try {
+                    positionStore.persist();
+                } catch (Exception error) {
+                    failure = error;
+                }
+                // Invoked outside the try so a null callback cannot NPE in the failure path and a
+                // callback that throws is not called a second time with its own exception.
+                if (callback != null) {
+                    callback.onCompletion(failure, null, null);
+                }
+                return null;
+            }
+        });
+    }
+
+
+}
+
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryClusterManagementServiceImpl.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryClusterManagementServiceImpl.java
new file mode 100644
index 0000000..5e14d70
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryClusterManagementServiceImpl.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.service.memory;
+
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import 
org.apache.rocketmq.connect.runtime.controller.standalone.StandaloneConfig;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Standalone cluster management service: the "cluster" it reports consists only of the
+ * worker id taken from the supplied config.
+ */
+public class MemoryClusterManagementServiceImpl implements 
ClusterManagementService {
+
+    // Config of this worker; set by configure(), which the constructor calls.
+    private StandaloneConfig config;
+
+    /**
+     * Creates the service and immediately applies {@code config} via {@link #configure}.
+     *
+     * @param config must be a StandaloneConfig instance, see {@link #configure}
+     */
+    public MemoryClusterManagementServiceImpl(ConnectConfig config) {
+        this.configure(config);
+    }
+
+    /**
+     * Configure class with the given config.
+     *
+     * @param config must be a StandaloneConfig instance; it is cast unconditionally, so any
+     *               other non-null ConnectConfig fails with a ClassCastException
+     */
+    @Override
+    public void configure(ConnectConfig config) {
+        this.config = (StandaloneConfig) config;
+    }
+
+    /**
+     * Start the cluster manager. No-op in this implementation.
+     */
+    @Override
+    public void start() {
+
+    }
+
+    /**
+     * Stop the cluster manager. No-op in this implementation.
+     */
+    @Override
+    public void stop() {
+
+    }
+
+    /**
+     * Check if Cluster Store Topic exists.
+     *
+     * @return always false in this implementation
+     */
+    @Override
+    public boolean hasClusterStoreTopic() {
+        return false;
+    }
+
+    /**
+     * Get all alive workers in the cluster.
+     *
+     * @return an immutable single-element list holding the configured worker id
+     */
+    @Override
+    public List<String> getAllAliveWorkers() {
+        return Collections.singletonList(this.config.getWorkerId());
+    }
+
+    /**
+     * Register a worker status listener to listen the change of alive workers.
+     * This implementation ignores the listener.
+     *
+     * @param listener not stored and never invoked by this class
+     */
+    @Override
+    public void registerListener(WorkerStatusListener listener) {
+    }
+
+    /**
+     * Get the id of the current worker.
+     *
+     * @return the worker id from the config
+     */
+    @Override
+    public String getCurrentWorker() {
+        return this.config.getWorkerId();
+    }
+}
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
new file mode 100644
index 0000000..f0736c4
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.service.memory;
+
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.connector.Connector;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
+import org.apache.rocketmq.connect.runtime.converter.JsonConverter;
+import org.apache.rocketmq.connect.runtime.converter.ListConverter;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
+import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
+import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
+import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Config management service impl for standalone.
+ *
+ * <p>Despite the class name, connector and task configs are kept in two
+ * {@link FileBaseKeyValueStore}s located below {@code ConnectConfig#getStorePathRootDir()}.
+ */
+public class MemoryConfigManagementServiceImpl implements ConfigManagementService {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
+
+    /**
+     * Current connector configs in the store.
+     */
+    private final KeyValueStore<String, ConnectKeyValue> connectorKeyValueStore;
+
+    /**
+     * Current task configs in the store.
+     */
+    private final KeyValueStore<String, List<ConnectKeyValue>> taskKeyValueStore;
+
+    /**
+     * Listener to trigger while config change; {@code null} until one is registered.
+     */
+    private ConnectorConfigUpdateListener connectorConfigUpdateListener;
+
+    private final Plugin plugin;
+
+    /**
+     * Creates both stores below the configured root dir.
+     *
+     * @param connectConfig supplies the store root directory
+     * @param plugin        used to look up the class loader of a connector class
+     */
+    public MemoryConfigManagementServiceImpl(ConnectConfig connectConfig, Plugin plugin) {
+
+        this.connectorKeyValueStore = new FileBaseKeyValueStore<>(
+                FilePathConfigUtil.getConnectorConfigPath(connectConfig.getStorePathRootDir()),
+                new JsonConverter(),
+                new JsonConverter(ConnectKeyValue.class));
+        this.taskKeyValueStore = new FileBaseKeyValueStore<>(
+                FilePathConfigUtil.getTaskConfigPath(connectConfig.getStorePathRootDir()),
+                new JsonConverter(),
+                new ListConverter(ConnectKeyValue.class));
+        this.plugin = plugin;
+    }
+
+    /**
+     * Loads both stores.
+     */
+    @Override
+    public void start() {
+        connectorKeyValueStore.load();
+        taskKeyValueStore.load();
+    }
+
+    /**
+     * Persists both stores.
+     */
+    @Override
+    public void stop() {
+        connectorKeyValueStore.persist();
+        taskKeyValueStore.persist();
+    }
+
+    /**
+     * get all connector configs enabled
+     *
+     * @return a new map with every stored connector config whose CONFIG_DELETED value is 0
+     */
+    @Override
+    public Map<String, ConnectKeyValue> getConnectorConfigs() {
+        Map<String, ConnectKeyValue> result = new HashMap<>();
+        for (Map.Entry<String, ConnectKeyValue> entry : connectorKeyValueStore.getKVMap().entrySet()) {
+            ConnectKeyValue config = entry.getValue();
+            if (0 != config.getInt(RuntimeConfigDefine.CONFIG_DELETED)) {
+                continue;
+            }
+            result.put(entry.getKey(), config);
+        }
+        return result;
+    }
+
+    /**
+     * get all connector configs include deleted
+     *
+     * @return a new map with every stored connector config
+     */
+    @Override
+    public Map<String, ConnectKeyValue> getConnectorConfigsIncludeDeleted() {
+        return new HashMap<>(connectorKeyValueStore.getKVMap());
+    }
+
+    /**
+     * Validates and stores a connector config, then recomputes its task configs.
+     *
+     * @param connectorName name the config is stored under
+     * @param configs       connector config; its UPDATE_TIMESTAMP entry is overwritten
+     * @return an empty string on success, otherwise a message saying that an identical config
+     *         already exists or which required key is missing
+     * @throws Exception if the connector class cannot be loaded or instantiated, or if the
+     *                   connector's own validation or initialization throws
+     */
+    @Override
+    public String putConnectorConfig(String connectorName, ConnectKeyValue configs) throws Exception {
+        ConnectKeyValue exist = connectorKeyValueStore.get(connectorName);
+        if (null != exist) {
+            // Align the timestamp first so the equality check below compares only real settings.
+            Long updateTimestamp = exist.getLong(RuntimeConfigDefine.UPDATE_TIMESTAMP);
+            if (null != updateTimestamp) {
+                configs.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, updateTimestamp);
+            }
+        }
+        if (configs.equals(exist)) {
+            return "Connector with same config already exist.";
+        }
+
+        Long currentTimestamp = System.currentTimeMillis();
+        configs.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
+        for (String requireConfig : RuntimeConfigDefine.REQUEST_CONFIG) {
+            if (!configs.containsKey(requireConfig)) {
+                return "Request config key: " + requireConfig;
+            }
+        }
+
+        String connectorClass = configs.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
+        ClassLoader classLoader = plugin.getPluginClassLoader(connectorClass);
+        // Fall back to the default class loading when the plugin has no loader for this class.
+        Class<?> clazz = null != classLoader
+                ? Class.forName(connectorClass, true, classLoader)
+                : Class.forName(connectorClass);
+        final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance();
+        connector.validate(configs);
+        connector.init(configs);
+        connectorKeyValueStore.put(connectorName, configs);
+        recomputeTaskConfigs(connectorName, connector, currentTimestamp, configs);
+        return "";
+    }
+
+    /**
+     * Rebuilds the task configs of a connector from {@code connector.taskConfigs(...)}, stores
+     * them and triggers the config-update listener.
+     *
+     * @param connectorName    name of the connector; its config must already be in the store
+     * @param connector        connector instance that produces the task configs
+     * @param currentTimestamp value written as UPDATE_TIMESTAMP into every task config
+     * @param configs          connector config supplying max-task, topic names and transforms
+     */
+    @Override
+    public void recomputeTaskConfigs(String connectorName, Connector connector, Long currentTimestamp,
+                                     ConnectKeyValue configs) {
+        int maxTask = configs.getInt(RuntimeConfigDefine.MAX_TASK, 1);
+        ConnectKeyValue connectConfig = connectorKeyValueStore.get(connectorName);
+        boolean directEnable =
+                Boolean.parseBoolean(connectConfig.getString(RuntimeConfigDefine.CONNECTOR_DIRECT_ENABLE));
+        List<KeyValue> taskConfigs = connector.taskConfigs(maxTask);
+        List<ConnectKeyValue> convertedConfigs = new ArrayList<>();
+        for (KeyValue keyValue : taskConfigs) {
+            ConnectKeyValue newKeyValue = new ConnectKeyValue();
+            for (String key : keyValue.keySet()) {
+                newKeyValue.put(key, keyValue.getString(key));
+            }
+            if (directEnable) {
+                newKeyValue.put(RuntimeConfigDefine.TASK_TYPE, Worker.TaskType.DIRECT.name());
+                newKeyValue.put(RuntimeConfigDefine.SOURCE_TASK_CLASS,
+                        connectConfig.getString(RuntimeConfigDefine.SOURCE_TASK_CLASS));
+                newKeyValue.put(RuntimeConfigDefine.SINK_TASK_CLASS,
+                        connectConfig.getString(RuntimeConfigDefine.SINK_TASK_CLASS));
+            }
+            newKeyValue.put(RuntimeConfigDefine.TASK_CLASS, connector.taskClass().getName());
+            newKeyValue.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
+
+            newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAME,
+                    configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAME));
+            newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAMES,
+                    configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAMES));
+            // Copy every transform setting of the connector config into the task config.
+            Set<String> connectConfigKeySet = configs.keySet();
+            for (String connectConfigKey : connectConfigKeySet) {
+                if (connectConfigKey.startsWith(RuntimeConfigDefine.TRANSFORMS)) {
+                    newKeyValue.put(connectConfigKey, configs.getString(connectConfigKey));
+                }
+            }
+            convertedConfigs.add(newKeyValue);
+        }
+        putTaskConfigs(connectorName, convertedConfigs);
+        triggerListener();
+    }
+
+    /**
+     * Marks a connector as deleted (CONFIG_DELETED = 1), appends the marked config to its task
+     * config list and triggers the config-update listener.
+     *
+     * @param connectorName name of the connector to remove; an unknown name is logged and ignored
+     */
+    @Override
+    public void removeConnectorConfig(String connectorName) {
+        ConnectKeyValue config = connectorKeyValueStore.get(connectorName);
+        if (null == config) {
+            // The store returns null for unknown names; previously this ended in an NPE.
+            log.warn("Connector {} does not exist, nothing to remove.", connectorName);
+            return;
+        }
+        config.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, System.currentTimeMillis());
+        config.put(RuntimeConfigDefine.CONFIG_DELETED, 1);
+        List<ConnectKeyValue> taskConfigList = taskKeyValueStore.get(connectorName);
+        if (null == taskConfigList) {
+            // A connector may be stored without task configs; start from an empty list.
+            taskConfigList = new ArrayList<>();
+        }
+        taskConfigList.add(config);
+        connectorKeyValueStore.put(connectorName, config);
+        putTaskConfigs(connectorName, taskConfigList);
+        log.info("[ISSUE #2027] After removal The configs are:\n" + getConnectorConfigs().toString());
+        triggerListener();
+    }
+
+    /**
+     * Get the task configs of all enabled connectors.
+     *
+     * @return a new map with the stored task configs of every connector that
+     *         {@link #getConnectorConfigs()} reports
+     */
+    @Override
+    public Map<String, List<ConnectKeyValue>> getTaskConfigs() {
+        Map<String, List<ConnectKeyValue>> result = new HashMap<>();
+        Map<String, ConnectKeyValue> filteredConnector = getConnectorConfigs();
+        for (Map.Entry<String, List<ConnectKeyValue>> entry : taskKeyValueStore.getKVMap().entrySet()) {
+            if (filteredConnector.containsKey(entry.getKey())) {
+                result.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Replaces the task configs stored for a connector.
+     *
+     * @param connectorName name of the connector
+     * @param configs       new task configs
+     */
+    private void putTaskConfigs(String connectorName, List<ConnectKeyValue> configs) {
+        List<ConnectKeyValue> exist = taskKeyValueStore.get(connectorName);
+        if (null != exist && exist.size() > 0) {
+            taskKeyValueStore.remove(connectorName);
+        }
+        taskKeyValueStore.put(connectorName, configs);
+    }
+
+    /**
+     * Persists both stores.
+     */
+    @Override
+    public void persist() {
+        this.connectorKeyValueStore.persist();
+        this.taskKeyValueStore.persist();
+    }
+
+    @Override
+    public void registerListener(ConnectorConfigUpdateListener listener) {
+        this.connectorConfigUpdateListener = listener;
+    }
+
+    /**
+     * Calls the registered config-update listener; does nothing when none is registered.
+     */
+    private void triggerListener() {
+        if (null == this.connectorConfigUpdateListener) {
+            return;
+        }
+        connectorConfigUpdateListener.onConfigUpdate();
+    }
+
+    @Override
+    public Plugin getPlugin() {
+        return this.plugin;
+    }
+}
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/StandaloneRebalanceService.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/StandaloneRebalanceService.java
new file mode 100644
index 0000000..dbe39e2
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/StandaloneRebalanceService.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.service.memory;
+
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.service.RebalanceImpl;
+import org.apache.rocketmq.connect.runtime.utils.ServiceThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Rebalance service for standalone mode. Its thread loop waits for up to
+ * {@code waitInterval} and then calls {@link RebalanceImpl#doRebalance()},
+ * repeating until the service is stopped. A listener registered on the
+ * {@link ConfigManagementService} calls {@code wakeup()} on this service
+ * whenever a config update is reported.
+ */
+public class StandaloneRebalanceService extends ServiceThread {
+
+    private static final Logger log =
+        LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
+
+    /**
+     * Value passed to {@code waitForRunning} between two rebalance rounds.
+     * Read once from the system property
+     * {@code rocketmq.runtime.cluster.rebalance.waitInterval}; defaults to
+     * 20000 (presumably milliseconds -- confirm against ServiceThread).
+     */
+    private static final long waitInterval = Long.parseLong(
+        System.getProperty(
+            "rocketmq.runtime.cluster.rebalance.waitInterval", "20000"));
+
+    /**
+     * Performs the actual rebalance; invoked once per loop iteration.
+     */
+    private final RebalanceImpl rebalanceImpl;
+
+    /**
+     * Source of config-update notifications; the wake-up listener is
+     * registered on it in the constructor.
+     */
+    private final ConfigManagementService configManagementService;
+
+    /**
+     * Stored by the constructor but not used by the code in this class.
+     */
+    private final ClusterManagementService clusterManagementService;
+
+    /**
+     * Stores the collaborators and registers a config-update listener that
+     * wakes this service up.
+     *
+     * @param rebalanceImpl rebalance implementation run by the loop
+     * @param configManagementService service the listener is registered on
+     * @param clusterManagementService cluster service, only stored
+     */
+    public StandaloneRebalanceService(RebalanceImpl rebalanceImpl,
+        ConfigManagementService configManagementService,
+        ClusterManagementService clusterManagementService) {
+        this.rebalanceImpl = rebalanceImpl;
+        this.configManagementService = configManagementService;
+        this.clusterManagementService = clusterManagementService;
+        this.configManagementService.registerListener(
+            new ConnectorConnectorConfigChangeListenerImpl());
+    }
+
+    /**
+     * Runs the wait-then-rebalance loop until {@code isStopped()} returns
+     * true. Exceptions thrown inside one round are logged and the loop
+     * continues.
+     */
+    @Override
+    public void run() {
+        log.info("{} service started", this.getServiceName());
+        while (!this.isStopped()) {
+            try {
+                this.waitForRunning(waitInterval);
+                this.rebalanceImpl.doRebalance();
+            } catch (Exception e) {
+                // Keep the loop alive: one failed round must not end the service.
+                log.error("rebalance exception", e);
+            }
+        }
+
+        log.info("{} service end", this.getServiceName());
+    }
+
+    /**
+     * @return the simple class name of this service
+     */
+    @Override
+    public String getServiceName() {
+        return StandaloneRebalanceService.class.getSimpleName();
+    }
+
+    /**
+     * Listener that wakes the enclosing service on every config update.
+     * It is an inner (non-static) class because it calls {@code wakeup()}
+     * on the enclosing instance.
+     */
+    class ConnectorConnectorConfigChangeListenerImpl implements
+        ConfigManagementService.ConnectorConfigUpdateListener {
+
+        /**
+         * Called when the config changes; wakes up the enclosing service.
+         */
+        @Override
+        public void onConfigUpdate() {
+            StandaloneRebalanceService.this.wakeup();
+        }
+    }
+}
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/DefaultAllocateConnAndTaskStrategy.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/DefaultAllocateConnAndTaskStrategy.java
index 029a35b..a61982d 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/DefaultAllocateConnAndTaskStrategy.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/DefaultAllocateConnAndTaskStrategy.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
  */
 public class DefaultAllocateConnAndTaskStrategy implements 
AllocateConnAndTaskStrategy {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
-    
+
     @Override
     public ConnAndTaskConfigs allocate(List<String> allWorker, String 
curWorker,
         Map<String, ConnectKeyValue> connectorConfigs,
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/ExtendRecordPartition.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/ExtendRecordPartition.java
new file mode 100644
index 0000000..6190a9d
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/ExtendRecordPartition.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.store;
+
+import io.openmessaging.connector.api.data.RecordPartition;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A {@link RecordPartition} that additionally carries a namespace. Two
+ * instances are equal only when both the inherited partition state and the
+ * namespace are equal, so the same partition map under different namespaces
+ * yields distinct keys.
+ */
+public class ExtendRecordPartition extends RecordPartition {
+    /**
+     * Namespace qualifying the partition (the connect name).
+     */
+    private String namespace;
+
+    /**
+     * @param namespace namespace qualifying the partition; may be null
+     * @param partition partition map handed to {@link RecordPartition}
+     */
+    public ExtendRecordPartition(String namespace, Map<String, ?> partition) {
+        super(partition);
+        this.namespace = namespace;
+    }
+
+    /**
+     * @return the namespace given at construction time
+     */
+    public String getNamespace() {
+        return namespace;
+    }
+
+    /**
+     * Compares the inherited partition state and the namespace.
+     *
+     * @param o object to compare with
+     * @return true when {@code o} is an ExtendRecordPartition that the
+     *         superclass considers equal and that has an equal namespace
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof ExtendRecordPartition)) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        ExtendRecordPartition that = (ExtendRecordPartition) o;
+        // Null-safe so equals() agrees with hashCode(), which already
+        // accepts a null namespace through Objects.hash.
+        return Objects.equals(this.namespace, that.namespace);
+    }
+
+    @Override
+    public String toString() {
+        return "ExtendRecordPartition{" +
+                "namespace='" + namespace + '\'' +
+                "} " + super.toString();
+    }
+
+    /**
+     * @return hash combining the superclass hash and the namespace
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), namespace);
+    }
+}
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageReaderImpl.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageReaderImpl.java
index 84c5365..09980df 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageReaderImpl.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageReaderImpl.java
@@ -29,22 +29,25 @@ import 
org.apache.rocketmq.connect.runtime.service.PositionManagementService;
 public class PositionStorageReaderImpl implements OffsetStorageReader {
 
     private PositionManagementService positionManagementService;
+    private final String namespace;
 
-    public PositionStorageReaderImpl(PositionManagementService 
positionManagementService) {
-
+    public PositionStorageReaderImpl(String namespace, 
PositionManagementService positionManagementService) {
+        this.namespace = namespace;
         this.positionManagementService = positionManagementService;
     }
 
     @Override public RecordOffset readOffset(RecordPartition partition) {
-        return positionManagementService.getPositionTable().get(partition);
+        ExtendRecordPartition extendRecordPartition = new 
ExtendRecordPartition(namespace, partition.getPartition());
+        return 
positionManagementService.getPositionTable().get(extendRecordPartition);
     }
 
     @Override public  Map<RecordPartition, RecordOffset> 
readOffsets(Collection<RecordPartition> partitions) {
         Map<RecordPartition, RecordOffset> result = new HashMap<>();
-        Map<RecordPartition, RecordOffset> allData = 
positionManagementService.getPositionTable();
+        Map<ExtendRecordPartition, RecordOffset> allData = 
positionManagementService.getPositionTable();
         for (RecordPartition key : partitions) {
-            if (allData.containsKey(key)) {
-                result.put(key, allData.get(key));
+            ExtendRecordPartition extendRecordPartition = new 
ExtendRecordPartition(namespace, key.getPartition());
+            if (allData.containsKey(extendRecordPartition)) {
+                result.put(key, allData.get(extendRecordPartition));
             }
         }
         return result;
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageReaderImpl.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriter.java
similarity index 56%
copy from 
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageReaderImpl.java
copy to 
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriter.java
index 84c5365..c5fa1fa 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageReaderImpl.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriter.java
@@ -18,35 +18,26 @@
 
 package org.apache.rocketmq.connect.runtime.store;
 
+
 import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
-import io.openmessaging.connector.api.storage.OffsetStorageReader;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
 
-public class PositionStorageReaderImpl implements OffsetStorageReader {
+/**
+ * position storage writer
+ */
+public class PositionStorageWriter {
 
     private PositionManagementService positionManagementService;
+    private final String namespace;
 
-    public PositionStorageReaderImpl(PositionManagementService 
positionManagementService) {
-
+    public PositionStorageWriter(String namespace, PositionManagementService 
positionManagementService) {
+        this.namespace = namespace;
         this.positionManagementService = positionManagementService;
     }
 
-    @Override public RecordOffset readOffset(RecordPartition partition) {
-        return positionManagementService.getPositionTable().get(partition);
-    }
-
-    @Override public  Map<RecordPartition, RecordOffset> 
readOffsets(Collection<RecordPartition> partitions) {
-        Map<RecordPartition, RecordOffset> result = new HashMap<>();
-        Map<RecordPartition, RecordOffset> allData = 
positionManagementService.getPositionTable();
-        for (RecordPartition key : partitions) {
-            if (allData.containsKey(key)) {
-                result.put(key, allData.get(key));
-            }
-        }
-        return result;
+    public void putPosition(RecordPartition partition, RecordOffset position) {
+        ExtendRecordPartition extendRecordPartition = new 
ExtendRecordPartition(namespace, partition.getPartition());
+        positionManagementService.putPosition(extendRecordPartition, position);
     }
 }
diff --git a/rocketmq-connect-runtime/src/main/resources/connect.conf 
b/rocketmq-connect-runtime/src/main/resources/connect-distributed.conf
similarity index 100%
copy from rocketmq-connect-runtime/src/main/resources/connect.conf
copy to rocketmq-connect-runtime/src/main/resources/connect-distributed.conf
diff --git a/rocketmq-connect-runtime/src/main/resources/connect.conf 
b/rocketmq-connect-runtime/src/main/resources/connect-standalone.conf
similarity index 95%
rename from rocketmq-connect-runtime/src/main/resources/connect.conf
rename to rocketmq-connect-runtime/src/main/resources/connect-standalone.conf
index 449728d..1b476b2 100644
--- a/rocketmq-connect-runtime/src/main/resources/connect.conf
+++ b/rocketmq-connect-runtime/src/main/resources/connect-standalone.conf
@@ -13,8 +13,8 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-workerId=DEFAULT_WORKER_1
-storePathRootDir=/xxx/storeRoot
+workerId=standalone-worker
+storePathRootDir=/tmp/storeRoot
 
 ## Http port for user to access REST API
 httpPort=8082
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
index a9c51ed..3a04098 100644
--- 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
@@ -30,7 +30,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.connect.runtime.ConnectController;
+import 
org.apache.rocketmq.connect.runtime.controller.distributed.DistributedConnectController;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
@@ -79,7 +79,7 @@ public class WorkerTest {
     private ConnectorContext connectorContext;
 
     @Mock
-    private ConnectController connectController;
+    private DistributedConnectController connectController;
 
     @Mock
     private ConnectStatsManager connectStatsManager;
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestPositionManageServiceImpl.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestPositionManageServiceImpl.java
index 9cf5c09..10884df 100644
--- 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestPositionManageServiceImpl.java
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestPositionManageServiceImpl.java
@@ -20,6 +20,7 @@ package 
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl;
 import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
 
 import java.util.List;
 import java.util.Map;
@@ -50,23 +51,23 @@ public class TestPositionManageServiceImpl implements 
PositionManagementService
 
     }
 
-    @Override public Map<RecordPartition, RecordOffset> getPositionTable() {
+    @Override public Map<ExtendRecordPartition, RecordOffset> 
getPositionTable() {
         return null;
     }
 
-    @Override public RecordOffset getPosition(RecordPartition partition) {
+    @Override public RecordOffset getPosition(ExtendRecordPartition partition) 
{
         return null;
     }
 
-    @Override public void putPosition(Map<RecordPartition, RecordOffset> 
positions) {
+    @Override public void putPosition(Map<ExtendRecordPartition, RecordOffset> 
positions) {
 
     }
 
-    @Override public void putPosition(RecordPartition partition, RecordOffset 
position) {
+    @Override public void putPosition(ExtendRecordPartition partition, 
RecordOffset position) {
 
     }
 
-    @Override public void removePosition(List<RecordPartition> partitions) {
+    @Override public void removePosition(List<ExtendRecordPartition> 
partitions) {
 
     }
 
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
index c3f5e56..b452e6c 100644
--- 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
@@ -40,7 +40,7 @@ import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.connect.runtime.ConnectController;
+import 
org.apache.rocketmq.connect.runtime.controller.distributed.DistributedConnectController;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
@@ -71,7 +71,7 @@ import static org.mockito.Mockito.when;
 public class RestHandlerTest {
 
     @Mock
-    private ConnectController connectController;
+    private DistributedConnectController connectController;
 
     @Mock
     private ConfigManagementService configManagementService;
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
index cbf9366..6aa55b8 100644
--- 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
@@ -20,7 +20,6 @@ package org.apache.rocketmq.connect.runtime.service;
 import io.netty.util.internal.ConcurrentSet;
 import io.openmessaging.Future;
 import io.openmessaging.connector.api.data.RecordOffset;
-import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.producer.SendResult;
 import java.io.File;
 import java.lang.reflect.Field;
@@ -37,6 +36,7 @@ import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.connect.runtime.common.ConnAndTaskConfigs;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
 import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
 import org.apache.rocketmq.connect.runtime.utils.TestUtils;
 import org.apache.rocketmq.connect.runtime.utils.datasync.BrokerBasedLog;
@@ -74,16 +74,17 @@ public class PositionManagementServiceImplTest {
 
     private PositionManagementServiceImpl positionManagementService;
 
-    private Set<RecordPartition> needSyncPartition;
+    private Set<ExtendRecordPartition> needSyncPartition;
 
-    private KeyValueStore<RecordPartition, RecordOffset> positionStore;
+    private KeyValueStore<ExtendRecordPartition, RecordOffset> positionStore;
 
-    private RecordPartition sourcePartition;
+    private ExtendRecordPartition sourcePartition;
 
     private RecordOffset sourcePosition;
 
-    private Map<RecordPartition, RecordOffset> positions;
+    private Map<ExtendRecordPartition, RecordOffset> positions;
 
+    private final String namespace = "namespace";
     @Before
     public void init() throws Exception {
         connectConfig = new ConnectConfig();
@@ -132,18 +133,18 @@ public class PositionManagementServiceImplTest {
 
         Field positionStoreField = 
PositionManagementServiceImpl.class.getDeclaredField("positionStore");
         positionStoreField.setAccessible(true);
-        positionStore = (KeyValueStore<RecordPartition, RecordOffset>) 
positionStoreField.get(positionManagementService);
+        positionStore = (KeyValueStore<ExtendRecordPartition, RecordOffset>) 
positionStoreField.get(positionManagementService);
 
 
         Field needSyncPartitionField = 
PositionManagementServiceImpl.class.getDeclaredField("needSyncPartition");
         needSyncPartitionField.setAccessible(true);
-        needSyncPartition = (ConcurrentSet<RecordPartition>) 
needSyncPartitionField.get(positionManagementService);
+        needSyncPartition = (ConcurrentSet<ExtendRecordPartition>) 
needSyncPartitionField.get(positionManagementService);
         Map<String, String> map = Maps.newHashMap("ip_port", "127.0.0.13306");
-        sourcePartition = new RecordPartition(map);
+        sourcePartition = new ExtendRecordPartition(namespace,map);
         Map<String, String> map1 = Maps.newHashMap("binlog_file", 
"binlogFilename");
         map1.put("next_position", "100");
         sourcePosition = new RecordOffset(map1);
-        positions = new HashMap<RecordPartition, RecordOffset>() {
+        positions = new HashMap<ExtendRecordPartition, RecordOffset>() {
             {
                 put(sourcePartition, sourcePosition);
             }
@@ -158,7 +159,7 @@ public class PositionManagementServiceImplTest {
 
     @Test
     public void testGetPositionTable() {
-        Map<RecordPartition, RecordOffset> positionTable = 
positionManagementService.getPositionTable();
+        Map<ExtendRecordPartition, RecordOffset> positionTable = 
positionManagementService.getPositionTable();
         RecordOffset bytes = positionTable.get(sourcePartition);
 
         assertNull(bytes);
@@ -190,7 +191,7 @@ public class PositionManagementServiceImplTest {
 
         assertNotNull(bytes);
 
-        List<RecordPartition> sourcePartitions = new 
ArrayList<RecordPartition>(8) {
+        List<ExtendRecordPartition> sourcePartitions = new 
ArrayList<ExtendRecordPartition>(8) {
             {
                 add(sourcePartition);
             }
@@ -209,7 +210,7 @@ public class PositionManagementServiceImplTest {
 
         assertTrue(needSyncPartition.contains(sourcePartition));
 
-        List<RecordPartition> sourcePartitions = new 
ArrayList<RecordPartition>(8) {
+        List<ExtendRecordPartition> sourcePartitions = new 
ArrayList<ExtendRecordPartition>(8) {
             {
                 add(sourcePartition);
             }
@@ -232,15 +233,15 @@ public class PositionManagementServiceImplTest {
         positionManagementService.putPosition(positions);
 
         Map<String, String> map = Maps.newHashMap("ip_port", "127.0.0.2:3306");
-        RecordPartition sourcePartitionTmp = new RecordPartition(map);
+        ExtendRecordPartition sourcePartitionTmp = new 
ExtendRecordPartition(namespace,map);
         Map<String, String> map1 = Maps.newHashMap("binlog_file", 
"binlogFilename");
         map1.put("next_position", "100");
         RecordOffset sourcePositionTmp = new RecordOffset(map1);
         positionStore.put(sourcePartitionTmp, sourcePositionTmp);
 
-        Set<RecordPartition> needSyncPartitionTmp = needSyncPartition;
+        Set<ExtendRecordPartition> needSyncPartitionTmp = needSyncPartition;
         needSyncPartition = new ConcurrentSet<>();
-        Map<RecordPartition, RecordOffset> needSyncPosition = 
positionStore.getKVMap().entrySet().stream()
+        Map<ExtendRecordPartition, RecordOffset> needSyncPosition = 
positionStore.getKVMap().entrySet().stream()
             .filter(entry -> needSyncPartitionTmp.contains(entry.getKey()))
             .collect(Collectors.toMap(entry -> entry.getKey(), entry -> 
entry.getValue()));
 
@@ -252,7 +253,7 @@ public class PositionManagementServiceImplTest {
         RecordOffset tmpBytes = needSyncPosition.get(sourcePartitionTmp);
         assertNull(tmpBytes);
 
-        List<RecordPartition> sourcePartitions = new 
ArrayList<RecordPartition>(8) {
+        List<ExtendRecordPartition> sourcePartitions = new 
ArrayList<ExtendRecordPartition>(8) {
             {
                 add(sourcePartition);
                 add(sourcePartitionTmp);

Reply via email to