This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new ed4b4fc  [ISSUES #91] runtime service support spi (#92)
ed4b4fc is described below

commit ed4b4fcb56bd6937a07adc68c53bb72f5a67c373
Author: zhoubo <[email protected]>
AuthorDate: Fri Jul 1 14:49:18 2022 +0800

    [ISSUES #91] runtime service support spi (#92)
    
    * cluster service support spi
    
    * remove offsetManagementService
    
    * config position service support spi
    
    * connect service support spi
---
 .../connect/runtime/ConnectController.java         |   0
 .../connect/runtime/DistributedConnectStartup.java |  22 +-
 .../connect/runtime/StandaloneConnectStartup.java  |  30 ++-
 .../runtime/connectorwrapper/WorkerDirectTask.java |  12 +-
 .../controller/AbstractConnectController.java      |  14 +-
 .../distributed/DistributedConnectController.java  |  14 +-
 .../standalone/StandaloneConnectController.java    |   5 +-
 .../runtime/service/ClusterManagementService.java  |   7 +-
 .../service/ClusterManagementServiceImpl.java      |  21 +-
 .../runtime/service/ConfigManagementService.java   |   4 +
 .../service/ConfigManagementServiceImpl.java       |  28 ++-
 .../service/OffsetManagementServiceImpl.java       | 277 ---------------------
 .../runtime/service/PositionManagementService.java |   9 +-
 .../service/PositionManagementServiceImpl.java     |  34 +--
 .../connect/runtime/service/StagingMode.java       |  24 ++
 .../memory/FileOffsetManagementServiceImpl.java    | 204 ---------------
 .../memory/FilePositionManagementServiceImpl.java  |  18 +-
 .../memory/MemoryClusterManagementServiceImpl.java |  13 +-
 .../memory/MemoryConfigManagementServiceImpl.java  |  23 +-
 .../connect/runtime/store/KeyValueStore.java       |  11 +
 .../runtime/store/MemoryBasedKeyValueStore.java    |   4 +
 .../connect/runtime/utils/ServiceProviderUtil.java |  96 +++++++
 ...onnect.runtime.service.ClusterManagementService |   2 +
 ...connect.runtime.service.ConfigManagementService |   2 +
 ...nnect.runtime.service.PositionManagementService |   2 +
 .../testimpl/TestPositionManageServiceImpl.java    |  16 +-
 .../service/ConfigManagementServiceImplTest.java   |   4 +-
 .../service/PositionManagementServiceImplTest.java |   3 +-
 28 files changed, 317 insertions(+), 582 deletions(-)

diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java
new file mode 100644
index 0000000..e69de29
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
index f3310a8..2f0dcda 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
@@ -35,13 +35,14 @@ import 
org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 import 
org.apache.rocketmq.connect.runtime.controller.distributed.DistributedConfig;
 import 
org.apache.rocketmq.connect.runtime.controller.distributed.DistributedConnectController;
-import 
org.apache.rocketmq.connect.runtime.service.ClusterManagementServiceImpl;
-import org.apache.rocketmq.connect.runtime.service.ConfigManagementServiceImpl;
-import org.apache.rocketmq.connect.runtime.service.OffsetManagementServiceImpl;
-import 
org.apache.rocketmq.connect.runtime.service.PositionManagementServiceImpl;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.StagingMode;
 import org.apache.rocketmq.connect.runtime.utils.FileAndPropertyUtil;
 import org.apache.rocketmq.connect.runtime.utils.Plugin;
 import org.apache.rocketmq.connect.runtime.utils.ServerUtil;
+import org.apache.rocketmq.connect.runtime.utils.ServiceProviderUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -131,14 +132,19 @@ public class DistributedConnectStartup {
             plugin.initPlugin();
 
             // Create controller and initialize.
+            ClusterManagementService clusterManagementService = 
ServiceProviderUtil.getClusterManagementServices(StagingMode.DISTRIBUTED);
+            clusterManagementService.initialize(connectConfig);
+            ConfigManagementService configManagementService = 
ServiceProviderUtil.getConfigManagementServices(StagingMode.DISTRIBUTED);
+            configManagementService.initialize(connectConfig, plugin);
+            PositionManagementService positionManagementServices = 
ServiceProviderUtil.getPositionManagementServices(StagingMode.DISTRIBUTED);
+            positionManagementServices.initialize(connectConfig);
 
             DistributedConnectController controller = new 
DistributedConnectController(
                     plugin,
                     connectConfig,
-                    new ClusterManagementServiceImpl(connectConfig),
-                    new ConfigManagementServiceImpl(connectConfig, plugin),
-                    new PositionManagementServiceImpl(connectConfig),
-                    new OffsetManagementServiceImpl(connectConfig));
+                    clusterManagementService,
+                    configManagementService,
+                    positionManagementServices);
             // Invoked when shutdown.
             Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                 private volatile boolean hasShutdown = false;
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
index e803864..2770265 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
@@ -35,13 +35,14 @@ import 
org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 import 
org.apache.rocketmq.connect.runtime.controller.standalone.StandaloneConfig;
 import 
org.apache.rocketmq.connect.runtime.controller.standalone.StandaloneConnectController;
-import 
org.apache.rocketmq.connect.runtime.service.memory.FileOffsetManagementServiceImpl;
-import 
org.apache.rocketmq.connect.runtime.service.memory.FilePositionManagementServiceImpl;
-import 
org.apache.rocketmq.connect.runtime.service.memory.MemoryClusterManagementServiceImpl;
-import 
org.apache.rocketmq.connect.runtime.service.memory.MemoryConfigManagementServiceImpl;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.StagingMode;
 import org.apache.rocketmq.connect.runtime.utils.FileAndPropertyUtil;
 import org.apache.rocketmq.connect.runtime.utils.Plugin;
 import org.apache.rocketmq.connect.runtime.utils.ServerUtil;
+import org.apache.rocketmq.connect.runtime.utils.ServiceProviderUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,7 +87,7 @@ public class StandaloneConnectStartup {
             // Build the command line options.
             Options options = ServerUtil.buildCommandlineOptions(new 
Options());
             commandLine = ServerUtil.parseCmdLine("connect", args, 
buildCommandlineOptions(options),
-                    new PosixParser());
+                new PosixParser());
             if (null == commandLine) {
                 System.exit(-1);
             }
@@ -116,7 +117,6 @@ public class StandaloneConnectStartup {
             lc.reset();
             configurator.doConfigure(connectConfig.getConnectHome() + 
"/conf/logback.xml");
 
-
             List<String> pluginPaths = new ArrayList<>(16);
             if (StringUtils.isNotEmpty(connectConfig.getPluginPaths())) {
                 String[] strArr = connectConfig.getPluginPaths().split(",");
@@ -129,15 +129,19 @@ public class StandaloneConnectStartup {
             Plugin plugin = new Plugin(pluginPaths);
             plugin.initPlugin();
 
-            // Create controller and initialize.
+            ClusterManagementService clusterManagementService = 
ServiceProviderUtil.getClusterManagementServices(StagingMode.STANDALONE);
+            clusterManagementService.initialize(connectConfig);
+            ConfigManagementService configManagementService = 
ServiceProviderUtil.getConfigManagementServices(StagingMode.STANDALONE);
+            configManagementService.initialize(connectConfig, plugin);
+            PositionManagementService positionManagementServices = 
ServiceProviderUtil.getPositionManagementServices(StagingMode.STANDALONE);
+            positionManagementServices.initialize(connectConfig);
 
             StandaloneConnectController controller = new 
StandaloneConnectController(
-                    plugin,
-                    connectConfig,
-                    new MemoryClusterManagementServiceImpl(connectConfig),
-                    new MemoryConfigManagementServiceImpl(connectConfig, 
plugin),
-                    new FilePositionManagementServiceImpl(connectConfig),
-                    new FileOffsetManagementServiceImpl(connectConfig));
+                plugin,
+                connectConfig,
+                clusterManagementService,
+                configManagementService,
+                positionManagementServices);
             // Invoked when shutdown.
             Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                 private volatile boolean hasShutdown = false;
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
index 992c071..de3d25b 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
@@ -179,6 +179,10 @@ public class WorkerDirectTask implements WorkerTask {
                 return taskConfig;
             }
 
+            @Override public KeyValue configs() {
+                return taskConfig;
+            }
+
             @Override
             public void resetOffset(RecordPartition recordPartition, 
RecordOffset recordOffset) {
 
@@ -219,11 +223,15 @@ public class WorkerDirectTask implements WorkerTask {
             }
 
             @Override public String getConnectorName() {
-                return null;
+                return taskConfig.getString(RuntimeConfigDefine.CONNECTOR_ID);
             }
 
             @Override public String getTaskName() {
-                return null;
+                return taskConfig.getString(RuntimeConfigDefine.TASK_ID);
+            }
+
+            @Override public KeyValue configs() {
+                return taskConfig;
             }
             /**
              * Get the configurations of current task.
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
index 73c259d..84128ae 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
@@ -53,11 +53,6 @@ public abstract class AbstractConnectController implements 
ConnectController {
      */
     protected final PositionManagementService positionManagementService;
 
-    /**
-     * Offset management of sink tasks.
-     */
-    protected final PositionManagementService offsetManagementService;
-
     /**
      * Manage the online info of the cluster.
      */
@@ -90,8 +85,7 @@ public abstract class AbstractConnectController implements 
ConnectController {
             ConnectConfig connectConfig,
             ClusterManagementService clusterManagementService,
             ConfigManagementService configManagementService,
-            PositionManagementService positionManagementService,
-            PositionManagementService offsetManagementService
+            PositionManagementService positionManagementService
     ) {
         // set config
         this.connectConfig = connectConfig;
@@ -104,7 +98,6 @@ public abstract class AbstractConnectController implements 
ConnectController {
         this.clusterManagementService = clusterManagementService;
         this.configManagementService = configManagementService;
         this.positionManagementService = positionManagementService;
-        this.offsetManagementService = offsetManagementService;
         this.worker = new Worker(connectConfig, positionManagementService, 
configManagementService, plugin, this);
         this.restHandler = new RestHandler(this);
     }
@@ -115,7 +108,6 @@ public abstract class AbstractConnectController implements 
ConnectController {
         clusterManagementService.start();
         configManagementService.start();
         positionManagementService.start();
-        offsetManagementService.start();
         worker.start();
         connectStatsService.start();
     }
@@ -135,10 +127,6 @@ public abstract class AbstractConnectController implements 
ConnectController {
             positionManagementService.stop();
         }
 
-        if (offsetManagementService != null) {
-            offsetManagementService.stop();
-        }
-
         if (clusterManagementService != null) {
             clusterManagementService.stop();
         }
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
index 0dd048a..403cf30 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
@@ -57,10 +57,9 @@ public class DistributedConnectController extends 
AbstractConnectController {
                                          DistributedConfig connectConfig,
                                          ClusterManagementService 
clusterManagementService,
                                          ConfigManagementService 
configManagementService,
-                                         PositionManagementService 
positionManagementService,
-                                         PositionManagementService 
offsetManagementService) {
+                                         PositionManagementService 
positionManagementService) {
 
-        super(plugin, connectConfig, clusterManagementService, 
configManagementService, positionManagementService, offsetManagementService);
+        super(plugin, connectConfig, clusterManagementService, 
configManagementService, positionManagementService);
         AllocateConnAndTaskStrategy strategy = 
ConnectUtil.initAllocateConnAndTaskStrategy(connectConfig);
         this.rebalanceImpl = new RebalanceImpl(worker, 
configManagementService, clusterManagementService, strategy, this);
         this.rebalanceService = new RebalanceService(rebalanceImpl, 
configManagementService, clusterManagementService);
@@ -91,15 +90,6 @@ public class DistributedConnectController extends 
AbstractConnectController {
 
         }, 1000, this.connectConfig.getPositionPersistInterval(), 
TimeUnit.MILLISECONDS);
 
-        // Persist offset information of sink tasks.
-        scheduledExecutorService.scheduleAtFixedRate(() -> {
-            try {
-                this.offsetManagementService.persist();
-            } catch (Exception e) {
-                log.error("schedule persist offset error.", e);
-            }
-        }, 1000, this.connectConfig.getOffsetPersistInterval(), 
TimeUnit.MILLISECONDS);
-
     }
 
     @Override
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectController.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectController.java
index 45b6061..feb2bb9 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectController.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectController.java
@@ -39,10 +39,9 @@ public class StandaloneConnectController extends 
AbstractConnectController {
                                        StandaloneConfig connectConfig,
                                        ClusterManagementService 
clusterManagementService,
                                        ConfigManagementService 
configManagementService,
-                                       PositionManagementService 
positionManagementService,
-                                       PositionManagementService 
offsetManagementService) {
+                                       PositionManagementService 
positionManagementService) {
 
-        super(plugin, connectConfig, clusterManagementService, 
configManagementService, positionManagementService, offsetManagementService);
+        super(plugin, connectConfig, clusterManagementService, 
configManagementService, positionManagementService);
         AllocateConnAndTaskStrategy strategy = 
ConnectUtil.initAllocateConnAndTaskStrategy(connectConfig);
         this.rebalanceImpl = new RebalanceImpl(worker, 
configManagementService, clusterManagementService, strategy, this);
         this.rebalanceService = new StandaloneRebalanceService(rebalanceImpl, 
configManagementService, clusterManagementService);
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
index 9a07d56..8792593 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
@@ -17,9 +17,8 @@
 
 package org.apache.rocketmq.connect.runtime.service;
 
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
-
 import java.util.List;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 
 /**
  * Interface for cluster management.
@@ -28,6 +27,8 @@ public interface ClusterManagementService {
 
     Long WORKER_TIME_OUT = 30 * 1000L;
 
+    void initialize(ConnectConfig connectConfig);
+
     /**
      * Start the cluster manager.
      */
@@ -70,6 +71,8 @@ public interface ClusterManagementService {
 
     String getCurrentWorker();
 
+    StagingMode getStagingMode();
+
     interface WorkerStatusListener {
 
         /**
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
index bebc338..2d97737 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
@@ -45,19 +45,14 @@ public class ClusterManagementServiceImpl implements 
ClusterManagementService {
     /**
      * Configs of current worker.
      */
-    private final ConnectConfig connectConfig;
+    private ConnectConfig connectConfig;
 
     /**
      * Used for worker discovery
      */
     private DefaultMQPullConsumer defaultMQPullConsumer;
 
-    public ClusterManagementServiceImpl(ConnectConfig connectConfig) {
-        this.connectConfig = connectConfig;
-        this.workerStatusListeners = new HashSet<>();
-        this.defaultMQPullConsumer = 
ConnectUtil.initDefaultMQPullConsumer(connectConfig);
-        
this.defaultMQPullConsumer.setConsumerGroup(connectConfig.getConnectClusterId());
-        this.prepare(connectConfig);
+    public ClusterManagementServiceImpl() {
     }
 
     /**
@@ -81,6 +76,14 @@ public class ClusterManagementServiceImpl implements 
ClusterManagementService {
 
     }
 
+    @Override public void initialize(ConnectConfig connectConfig) {
+        this.connectConfig = connectConfig;
+        this.workerStatusListeners = new HashSet<>();
+        this.defaultMQPullConsumer = 
ConnectUtil.initDefaultMQPullConsumer(connectConfig);
+        
this.defaultMQPullConsumer.setConsumerGroup(connectConfig.getConnectClusterId());
+        this.prepare(connectConfig);
+    }
+
     @Override
     public void start() {
         try {
@@ -126,6 +129,10 @@ public class ClusterManagementServiceImpl implements 
ClusterManagementService {
         return 
this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory().getClientId();
     }
 
+    @Override public StagingMode getStagingMode() {
+        return StagingMode.DISTRIBUTED;
+    }
+
     @Override
     public void registerListener(WorkerStatusListener listener) {
         this.workerStatusListeners.add(listener);
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java
index 5d6d6c9..60343e8 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java
@@ -104,6 +104,8 @@ public interface ConfigManagementService {
      */
     void registerListener(ConnectorConfigUpdateListener listener);
 
+    void initialize(ConnectConfig connectConfig, Plugin plugin);
+
     interface ConnectorConfigUpdateListener {
 
         /**
@@ -113,4 +115,6 @@ public interface ConfigManagementService {
     }
 
     Plugin getPlugin();
+
+    StagingMode getStagingMode();
 }
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
index 1134e1d..0ee1655 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -69,10 +69,13 @@ public class ConfigManagementServiceImpl implements 
ConfigManagementService {
      */
     private DataSynchronizer<String, ConnAndTaskConfigs> dataSynchronizer;
 
-    private final Plugin plugin;
+    private Plugin plugin;
 
     private final String configManagePrefix = "ConfigManage";
 
+    public ConfigManagementServiceImpl() {
+    }
+
     public ConfigManagementServiceImpl(ConnectConfig connectConfig, Plugin 
plugin) {
 
         this.connectorConfigUpdateListener = new HashSet<>();
@@ -279,6 +282,25 @@ public class ConfigManagementServiceImpl implements 
ConfigManagementService {
         this.connectorConfigUpdateListener.add(listener);
     }
 
+    @Override public void initialize(ConnectConfig connectConfig, Plugin 
plugin) {
+        this.connectorConfigUpdateListener = new HashSet<>();
+        this.dataSynchronizer = new BrokerBasedLog<>(connectConfig,
+            connectConfig.getConfigStoreTopic(),
+            ConnectUtil.createGroupName(configManagePrefix, 
connectConfig.getWorkerId()),
+            new ConfigChangeCallback(),
+            new JsonConverter(),
+            new ConnAndTaskConfigConverter());
+        this.connectorKeyValueStore = new FileBaseKeyValueStore<>(
+            
FilePathConfigUtil.getConnectorConfigPath(connectConfig.getStorePathRootDir()),
+            new JsonConverter(),
+            new JsonConverter(ConnectKeyValue.class));
+        this.taskKeyValueStore = new FileBaseKeyValueStore<>(
+            
FilePathConfigUtil.getTaskConfigPath(connectConfig.getStorePathRootDir()),
+            new JsonConverter(),
+            new ListConverter(ConnectKeyValue.class));
+        this.plugin = plugin;
+    }
+
     private void triggerListener() {
 
         if (null == this.connectorConfigUpdateListener) {
@@ -375,4 +397,8 @@ public class ConfigManagementServiceImpl implements 
ConfigManagementService {
     public Plugin getPlugin() {
         return this.plugin;
     }
+
+    @Override public StagingMode getStagingMode() {
+        return StagingMode.DISTRIBUTED;
+    }
 }
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
deleted file mode 100644
index 49860be..0000000
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.connect.runtime.service;
-
-import io.netty.util.internal.ConcurrentSet;
-import io.openmessaging.connector.api.data.RecordOffset;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
-import org.apache.rocketmq.connect.runtime.converter.JsonConverter;
-import org.apache.rocketmq.connect.runtime.converter.RecordOffsetConverter;
-import org.apache.rocketmq.connect.runtime.converter.RecordPartitionConverter;
-import 
org.apache.rocketmq.connect.runtime.converter.RecordPositionMapConverter;
-import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
-import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
-import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
-import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
-import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
-import org.apache.rocketmq.connect.runtime.utils.datasync.BrokerBasedLog;
-import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizer;
-import 
org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class OffsetManagementServiceImpl implements PositionManagementService {
-    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
-
-    /**
-     * Current offset info in store.
-     */
-    private KeyValueStore<ExtendRecordPartition, RecordOffset> offsetStore;
-
-
-    /**
-     * The updated partition of the task in the current instance.
-     */
-    private Set<ExtendRecordPartition> needSyncPartition;
-
-    /**
-     * Synchronize data with other workers.
-     */
-    private DataSynchronizer<String, Map<ExtendRecordPartition, RecordOffset>> 
dataSynchronizer;
-
-    private final String offsetManagePrefix = "OffsetManage";
-
-    /**
-     * Listeners.
-     */
-    private Set<PositionUpdateListener> offsetUpdateListener;
-
-    public OffsetManagementServiceImpl(ConnectConfig connectConfig) {
-
-        this.offsetStore = new 
FileBaseKeyValueStore<>(FilePathConfigUtil.getOffsetPath(connectConfig.getStorePathRootDir()),
-                new RecordPartitionConverter(),
-                new RecordOffsetConverter());
-        this.dataSynchronizer = new BrokerBasedLog(connectConfig,
-                connectConfig.getOffsetStoreTopic(),
-                ConnectUtil.createGroupName(offsetManagePrefix, 
connectConfig.getWorkerId()),
-                new OffsetChangeCallback(),
-                new JsonConverter(),
-                new RecordPositionMapConverter());
-        this.offsetUpdateListener = new HashSet<>();
-        this.needSyncPartition = new ConcurrentSet<>();
-        this.prepare(connectConfig);
-    }
-
-    /**
-     * Preparation before startup
-     *
-     * @param connectConfig
-     */
-    private void prepare(ConnectConfig connectConfig) {
-        String offsetStoreTopic = connectConfig.getOffsetStoreTopic();
-        if (!ConnectUtil.isTopicExist(connectConfig, offsetStoreTopic)) {
-            log.info("try to create offset store topic: {}!", 
offsetStoreTopic);
-            TopicConfig topicConfig = new TopicConfig(offsetStoreTopic, 1, 1, 
6);
-            ConnectUtil.createTopic(connectConfig, topicConfig);
-        }
-    }
-
-    @Override
-    public void start() {
-
-        offsetStore.load();
-        dataSynchronizer.start();
-        sendOnlineOffsetInfo();
-    }
-
-    @Override
-    public void stop() {
-
-        sendNeedSynchronizeOffset();
-        offsetStore.persist();
-        dataSynchronizer.stop();
-    }
-
-    @Override
-    public void persist() {
-
-        offsetStore.persist();
-    }
-
-    @Override
-    public void load() {
-        offsetStore.load();
-    }
-
-    @Override
-    public void synchronize() {
-
-        sendNeedSynchronizeOffset();
-    }
-
-    @Override
-    public Map<ExtendRecordPartition, RecordOffset> getPositionTable() {
-
-        return offsetStore.getKVMap();
-    }
-
-    @Override
-    public RecordOffset getPosition(ExtendRecordPartition partition) {
-
-        return offsetStore.get(partition);
-    }
-
-    @Override
-    public void putPosition(Map<ExtendRecordPartition, RecordOffset> offsets) {
-
-        offsetStore.putAll(offsets);
-        needSyncPartition.addAll(offsets.keySet());
-    }
-
-    @Override
-    public void putPosition(ExtendRecordPartition partition, RecordOffset 
position) {
-
-        offsetStore.put(partition, position);
-        needSyncPartition.add(partition);
-    }
-
-    @Override
-    public void removePosition(List<ExtendRecordPartition> offsets) {
-
-        if (null == offsets) {
-            return;
-        }
-        for (ExtendRecordPartition offset : offsets) {
-            needSyncPartition.remove(offset);
-            offsetStore.remove(offset);
-        }
-    }
-
-    @Override
-    public void registerListener(PositionUpdateListener listener) {
-
-        this.offsetUpdateListener.add(listener);
-    }
-
-    private void sendOnlineOffsetInfo() {
-
-        dataSynchronizer.send(OffsetChangeEnum.ONLINE_KEY.name(), 
offsetStore.getKVMap());
-    }
-
-
-    private void sendNeedSynchronizeOffset() {
-
-        Set<ExtendRecordPartition> needSyncPartitionTmp = needSyncPartition;
-        needSyncPartition = new ConcurrentSet<>();
-        Map<ExtendRecordPartition, RecordOffset> needSyncOffset = 
offsetStore.getKVMap().entrySet().stream()
-                .filter(entry -> needSyncPartitionTmp.contains(entry.getKey()))
-                .collect(Collectors.toMap(entry -> entry.getKey(), entry -> 
entry.getValue()));
-
-        dataSynchronizer.send(OffsetChangeEnum.OFFSET_CHANG_KEY.name(), 
needSyncOffset);
-    }
-
-    private void sendSynchronizeOffset() {
-
-        dataSynchronizer.send(OffsetChangeEnum.OFFSET_CHANG_KEY.name(), 
offsetStore.getKVMap());
-    }
-
-    private class OffsetChangeCallback implements 
DataSynchronizerCallback<String, Map<ExtendRecordPartition, RecordOffset>> {
-
-        @Override
-        public void onCompletion(Throwable error, String key, 
Map<ExtendRecordPartition, RecordOffset> result) {
-
-            boolean changed = false;
-            switch (OffsetChangeEnum.valueOf(key)) {
-                case ONLINE_KEY:
-                    changed = true;
-                    sendSynchronizeOffset();
-                    break;
-                case OFFSET_CHANG_KEY:
-                    changed = mergeOffsetInfo(result);
-                    break;
-                default:
-                    break;
-            }
-            if (changed) {
-                triggerListener();
-            }
-
-        }
-    }
-
-    private void triggerListener() {
-        for (PositionUpdateListener offsetUpdateListener : 
offsetUpdateListener) {
-            offsetUpdateListener.onPositionUpdate();
-        }
-    }
-
-    /**
-     * Merge new received offset info with local store.
-     *
-     * @param result
-     * @return
-     */
-    private boolean mergeOffsetInfo(Map<ExtendRecordPartition, RecordOffset> 
result) {
-
-        boolean changed = false;
-        if (null == result || 0 == result.size()) {
-            return changed;
-        }
-
-        for (Map.Entry<ExtendRecordPartition, RecordOffset> newEntry : 
result.entrySet()) {
-            boolean find = false;
-            for (Map.Entry<ExtendRecordPartition, RecordOffset> existedEntry : 
offsetStore.getKVMap().entrySet()) {
-                if (newEntry.getKey().equals(existedEntry.getKey())) {
-                    find = true;
-                    if (!newEntry.getValue().equals(existedEntry.getValue())) {
-                        changed = true;
-                        existedEntry.setValue(newEntry.getValue());
-                    }
-                    break;
-                }
-            }
-            if (!find) {
-                offsetStore.put(newEntry.getKey(), newEntry.getValue());
-            }
-        }
-        return changed;
-    }
-
-    private enum OffsetChangeEnum {
-
-        /**
-         * Insert or update offset info.
-         */
-        OFFSET_CHANG_KEY,
-
-        /**
-         * A worker online.
-         */
-        ONLINE_KEY
-    }
-}
-
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementService.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementService.java
index bd3faa7..3d1a4b0 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementService.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementService.java
@@ -19,11 +19,10 @@
 package org.apache.rocketmq.connect.runtime.service;
 
 import io.openmessaging.connector.api.data.RecordOffset;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
-import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
-
 import java.util.List;
 import java.util.Map;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
 
 /**
  * Interface for position manager.
@@ -94,6 +93,8 @@ public interface PositionManagementService {
      */
     void registerListener(PositionUpdateListener listener);
 
+    void initialize(ConnectConfig connectConfig);
+
     interface PositionUpdateListener {
 
         /**
@@ -101,4 +102,6 @@ public interface PositionManagementService {
          */
         void onPositionUpdate();
     }
+
+    StagingMode getStagingMode();
 }
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
index 893809e..11e35e5 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
@@ -69,20 +69,7 @@ public class PositionManagementServiceImpl implements 
PositionManagementService
 
     private final String positionManagePrefix = "PositionManage";
 
-    public PositionManagementServiceImpl(ConnectConfig connectConfig) {
-
-        this.positionStore = new 
FileBaseKeyValueStore<>(FilePathConfigUtil.getPositionPath(connectConfig.getStorePathRootDir()),
-                new RecordPartitionConverter(),
-                new RecordOffsetConverter());
-        this.dataSynchronizer = new BrokerBasedLog(connectConfig,
-                connectConfig.getPositionStoreTopic(),
-                ConnectUtil.createGroupName(positionManagePrefix, 
connectConfig.getWorkerId()),
-                new PositionChangeCallback(),
-                new JsonConverter(),
-                new RecordPositionMapConverter());
-        this.positionUpdateListener = new HashSet<>();
-        this.needSyncPartition = new ConcurrentSet<>();
-        this.prepare(connectConfig);
+    public PositionManagementServiceImpl() {
     }
 
     /**
@@ -177,6 +164,25 @@ public class PositionManagementServiceImpl implements 
PositionManagementService
         this.positionUpdateListener.add(listener);
     }
 
+    @Override public void initialize(ConnectConfig connectConfig) {
+        this.positionStore = new 
FileBaseKeyValueStore<>(FilePathConfigUtil.getPositionPath(connectConfig.getStorePathRootDir()),
+            new RecordPartitionConverter(),
+            new RecordOffsetConverter());
+        this.dataSynchronizer = new BrokerBasedLog(connectConfig,
+            connectConfig.getPositionStoreTopic(),
+            ConnectUtil.createGroupName(positionManagePrefix, 
connectConfig.getWorkerId()),
+            new PositionChangeCallback(),
+            new JsonConverter(),
+            new RecordPositionMapConverter());
+        this.positionUpdateListener = new HashSet<>();
+        this.needSyncPartition = new ConcurrentSet<>();
+        this.prepare(connectConfig);
+    }
+
+    @Override public StagingMode getStagingMode() {
+        return StagingMode.DISTRIBUTED;
+    }
+
     private void sendOnlinePositionInfo() {
 
         dataSynchronizer.send(PositionChangeEnum.ONLINE_KEY.name(), 
positionStore.getKVMap());
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StagingMode.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StagingMode.java
new file mode 100644
index 0000000..2def5d5
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StagingMode.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.service;
+
+public enum StagingMode {
+    STANDALONE,
+    DISTRIBUTED,
+    UNIVERSAL
+}
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FileOffsetManagementServiceImpl.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FileOffsetManagementServiceImpl.java
deleted file mode 100644
index f445d58..0000000
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FileOffsetManagementServiceImpl.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.connect.runtime.service.memory;
-
-import io.openmessaging.connector.api.data.RecordOffset;
-import io.openmessaging.connector.api.errors.ConnectException;
-import org.apache.rocketmq.common.utils.ThreadUtils;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
-import org.apache.rocketmq.connect.runtime.converter.RecordOffsetConverter;
-import org.apache.rocketmq.connect.runtime.converter.RecordPartitionConverter;
-import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
-import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
-import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
-import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
-import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
-import 
org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-/**
- * memory offset management service impl
- */
-public class FileOffsetManagementServiceImpl implements 
PositionManagementService {
-    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
-
-    protected ExecutorService executor;
-
-    /**
-     * Current offset info in store.
-     */
-    private KeyValueStore<ExtendRecordPartition, RecordOffset> offsetStore;
-
-
-    /**
-     * Listeners.
-     */
-    private PositionUpdateListener offsetUpdateListener;
-
-    public FileOffsetManagementServiceImpl(ConnectConfig connectConfig) {
-        this.offsetStore = new FileBaseKeyValueStore<>(
-                
FilePathConfigUtil.getOffsetPath(connectConfig.getStorePathRootDir()),
-                new RecordPartitionConverter(),
-                new RecordOffsetConverter());
-    }
-
-    @Override
-    public void start() {
-        executor = Executors.newFixedThreadPool(1, 
ThreadUtils.newThreadFactory(
-                this.getClass().getSimpleName() + "-%d", false));
-        offsetStore.load();
-    }
-
-    @Override
-    public void stop() {
-        offsetStore.persist();
-        if (executor != null) {
-            executor.shutdown();
-            // Best effort wait for any get() and set() tasks (and caller's 
callbacks) to complete.
-            try {
-                executor.awaitTermination(30, TimeUnit.SECONDS);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-            }
-            if (!executor.shutdownNow().isEmpty()) {
-                throw new ConnectException("Failed to stop 
MemoryOffsetManagementServiceImpl. Exiting without cleanly " +
-                        "shutting down pending tasks and/or callbacks.");
-            }
-            executor = null;
-        }
-    }
-
-    @Override
-    public void persist() {
-        offsetStore.persist();
-    }
-
-    @Override
-    public void load() {
-        offsetStore.load();
-    }
-
-    @Override
-    public void synchronize() {
-    }
-
-    @Override
-    public Map<ExtendRecordPartition, RecordOffset> getPositionTable() {
-        return offsetStore.getKVMap();
-    }
-
-    @Override
-    public RecordOffset getPosition(ExtendRecordPartition partition) {
-        return offsetStore.get(partition);
-    }
-
-    @Override
-    public void putPosition(Map<ExtendRecordPartition, RecordOffset> offsets) {
-        offsetStore.putAll(offsets);
-        this.triggerListener(new DataSynchronizerCallback<Void, Void>() {
-            @Override
-            public void onCompletion(Throwable error, Void key, Void result) {
-                if (error != null) {
-                    log.error("Failed to persist offsets to storage: {}", 
error);
-                } else {
-                    log.trace("Successed to persist offsets to storage: {}", 
offsets);
-                }
-            }
-        });
-    }
-
-    @Override
-    public void putPosition(ExtendRecordPartition partition, RecordOffset 
position) {
-        offsetStore.put(partition, position);
-        this.triggerListener(new DataSynchronizerCallback<Void, Void>() {
-            @Override
-            public void onCompletion(Throwable error, Void key, Void result) {
-                if (error != null) {
-                    log.error("Failed to persist offsets to storage: {}", 
error);
-                } else {
-                    log.trace("Successed to persist offsets to storage: {}, 
{}", partition, position);
-                }
-            }
-        });
-    }
-
-    @Override
-    public void removePosition(List<ExtendRecordPartition> offsets) {
-        if (null == offsets) {
-            return;
-        }
-        for (ExtendRecordPartition offset : offsets) {
-            offsetStore.remove(offset);
-        }
-        this.triggerListener(new DataSynchronizerCallback<Void, Void>() {
-            @Override
-            public void onCompletion(Throwable error, Void key, Void result) {
-                if (error != null) {
-                    log.error("Failed to persist offsets to storage: {}", 
error);
-                } else {
-                    log.trace("Successed to persist offsets to storage: {}", 
offsets);
-                }
-            }
-        });
-    }
-
-    @Override
-    public void registerListener(PositionUpdateListener listener) {
-        this.offsetUpdateListener = listener;
-    }
-
-
-    private Future<Void> triggerListener(DataSynchronizerCallback<Void, Void> 
callback) {
-        if (offsetUpdateListener != null) {
-            offsetUpdateListener.onPositionUpdate();
-        }
-
-        return executor.submit(new Callable<Void>() {
-            /**
-             * Computes a result, or throws an exception if unable to do so.
-             *
-             * @return computed result
-             * @throws Exception if unable to compute a result
-             */
-            @Override
-            public Void call() {
-                try {
-                    offsetStore.persist();
-                    if (callback != null) {
-                        callback.onCompletion(null, null, null);
-                    }
-                } catch (Exception error) {
-                    callback.onCompletion(error, null, null);
-                }
-                return null;
-            }
-        });
-    }
-
-}
-
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
index c0ebb2c..056773b 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
@@ -24,6 +24,7 @@ import 
org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 import org.apache.rocketmq.connect.runtime.converter.RecordOffsetConverter;
 import org.apache.rocketmq.connect.runtime.converter.RecordPartitionConverter;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.StagingMode;
 import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
 import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
 import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
@@ -56,18 +57,24 @@ public class FilePositionManagementServiceImpl implements 
PositionManagementServ
      */
     private PositionUpdateListener positionUpdateListener;
 
+    public FilePositionManagementServiceImpl() {
 
-    public FilePositionManagementServiceImpl(ConnectConfig connectConfig) {
+    }
+
+    @Override public void initialize(ConnectConfig connectConfig) {
         this.positionStore = new 
FileBaseKeyValueStore<>(FilePathConfigUtil.getPositionPath(connectConfig.getStorePathRootDir()),
-                new RecordPartitionConverter(),
-                new RecordOffsetConverter());
+            new RecordPartitionConverter(),
+            new RecordOffsetConverter());
     }
 
+    @Override public StagingMode getStagingMode() {
+        return StagingMode.STANDALONE;
+    }
 
     @Override
     public void start() {
         executor = Executors.newFixedThreadPool(1, 
ThreadUtils.newThreadFactory(
-                this.getClass().getSimpleName() + "-%d", false));
+            this.getClass().getSimpleName() + "-%d", false));
         positionStore.load();
     }
 
@@ -84,7 +91,7 @@ public class FilePositionManagementServiceImpl implements 
PositionManagementServ
             }
             if (!executor.shutdownNow().isEmpty()) {
                 throw new ConnectException("Failed to stop 
MemoryOffsetManagementServiceImpl. Exiting without cleanly " +
-                        "shutting down pending tasks and/or callbacks.");
+                    "shutting down pending tasks and/or callbacks.");
             }
             executor = null;
         }
@@ -196,6 +203,5 @@ public class FilePositionManagementServiceImpl implements 
PositionManagementServ
         });
     }
 
-
 }
 
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryClusterManagementServiceImpl.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryClusterManagementServiceImpl.java
index 5e14d70..8add05b 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryClusterManagementServiceImpl.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryClusterManagementServiceImpl.java
@@ -23,6 +23,7 @@ import 
org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
 
 import java.util.Collections;
 import java.util.List;
+import org.apache.rocketmq.connect.runtime.service.StagingMode;
 
 /**
  * standalone cluster management service
@@ -31,8 +32,12 @@ public class MemoryClusterManagementServiceImpl implements 
ClusterManagementServ
 
     private StandaloneConfig config;
 
-    public MemoryClusterManagementServiceImpl(ConnectConfig config) {
-        this.configure(config);
+    public MemoryClusterManagementServiceImpl() {
+
+    }
+
+    @Override public void initialize(ConnectConfig connectConfig) {
+        this.configure(connectConfig);
     }
 
     /**
@@ -94,4 +99,8 @@ public class MemoryClusterManagementServiceImpl implements 
ClusterManagementServ
     public String getCurrentWorker() {
         return this.config.getWorkerId();
     }
+
+    @Override public StagingMode getStagingMode() {
+        return StagingMode.STANDALONE;
+    }
 }
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
index 5e8bb5b..5f763ab 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
@@ -27,6 +27,7 @@ import 
org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
 import org.apache.rocketmq.connect.runtime.converter.JsonConverter;
 import org.apache.rocketmq.connect.runtime.converter.ListConverter;
 import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.service.StagingMode;
 import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
 import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
 import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
@@ -61,18 +62,20 @@ public class MemoryConfigManagementServiceImpl implements 
ConfigManagementServic
      */
     private ConnectorConfigUpdateListener connectorConfigUpdateListener;
 
-    private final Plugin plugin;
+    private Plugin plugin;
 
-    public MemoryConfigManagementServiceImpl(ConnectConfig connectConfig, 
Plugin plugin) {
+    public MemoryConfigManagementServiceImpl() {
+    }
 
+    @Override public void initialize(ConnectConfig connectConfig, Plugin 
plugin) {
         this.connectorKeyValueStore = new FileBaseKeyValueStore<>(
-                
FilePathConfigUtil.getConnectorConfigPath(connectConfig.getStorePathRootDir()),
-                new JsonConverter(),
-                new JsonConverter(ConnectKeyValue.class));
+            
FilePathConfigUtil.getConnectorConfigPath(connectConfig.getStorePathRootDir()),
+            new JsonConverter(),
+            new JsonConverter(ConnectKeyValue.class));
         this.taskKeyValueStore = new FileBaseKeyValueStore<>(
-                
FilePathConfigUtil.getTaskConfigPath(connectConfig.getStorePathRootDir()),
-                new JsonConverter(),
-                new ListConverter(ConnectKeyValue.class));
+            
FilePathConfigUtil.getTaskConfigPath(connectConfig.getStorePathRootDir()),
+            new JsonConverter(),
+            new ListConverter(ConnectKeyValue.class));
         this.plugin = plugin;
     }
 
@@ -251,4 +254,8 @@ public class MemoryConfigManagementServiceImpl implements 
ConfigManagementServic
     public Plugin getPlugin() {
         return this.plugin;
     }
+
+    @Override public StagingMode getStagingMode() {
+        return StagingMode.STANDALONE;
+    }
 }
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/KeyValueStore.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/KeyValueStore.java
index ce9b210..2629ce2 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/KeyValueStore.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/KeyValueStore.java
@@ -92,4 +92,15 @@ public interface KeyValueStore<K, V> {
      * Persist all data into the store.
      */
     void persist();
+
+
+    Stage getStage();
+
+
+    enum Stage {
+        CONNECTOR,
+        TASK,
+        POSITION,
+        UNIVERSAL
+    }
 }
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/MemoryBasedKeyValueStore.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/MemoryBasedKeyValueStore.java
index ea8a78b..c474985 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/MemoryBasedKeyValueStore.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/MemoryBasedKeyValueStore.java
@@ -72,4 +72,8 @@ public class MemoryBasedKeyValueStore<K, V> implements 
KeyValueStore<K, V> {
     public void persist() {
 
     }
+
+    @Override public Stage getStage() {
+        return Stage.UNIVERSAL;
+    }
 }
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ServiceProviderUtil.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ServiceProviderUtil.java
new file mode 100644
index 0000000..db9e452
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ServiceProviderUtil.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.utils;
+
+import java.util.Iterator;
+import java.util.ServiceLoader;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.StagingMode;
+import org.jetbrains.annotations.NotNull;
+
+public class ServiceProviderUtil {
+
+    @NotNull public static ClusterManagementService 
getClusterManagementServices(StagingMode stagingMode) {
+        ClusterManagementService clusterManagementService = null;
+        ClusterManagementService universalClusterManagementService = null;
+        ServiceLoader<ClusterManagementService> 
clusterManagementServiceServiceLoader = 
ServiceLoader.load(ClusterManagementService.class);
+        Iterator<ClusterManagementService> clusterManagementServiceIterator = 
clusterManagementServiceServiceLoader.iterator();
+        while (clusterManagementServiceIterator.hasNext()) {
+            ClusterManagementService clusterManagementService1 = 
clusterManagementServiceIterator.next();
+            if (clusterManagementService1.getStagingMode() == stagingMode) {
+                clusterManagementService = clusterManagementService1;
+                break;
+            }
+            if (clusterManagementService1.getStagingMode() == 
StagingMode.UNIVERSAL) {
+                universalClusterManagementService = clusterManagementService1;
+                break;
+            }
+        }
+        if (null == clusterManagementService) {
+            clusterManagementService = universalClusterManagementService;
+        }
+        return clusterManagementService;
+    }
+
+    @NotNull public static ConfigManagementService 
getConfigManagementServices(StagingMode stagingMode) {
+        ConfigManagementService configManagementService = null;
+        ConfigManagementService universalConfigManagementService = null;
+        ServiceLoader<ConfigManagementService> 
configManagementServiceServiceLoader = 
ServiceLoader.load(ConfigManagementService.class);
+        Iterator<ConfigManagementService> configManagementServiceIterator = 
configManagementServiceServiceLoader.iterator();
+        while (configManagementServiceIterator.hasNext()) {
+            ConfigManagementService configManagementService1 = 
configManagementServiceIterator.next();
+            if (configManagementService1.getStagingMode() == stagingMode) {
+                configManagementService = configManagementService1;
+                break;
+            }
+            if (configManagementService1.getStagingMode() == 
StagingMode.UNIVERSAL) {
+                universalConfigManagementService = configManagementService1;
+                break;
+            }
+        }
+        if (null == configManagementService) {
+            configManagementService = universalConfigManagementService;
+        }
+        return configManagementService;
+    }
+
+    @NotNull public static PositionManagementService 
getPositionManagementServices(StagingMode stagingMode) {
+        PositionManagementService positionManagementService = null;
+        PositionManagementService universalPositionManagementService = null;
+        ServiceLoader<PositionManagementService> 
positionManagementServiceServiceLoader = 
ServiceLoader.load(PositionManagementService.class);
+        Iterator<PositionManagementService> positionManagementServiceIterator 
= positionManagementServiceServiceLoader.iterator();
+        while (positionManagementServiceIterator.hasNext()) {
+            PositionManagementService positionManagementService1 = 
positionManagementServiceIterator.next();
+            if (positionManagementService1.getStagingMode() == stagingMode) {
+                positionManagementService = positionManagementService1;
+                break;
+            }
+            if (positionManagementService1.getStagingMode() == 
StagingMode.UNIVERSAL) {
+                universalPositionManagementService = 
positionManagementService1;
+                break;
+            }
+        }
+        if (null == positionManagementService) {
+            positionManagementService = universalPositionManagementService;
+        }
+        return positionManagementService;
+    }
+}
+
diff --git 
a/rocketmq-connect-runtime/src/main/resources/META-INF/services/org.apache.rocketmq.connect.runtime.service.ClusterManagementService
 
b/rocketmq-connect-runtime/src/main/resources/META-INF/services/org.apache.rocketmq.connect.runtime.service.ClusterManagementService
new file mode 100644
index 0000000..c977a12
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/main/resources/META-INF/services/org.apache.rocketmq.connect.runtime.service.ClusterManagementService
@@ -0,0 +1,2 @@
+org.apache.rocketmq.connect.runtime.service.ClusterManagementServiceImpl
+org.apache.rocketmq.connect.runtime.service.memory.MemoryClusterManagementServiceImpl
\ No newline at end of file
diff --git 
a/rocketmq-connect-runtime/src/main/resources/META-INF/services/org.apache.rocketmq.connect.runtime.service.ConfigManagementService
 
b/rocketmq-connect-runtime/src/main/resources/META-INF/services/org.apache.rocketmq.connect.runtime.service.ConfigManagementService
new file mode 100644
index 0000000..7ad0adb
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/main/resources/META-INF/services/org.apache.rocketmq.connect.runtime.service.ConfigManagementService
@@ -0,0 +1,2 @@
+org.apache.rocketmq.connect.runtime.service.ConfigManagementServiceImpl
+org.apache.rocketmq.connect.runtime.service.memory.MemoryConfigManagementServiceImpl
\ No newline at end of file
diff --git 
a/rocketmq-connect-runtime/src/main/resources/META-INF/services/org.apache.rocketmq.connect.runtime.service.PositionManagementService
 
b/rocketmq-connect-runtime/src/main/resources/META-INF/services/org.apache.rocketmq.connect.runtime.service.PositionManagementService
new file mode 100644
index 0000000..77cbdb7
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/main/resources/META-INF/services/org.apache.rocketmq.connect.runtime.service.PositionManagementService
@@ -0,0 +1,2 @@
+org.apache.rocketmq.connect.runtime.service.PositionManagementServiceImpl
+org.apache.rocketmq.connect.runtime.service.memory.FilePositionManagementServiceImpl
\ No newline at end of file
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestPositionManageServiceImpl.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestPositionManageServiceImpl.java
index 10884df..ecc9e25 100644
--- 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestPositionManageServiceImpl.java
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestPositionManageServiceImpl.java
@@ -18,12 +18,12 @@
 package org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl;
 
 import io.openmessaging.connector.api.data.RecordOffset;
-import io.openmessaging.connector.api.data.RecordPartition;
-import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
-import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
-
 import java.util.List;
 import java.util.Map;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.StagingMode;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
 
 public class TestPositionManageServiceImpl implements 
PositionManagementService {
 
@@ -75,4 +75,12 @@ public class TestPositionManageServiceImpl implements 
PositionManagementService
     public void registerListener(PositionUpdateListener listener) {
 
     }
+
+    @Override public void initialize(ConnectConfig connectConfig) {
+
+    }
+
+    @Override public StagingMode getStagingMode() {
+        return null;
+    }
 }
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
index 4af8f61..03b5e01 100644
--- 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
@@ -122,8 +122,8 @@ public class ConfigManagementServiceImplTest {
             }
         }).when(producer).send(any(Message.class), any(SendCallback.class));
 
-        configManagementService = new 
ConfigManagementServiceImpl(connectConfig, plugin);
-
+        configManagementService = new ConfigManagementServiceImpl();
+        configManagementService.initialize(connectConfig, plugin);
         final Field connectorKeyValueStoreField = 
ConfigManagementServiceImpl.class.getDeclaredField("connectorKeyValueStore");
         connectorKeyValueStoreField.setAccessible(true);
         connectorKeyValueStore = (KeyValueStore<String, ConnectKeyValue>) 
connectorKeyValueStoreField.get(configManagementService);
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
index 6aa55b8..990ec8c 100644
--- 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
@@ -116,7 +116,8 @@ public class PositionManagementServiceImplTest {
             }
         }).when(producer).send(any(Message.class), any(SendCallback.class));
 
-        positionManagementService = new 
PositionManagementServiceImpl(connectConfig);
+        positionManagementService = new PositionManagementServiceImpl();
+        positionManagementService.initialize(connectConfig);
 
         final Field dataSynchronizerField = 
PositionManagementServiceImpl.class.getDeclaredField("dataSynchronizer");
         dataSynchronizerField.setAccessible(true);

Reply via email to