This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new ed4b4fc [ISSUES #91] runtime service support spi (#92)
ed4b4fc is described below
commit ed4b4fcb56bd6937a07adc68c53bb72f5a67c373
Author: zhoubo <[email protected]>
AuthorDate: Fri Jul 1 14:49:18 2022 +0800
[ISSUES #91] runtime service support spi (#92)
* cluster service supports SPI
* remove offsetManagementService
* config and position services support SPI
* connect service supports SPI
---
.../connect/runtime/ConnectController.java | 0
.../connect/runtime/DistributedConnectStartup.java | 22 +-
.../connect/runtime/StandaloneConnectStartup.java | 30 ++-
.../runtime/connectorwrapper/WorkerDirectTask.java | 12 +-
.../controller/AbstractConnectController.java | 14 +-
.../distributed/DistributedConnectController.java | 14 +-
.../standalone/StandaloneConnectController.java | 5 +-
.../runtime/service/ClusterManagementService.java | 7 +-
.../service/ClusterManagementServiceImpl.java | 21 +-
.../runtime/service/ConfigManagementService.java | 4 +
.../service/ConfigManagementServiceImpl.java | 28 ++-
.../service/OffsetManagementServiceImpl.java | 277 ---------------------
.../runtime/service/PositionManagementService.java | 9 +-
.../service/PositionManagementServiceImpl.java | 34 +--
.../connect/runtime/service/StagingMode.java | 24 ++
.../memory/FileOffsetManagementServiceImpl.java | 204 ---------------
.../memory/FilePositionManagementServiceImpl.java | 18 +-
.../memory/MemoryClusterManagementServiceImpl.java | 13 +-
.../memory/MemoryConfigManagementServiceImpl.java | 23 +-
.../connect/runtime/store/KeyValueStore.java | 11 +
.../runtime/store/MemoryBasedKeyValueStore.java | 4 +
.../connect/runtime/utils/ServiceProviderUtil.java | 96 +++++++
...onnect.runtime.service.ClusterManagementService | 2 +
...connect.runtime.service.ConfigManagementService | 2 +
...nnect.runtime.service.PositionManagementService | 2 +
.../testimpl/TestPositionManageServiceImpl.java | 16 +-
.../service/ConfigManagementServiceImplTest.java | 4 +-
.../service/PositionManagementServiceImplTest.java | 3 +-
28 files changed, 317 insertions(+), 582 deletions(-)
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java
new file mode 100644
index 0000000..e69de29
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
index f3310a8..2f0dcda 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
@@ -35,13 +35,14 @@ import
org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import
org.apache.rocketmq.connect.runtime.controller.distributed.DistributedConfig;
import
org.apache.rocketmq.connect.runtime.controller.distributed.DistributedConnectController;
-import
org.apache.rocketmq.connect.runtime.service.ClusterManagementServiceImpl;
-import org.apache.rocketmq.connect.runtime.service.ConfigManagementServiceImpl;
-import org.apache.rocketmq.connect.runtime.service.OffsetManagementServiceImpl;
-import
org.apache.rocketmq.connect.runtime.service.PositionManagementServiceImpl;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.StagingMode;
import org.apache.rocketmq.connect.runtime.utils.FileAndPropertyUtil;
import org.apache.rocketmq.connect.runtime.utils.Plugin;
import org.apache.rocketmq.connect.runtime.utils.ServerUtil;
+import org.apache.rocketmq.connect.runtime.utils.ServiceProviderUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -131,14 +132,19 @@ public class DistributedConnectStartup {
plugin.initPlugin();
// Create controller and initialize.
+ ClusterManagementService clusterManagementService =
ServiceProviderUtil.getClusterManagementServices(StagingMode.DISTRIBUTED);
+ clusterManagementService.initialize(connectConfig);
+ ConfigManagementService configManagementService =
ServiceProviderUtil.getConfigManagementServices(StagingMode.DISTRIBUTED);
+ configManagementService.initialize(connectConfig, plugin);
+ PositionManagementService positionManagementServices =
ServiceProviderUtil.getPositionManagementServices(StagingMode.DISTRIBUTED);
+ positionManagementServices.initialize(connectConfig);
DistributedConnectController controller = new
DistributedConnectController(
plugin,
connectConfig,
- new ClusterManagementServiceImpl(connectConfig),
- new ConfigManagementServiceImpl(connectConfig, plugin),
- new PositionManagementServiceImpl(connectConfig),
- new OffsetManagementServiceImpl(connectConfig));
+ clusterManagementService,
+ configManagementService,
+ positionManagementServices);
// Invoked when shutdown.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
index e803864..2770265 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
@@ -35,13 +35,14 @@ import
org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import
org.apache.rocketmq.connect.runtime.controller.standalone.StandaloneConfig;
import
org.apache.rocketmq.connect.runtime.controller.standalone.StandaloneConnectController;
-import
org.apache.rocketmq.connect.runtime.service.memory.FileOffsetManagementServiceImpl;
-import
org.apache.rocketmq.connect.runtime.service.memory.FilePositionManagementServiceImpl;
-import
org.apache.rocketmq.connect.runtime.service.memory.MemoryClusterManagementServiceImpl;
-import
org.apache.rocketmq.connect.runtime.service.memory.MemoryConfigManagementServiceImpl;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.StagingMode;
import org.apache.rocketmq.connect.runtime.utils.FileAndPropertyUtil;
import org.apache.rocketmq.connect.runtime.utils.Plugin;
import org.apache.rocketmq.connect.runtime.utils.ServerUtil;
+import org.apache.rocketmq.connect.runtime.utils.ServiceProviderUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,7 +87,7 @@ public class StandaloneConnectStartup {
// Build the command line options.
Options options = ServerUtil.buildCommandlineOptions(new
Options());
commandLine = ServerUtil.parseCmdLine("connect", args,
buildCommandlineOptions(options),
- new PosixParser());
+ new PosixParser());
if (null == commandLine) {
System.exit(-1);
}
@@ -116,7 +117,6 @@ public class StandaloneConnectStartup {
lc.reset();
configurator.doConfigure(connectConfig.getConnectHome() +
"/conf/logback.xml");
-
List<String> pluginPaths = new ArrayList<>(16);
if (StringUtils.isNotEmpty(connectConfig.getPluginPaths())) {
String[] strArr = connectConfig.getPluginPaths().split(",");
@@ -129,15 +129,19 @@ public class StandaloneConnectStartup {
Plugin plugin = new Plugin(pluginPaths);
plugin.initPlugin();
- // Create controller and initialize.
+ ClusterManagementService clusterManagementService =
ServiceProviderUtil.getClusterManagementServices(StagingMode.STANDALONE);
+ clusterManagementService.initialize(connectConfig);
+ ConfigManagementService configManagementService =
ServiceProviderUtil.getConfigManagementServices(StagingMode.STANDALONE);
+ configManagementService.initialize(connectConfig, plugin);
+ PositionManagementService positionManagementServices =
ServiceProviderUtil.getPositionManagementServices(StagingMode.STANDALONE);
+ positionManagementServices.initialize(connectConfig);
StandaloneConnectController controller = new
StandaloneConnectController(
- plugin,
- connectConfig,
- new MemoryClusterManagementServiceImpl(connectConfig),
- new MemoryConfigManagementServiceImpl(connectConfig,
plugin),
- new FilePositionManagementServiceImpl(connectConfig),
- new FileOffsetManagementServiceImpl(connectConfig));
+ plugin,
+ connectConfig,
+ clusterManagementService,
+ configManagementService,
+ positionManagementServices);
// Invoked when shutdown.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
index 992c071..de3d25b 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
@@ -179,6 +179,10 @@ public class WorkerDirectTask implements WorkerTask {
return taskConfig;
}
+ @Override public KeyValue configs() {
+ return taskConfig;
+ }
+
@Override
public void resetOffset(RecordPartition recordPartition,
RecordOffset recordOffset) {
@@ -219,11 +223,15 @@ public class WorkerDirectTask implements WorkerTask {
}
@Override public String getConnectorName() {
- return null;
+ return taskConfig.getString(RuntimeConfigDefine.CONNECTOR_ID);
}
@Override public String getTaskName() {
- return null;
+ return taskConfig.getString(RuntimeConfigDefine.TASK_ID);
+ }
+
+ @Override public KeyValue configs() {
+ return taskConfig;
}
/**
* Get the configurations of current task.
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
index 73c259d..84128ae 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
@@ -53,11 +53,6 @@ public abstract class AbstractConnectController implements
ConnectController {
*/
protected final PositionManagementService positionManagementService;
- /**
- * Offset management of sink tasks.
- */
- protected final PositionManagementService offsetManagementService;
-
/**
* Manage the online info of the cluster.
*/
@@ -90,8 +85,7 @@ public abstract class AbstractConnectController implements
ConnectController {
ConnectConfig connectConfig,
ClusterManagementService clusterManagementService,
ConfigManagementService configManagementService,
- PositionManagementService positionManagementService,
- PositionManagementService offsetManagementService
+ PositionManagementService positionManagementService
) {
// set config
this.connectConfig = connectConfig;
@@ -104,7 +98,6 @@ public abstract class AbstractConnectController implements
ConnectController {
this.clusterManagementService = clusterManagementService;
this.configManagementService = configManagementService;
this.positionManagementService = positionManagementService;
- this.offsetManagementService = offsetManagementService;
this.worker = new Worker(connectConfig, positionManagementService,
configManagementService, plugin, this);
this.restHandler = new RestHandler(this);
}
@@ -115,7 +108,6 @@ public abstract class AbstractConnectController implements
ConnectController {
clusterManagementService.start();
configManagementService.start();
positionManagementService.start();
- offsetManagementService.start();
worker.start();
connectStatsService.start();
}
@@ -135,10 +127,6 @@ public abstract class AbstractConnectController implements
ConnectController {
positionManagementService.stop();
}
- if (offsetManagementService != null) {
- offsetManagementService.stop();
- }
-
if (clusterManagementService != null) {
clusterManagementService.stop();
}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
index 0dd048a..403cf30 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
@@ -57,10 +57,9 @@ public class DistributedConnectController extends
AbstractConnectController {
DistributedConfig connectConfig,
ClusterManagementService
clusterManagementService,
ConfigManagementService
configManagementService,
- PositionManagementService
positionManagementService,
- PositionManagementService
offsetManagementService) {
+ PositionManagementService
positionManagementService) {
- super(plugin, connectConfig, clusterManagementService,
configManagementService, positionManagementService, offsetManagementService);
+ super(plugin, connectConfig, clusterManagementService,
configManagementService, positionManagementService);
AllocateConnAndTaskStrategy strategy =
ConnectUtil.initAllocateConnAndTaskStrategy(connectConfig);
this.rebalanceImpl = new RebalanceImpl(worker,
configManagementService, clusterManagementService, strategy, this);
this.rebalanceService = new RebalanceService(rebalanceImpl,
configManagementService, clusterManagementService);
@@ -91,15 +90,6 @@ public class DistributedConnectController extends
AbstractConnectController {
}, 1000, this.connectConfig.getPositionPersistInterval(),
TimeUnit.MILLISECONDS);
- // Persist offset information of sink tasks.
- scheduledExecutorService.scheduleAtFixedRate(() -> {
- try {
- this.offsetManagementService.persist();
- } catch (Exception e) {
- log.error("schedule persist offset error.", e);
- }
- }, 1000, this.connectConfig.getOffsetPersistInterval(),
TimeUnit.MILLISECONDS);
-
}
@Override
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectController.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectController.java
index 45b6061..feb2bb9 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectController.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectController.java
@@ -39,10 +39,9 @@ public class StandaloneConnectController extends
AbstractConnectController {
StandaloneConfig connectConfig,
ClusterManagementService
clusterManagementService,
ConfigManagementService
configManagementService,
- PositionManagementService
positionManagementService,
- PositionManagementService
offsetManagementService) {
+ PositionManagementService
positionManagementService) {
- super(plugin, connectConfig, clusterManagementService,
configManagementService, positionManagementService, offsetManagementService);
+ super(plugin, connectConfig, clusterManagementService,
configManagementService, positionManagementService);
AllocateConnAndTaskStrategy strategy =
ConnectUtil.initAllocateConnAndTaskStrategy(connectConfig);
this.rebalanceImpl = new RebalanceImpl(worker,
configManagementService, clusterManagementService, strategy, this);
this.rebalanceService = new StandaloneRebalanceService(rebalanceImpl,
configManagementService, clusterManagementService);
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
index 9a07d56..8792593 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
@@ -17,9 +17,8 @@
package org.apache.rocketmq.connect.runtime.service;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
-
import java.util.List;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
/**
* Interface for cluster management.
@@ -28,6 +27,8 @@ public interface ClusterManagementService {
Long WORKER_TIME_OUT = 30 * 1000L;
+ void initialize(ConnectConfig connectConfig);
+
/**
* Start the cluster manager.
*/
@@ -70,6 +71,8 @@ public interface ClusterManagementService {
String getCurrentWorker();
+ StagingMode getStagingMode();
+
interface WorkerStatusListener {
/**
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
index bebc338..2d97737 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
@@ -45,19 +45,14 @@ public class ClusterManagementServiceImpl implements
ClusterManagementService {
/**
* Configs of current worker.
*/
- private final ConnectConfig connectConfig;
+ private ConnectConfig connectConfig;
/**
* Used for worker discovery
*/
private DefaultMQPullConsumer defaultMQPullConsumer;
- public ClusterManagementServiceImpl(ConnectConfig connectConfig) {
- this.connectConfig = connectConfig;
- this.workerStatusListeners = new HashSet<>();
- this.defaultMQPullConsumer =
ConnectUtil.initDefaultMQPullConsumer(connectConfig);
-
this.defaultMQPullConsumer.setConsumerGroup(connectConfig.getConnectClusterId());
- this.prepare(connectConfig);
+ public ClusterManagementServiceImpl() {
}
/**
@@ -81,6 +76,14 @@ public class ClusterManagementServiceImpl implements
ClusterManagementService {
}
+ @Override public void initialize(ConnectConfig connectConfig) {
+ this.connectConfig = connectConfig;
+ this.workerStatusListeners = new HashSet<>();
+ this.defaultMQPullConsumer =
ConnectUtil.initDefaultMQPullConsumer(connectConfig);
+
this.defaultMQPullConsumer.setConsumerGroup(connectConfig.getConnectClusterId());
+ this.prepare(connectConfig);
+ }
+
@Override
public void start() {
try {
@@ -126,6 +129,10 @@ public class ClusterManagementServiceImpl implements
ClusterManagementService {
return
this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory().getClientId();
}
+ @Override public StagingMode getStagingMode() {
+ return StagingMode.DISTRIBUTED;
+ }
+
@Override
public void registerListener(WorkerStatusListener listener) {
this.workerStatusListeners.add(listener);
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java
index 5d6d6c9..60343e8 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java
@@ -104,6 +104,8 @@ public interface ConfigManagementService {
*/
void registerListener(ConnectorConfigUpdateListener listener);
+ void initialize(ConnectConfig connectConfig, Plugin plugin);
+
interface ConnectorConfigUpdateListener {
/**
@@ -113,4 +115,6 @@ public interface ConfigManagementService {
}
Plugin getPlugin();
+
+ StagingMode getStagingMode();
}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
index 1134e1d..0ee1655 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -69,10 +69,13 @@ public class ConfigManagementServiceImpl implements
ConfigManagementService {
*/
private DataSynchronizer<String, ConnAndTaskConfigs> dataSynchronizer;
- private final Plugin plugin;
+ private Plugin plugin;
private final String configManagePrefix = "ConfigManage";
+ public ConfigManagementServiceImpl() {
+ }
+
public ConfigManagementServiceImpl(ConnectConfig connectConfig, Plugin
plugin) {
this.connectorConfigUpdateListener = new HashSet<>();
@@ -279,6 +282,25 @@ public class ConfigManagementServiceImpl implements
ConfigManagementService {
this.connectorConfigUpdateListener.add(listener);
}
+ @Override public void initialize(ConnectConfig connectConfig, Plugin
plugin) {
+ this.connectorConfigUpdateListener = new HashSet<>();
+ this.dataSynchronizer = new BrokerBasedLog<>(connectConfig,
+ connectConfig.getConfigStoreTopic(),
+ ConnectUtil.createGroupName(configManagePrefix,
connectConfig.getWorkerId()),
+ new ConfigChangeCallback(),
+ new JsonConverter(),
+ new ConnAndTaskConfigConverter());
+ this.connectorKeyValueStore = new FileBaseKeyValueStore<>(
+
FilePathConfigUtil.getConnectorConfigPath(connectConfig.getStorePathRootDir()),
+ new JsonConverter(),
+ new JsonConverter(ConnectKeyValue.class));
+ this.taskKeyValueStore = new FileBaseKeyValueStore<>(
+
FilePathConfigUtil.getTaskConfigPath(connectConfig.getStorePathRootDir()),
+ new JsonConverter(),
+ new ListConverter(ConnectKeyValue.class));
+ this.plugin = plugin;
+ }
+
private void triggerListener() {
if (null == this.connectorConfigUpdateListener) {
@@ -375,4 +397,8 @@ public class ConfigManagementServiceImpl implements
ConfigManagementService {
public Plugin getPlugin() {
return this.plugin;
}
+
+ @Override public StagingMode getStagingMode() {
+ return StagingMode.DISTRIBUTED;
+ }
}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
deleted file mode 100644
index 49860be..0000000
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.connect.runtime.service;
-
-import io.netty.util.internal.ConcurrentSet;
-import io.openmessaging.connector.api.data.RecordOffset;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
-import org.apache.rocketmq.connect.runtime.converter.JsonConverter;
-import org.apache.rocketmq.connect.runtime.converter.RecordOffsetConverter;
-import org.apache.rocketmq.connect.runtime.converter.RecordPartitionConverter;
-import
org.apache.rocketmq.connect.runtime.converter.RecordPositionMapConverter;
-import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
-import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
-import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
-import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
-import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
-import org.apache.rocketmq.connect.runtime.utils.datasync.BrokerBasedLog;
-import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizer;
-import
org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class OffsetManagementServiceImpl implements PositionManagementService {
- private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
-
- /**
- * Current offset info in store.
- */
- private KeyValueStore<ExtendRecordPartition, RecordOffset> offsetStore;
-
-
- /**
- * The updated partition of the task in the current instance.
- */
- private Set<ExtendRecordPartition> needSyncPartition;
-
- /**
- * Synchronize data with other workers.
- */
- private DataSynchronizer<String, Map<ExtendRecordPartition, RecordOffset>>
dataSynchronizer;
-
- private final String offsetManagePrefix = "OffsetManage";
-
- /**
- * Listeners.
- */
- private Set<PositionUpdateListener> offsetUpdateListener;
-
- public OffsetManagementServiceImpl(ConnectConfig connectConfig) {
-
- this.offsetStore = new
FileBaseKeyValueStore<>(FilePathConfigUtil.getOffsetPath(connectConfig.getStorePathRootDir()),
- new RecordPartitionConverter(),
- new RecordOffsetConverter());
- this.dataSynchronizer = new BrokerBasedLog(connectConfig,
- connectConfig.getOffsetStoreTopic(),
- ConnectUtil.createGroupName(offsetManagePrefix,
connectConfig.getWorkerId()),
- new OffsetChangeCallback(),
- new JsonConverter(),
- new RecordPositionMapConverter());
- this.offsetUpdateListener = new HashSet<>();
- this.needSyncPartition = new ConcurrentSet<>();
- this.prepare(connectConfig);
- }
-
- /**
- * Preparation before startup
- *
- * @param connectConfig
- */
- private void prepare(ConnectConfig connectConfig) {
- String offsetStoreTopic = connectConfig.getOffsetStoreTopic();
- if (!ConnectUtil.isTopicExist(connectConfig, offsetStoreTopic)) {
- log.info("try to create offset store topic: {}!",
offsetStoreTopic);
- TopicConfig topicConfig = new TopicConfig(offsetStoreTopic, 1, 1,
6);
- ConnectUtil.createTopic(connectConfig, topicConfig);
- }
- }
-
- @Override
- public void start() {
-
- offsetStore.load();
- dataSynchronizer.start();
- sendOnlineOffsetInfo();
- }
-
- @Override
- public void stop() {
-
- sendNeedSynchronizeOffset();
- offsetStore.persist();
- dataSynchronizer.stop();
- }
-
- @Override
- public void persist() {
-
- offsetStore.persist();
- }
-
- @Override
- public void load() {
- offsetStore.load();
- }
-
- @Override
- public void synchronize() {
-
- sendNeedSynchronizeOffset();
- }
-
- @Override
- public Map<ExtendRecordPartition, RecordOffset> getPositionTable() {
-
- return offsetStore.getKVMap();
- }
-
- @Override
- public RecordOffset getPosition(ExtendRecordPartition partition) {
-
- return offsetStore.get(partition);
- }
-
- @Override
- public void putPosition(Map<ExtendRecordPartition, RecordOffset> offsets) {
-
- offsetStore.putAll(offsets);
- needSyncPartition.addAll(offsets.keySet());
- }
-
- @Override
- public void putPosition(ExtendRecordPartition partition, RecordOffset
position) {
-
- offsetStore.put(partition, position);
- needSyncPartition.add(partition);
- }
-
- @Override
- public void removePosition(List<ExtendRecordPartition> offsets) {
-
- if (null == offsets) {
- return;
- }
- for (ExtendRecordPartition offset : offsets) {
- needSyncPartition.remove(offset);
- offsetStore.remove(offset);
- }
- }
-
- @Override
- public void registerListener(PositionUpdateListener listener) {
-
- this.offsetUpdateListener.add(listener);
- }
-
- private void sendOnlineOffsetInfo() {
-
- dataSynchronizer.send(OffsetChangeEnum.ONLINE_KEY.name(),
offsetStore.getKVMap());
- }
-
-
- private void sendNeedSynchronizeOffset() {
-
- Set<ExtendRecordPartition> needSyncPartitionTmp = needSyncPartition;
- needSyncPartition = new ConcurrentSet<>();
- Map<ExtendRecordPartition, RecordOffset> needSyncOffset =
offsetStore.getKVMap().entrySet().stream()
- .filter(entry -> needSyncPartitionTmp.contains(entry.getKey()))
- .collect(Collectors.toMap(entry -> entry.getKey(), entry ->
entry.getValue()));
-
- dataSynchronizer.send(OffsetChangeEnum.OFFSET_CHANG_KEY.name(),
needSyncOffset);
- }
-
- private void sendSynchronizeOffset() {
-
- dataSynchronizer.send(OffsetChangeEnum.OFFSET_CHANG_KEY.name(),
offsetStore.getKVMap());
- }
-
- private class OffsetChangeCallback implements
DataSynchronizerCallback<String, Map<ExtendRecordPartition, RecordOffset>> {
-
- @Override
- public void onCompletion(Throwable error, String key,
Map<ExtendRecordPartition, RecordOffset> result) {
-
- boolean changed = false;
- switch (OffsetChangeEnum.valueOf(key)) {
- case ONLINE_KEY:
- changed = true;
- sendSynchronizeOffset();
- break;
- case OFFSET_CHANG_KEY:
- changed = mergeOffsetInfo(result);
- break;
- default:
- break;
- }
- if (changed) {
- triggerListener();
- }
-
- }
- }
-
- private void triggerListener() {
- for (PositionUpdateListener offsetUpdateListener :
offsetUpdateListener) {
- offsetUpdateListener.onPositionUpdate();
- }
- }
-
- /**
- * Merge new received offset info with local store.
- *
- * @param result
- * @return
- */
- private boolean mergeOffsetInfo(Map<ExtendRecordPartition, RecordOffset>
result) {
-
- boolean changed = false;
- if (null == result || 0 == result.size()) {
- return changed;
- }
-
- for (Map.Entry<ExtendRecordPartition, RecordOffset> newEntry :
result.entrySet()) {
- boolean find = false;
- for (Map.Entry<ExtendRecordPartition, RecordOffset> existedEntry :
offsetStore.getKVMap().entrySet()) {
- if (newEntry.getKey().equals(existedEntry.getKey())) {
- find = true;
- if (!newEntry.getValue().equals(existedEntry.getValue())) {
- changed = true;
- existedEntry.setValue(newEntry.getValue());
- }
- break;
- }
- }
- if (!find) {
- offsetStore.put(newEntry.getKey(), newEntry.getValue());
- }
- }
- return changed;
- }
-
- private enum OffsetChangeEnum {
-
- /**
- * Insert or update offset info.
- */
- OFFSET_CHANG_KEY,
-
- /**
- * A worker online.
- */
- ONLINE_KEY
- }
-}
-
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementService.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementService.java
index bd3faa7..3d1a4b0 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementService.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementService.java
@@ -19,11 +19,10 @@
package org.apache.rocketmq.connect.runtime.service;
import io.openmessaging.connector.api.data.RecordOffset;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
-import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
-
import java.util.List;
import java.util.Map;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
/**
* Interface for position manager.
@@ -94,6 +93,8 @@ public interface PositionManagementService {
*/
void registerListener(PositionUpdateListener listener);
+ /**
+ * Initializes this service from the given runtime configuration.
+ * Implementations are created through a no-arg constructor, so the state they
+ * derive from the configuration is built here rather than at construction time.
+ *
+ * @param connectConfig the connect runtime configuration
+ */
+ void initialize(ConnectConfig connectConfig);
+
interface PositionUpdateListener {
/**
@@ -101,4 +102,6 @@ public interface PositionManagementService {
*/
void onPositionUpdate();
}
+
+ /**
+ * Returns the staging mode this implementation declares.
+ * ServiceProviderUtil compares this value with the requested mode when it
+ * chooses among the implementations found by ServiceLoader.
+ *
+ * @return the staging mode of this implementation
+ */
+ StagingMode getStagingMode();
}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
index 893809e..11e35e5 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
@@ -69,20 +69,7 @@ public class PositionManagementServiceImpl implements
PositionManagementService
private final String positionManagePrefix = "PositionManage";
- public PositionManagementServiceImpl(ConnectConfig connectConfig) {
-
- this.positionStore = new
FileBaseKeyValueStore<>(FilePathConfigUtil.getPositionPath(connectConfig.getStorePathRootDir()),
- new RecordPartitionConverter(),
- new RecordOffsetConverter());
- this.dataSynchronizer = new BrokerBasedLog(connectConfig,
- connectConfig.getPositionStoreTopic(),
- ConnectUtil.createGroupName(positionManagePrefix,
connectConfig.getWorkerId()),
- new PositionChangeCallback(),
- new JsonConverter(),
- new RecordPositionMapConverter());
- this.positionUpdateListener = new HashSet<>();
- this.needSyncPartition = new ConcurrentSet<>();
- this.prepare(connectConfig);
+ /**
+ * No-arg constructor; this class is listed as a PositionManagementService provider
+ * under META-INF/services and java.util.ServiceLoader instantiates providers without
+ * arguments. No field is assigned here; see {@link #initialize(ConnectConfig)}.
+ */
+ public PositionManagementServiceImpl() {
}
/**
@@ -177,6 +164,25 @@ public class PositionManagementServiceImpl implements
PositionManagementService
this.positionUpdateListener.add(listener);
}
+ /**
+ * Builds the position store, the data synchronizer and the listener and
+ * partition sets from the given configuration, then calls {@code prepare}.
+ * The no-arg constructor does not assign any of these fields, so this method
+ * has to run before the service is used.
+ *
+ * @param connectConfig source of the store root directory, the position store
+ * topic and the worker id
+ */
+ @Override public void initialize(ConnectConfig connectConfig) {
+ // File-backed store located under the configured store root directory.
+ this.positionStore = new
FileBaseKeyValueStore<>(FilePathConfigUtil.getPositionPath(connectConfig.getStorePathRootDir()),
+ new RecordPartitionConverter(),
+ new RecordOffsetConverter());
+ // Synchronizer on the position store topic; its group name is derived from
+ // positionManagePrefix and the worker id.
+ this.dataSynchronizer = new BrokerBasedLog(connectConfig,
+ connectConfig.getPositionStoreTopic(),
+ ConnectUtil.createGroupName(positionManagePrefix,
connectConfig.getWorkerId()),
+ new PositionChangeCallback(),
+ new JsonConverter(),
+ new RecordPositionMapConverter());
+ this.positionUpdateListener = new HashSet<>();
+ this.needSyncPartition = new ConcurrentSet<>();
+ // NOTE(review): prepare(...) is defined outside this hunk; confirm what it sets up.
+ this.prepare(connectConfig);
+ }
+
+ /**
+ * Declares this implementation as the one for distributed mode.
+ *
+ * @return {@link StagingMode#DISTRIBUTED}
+ */
+ @Override public StagingMode getStagingMode() {
+ return StagingMode.DISTRIBUTED;
+ }
+
private void sendOnlinePositionInfo() {
dataSynchronizer.send(PositionChangeEnum.ONLINE_KEY.name(),
positionStore.getKVMap());
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StagingMode.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StagingMode.java
new file mode 100644
index 0000000..2def5d5
--- /dev/null
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/StagingMode.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.service;
+
+/**
+ * Mode that a runtime service implementation declares through its
+ * {@code getStagingMode()} method. ServiceProviderUtil uses the value to pick one
+ * implementation among those discovered by ServiceLoader.
+ */
+public enum StagingMode {
+ // Declared by the memory and file based implementations (for example MemoryClusterManagementServiceImpl).
+ STANDALONE,
+ // Declared by PositionManagementServiceImpl.
+ DISTRIBUTED,
+ // Fallback ServiceProviderUtil accepts when no implementation declares the requested mode.
+ UNIVERSAL
+}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FileOffsetManagementServiceImpl.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FileOffsetManagementServiceImpl.java
deleted file mode 100644
index f445d58..0000000
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FileOffsetManagementServiceImpl.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.connect.runtime.service.memory;
-
-import io.openmessaging.connector.api.data.RecordOffset;
-import io.openmessaging.connector.api.errors.ConnectException;
-import org.apache.rocketmq.common.utils.ThreadUtils;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
-import org.apache.rocketmq.connect.runtime.converter.RecordOffsetConverter;
-import org.apache.rocketmq.connect.runtime.converter.RecordPartitionConverter;
-import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
-import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
-import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
-import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
-import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
-import
org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-/**
- * memory offset management service impl
- */
-public class FileOffsetManagementServiceImpl implements
PositionManagementService {
- private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
-
- protected ExecutorService executor;
-
- /**
- * Current offset info in store.
- */
- private KeyValueStore<ExtendRecordPartition, RecordOffset> offsetStore;
-
-
- /**
- * Listeners.
- */
- private PositionUpdateListener offsetUpdateListener;
-
- public FileOffsetManagementServiceImpl(ConnectConfig connectConfig) {
- this.offsetStore = new FileBaseKeyValueStore<>(
-
FilePathConfigUtil.getOffsetPath(connectConfig.getStorePathRootDir()),
- new RecordPartitionConverter(),
- new RecordOffsetConverter());
- }
-
- @Override
- public void start() {
- executor = Executors.newFixedThreadPool(1,
ThreadUtils.newThreadFactory(
- this.getClass().getSimpleName() + "-%d", false));
- offsetStore.load();
- }
-
- @Override
- public void stop() {
- offsetStore.persist();
- if (executor != null) {
- executor.shutdown();
- // Best effort wait for any get() and set() tasks (and caller's
callbacks) to complete.
- try {
- executor.awaitTermination(30, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- if (!executor.shutdownNow().isEmpty()) {
- throw new ConnectException("Failed to stop
MemoryOffsetManagementServiceImpl. Exiting without cleanly " +
- "shutting down pending tasks and/or callbacks.");
- }
- executor = null;
- }
- }
-
- @Override
- public void persist() {
- offsetStore.persist();
- }
-
- @Override
- public void load() {
- offsetStore.load();
- }
-
- @Override
- public void synchronize() {
- }
-
- @Override
- public Map<ExtendRecordPartition, RecordOffset> getPositionTable() {
- return offsetStore.getKVMap();
- }
-
- @Override
- public RecordOffset getPosition(ExtendRecordPartition partition) {
- return offsetStore.get(partition);
- }
-
- @Override
- public void putPosition(Map<ExtendRecordPartition, RecordOffset> offsets) {
- offsetStore.putAll(offsets);
- this.triggerListener(new DataSynchronizerCallback<Void, Void>() {
- @Override
- public void onCompletion(Throwable error, Void key, Void result) {
- if (error != null) {
- log.error("Failed to persist offsets to storage: {}",
error);
- } else {
- log.trace("Successed to persist offsets to storage: {}",
offsets);
- }
- }
- });
- }
-
- @Override
- public void putPosition(ExtendRecordPartition partition, RecordOffset
position) {
- offsetStore.put(partition, position);
- this.triggerListener(new DataSynchronizerCallback<Void, Void>() {
- @Override
- public void onCompletion(Throwable error, Void key, Void result) {
- if (error != null) {
- log.error("Failed to persist offsets to storage: {}",
error);
- } else {
- log.trace("Successed to persist offsets to storage: {},
{}", partition, position);
- }
- }
- });
- }
-
- @Override
- public void removePosition(List<ExtendRecordPartition> offsets) {
- if (null == offsets) {
- return;
- }
- for (ExtendRecordPartition offset : offsets) {
- offsetStore.remove(offset);
- }
- this.triggerListener(new DataSynchronizerCallback<Void, Void>() {
- @Override
- public void onCompletion(Throwable error, Void key, Void result) {
- if (error != null) {
- log.error("Failed to persist offsets to storage: {}",
error);
- } else {
- log.trace("Successed to persist offsets to storage: {}",
offsets);
- }
- }
- });
- }
-
- @Override
- public void registerListener(PositionUpdateListener listener) {
- this.offsetUpdateListener = listener;
- }
-
-
- private Future<Void> triggerListener(DataSynchronizerCallback<Void, Void>
callback) {
- if (offsetUpdateListener != null) {
- offsetUpdateListener.onPositionUpdate();
- }
-
- return executor.submit(new Callable<Void>() {
- /**
- * Computes a result, or throws an exception if unable to do so.
- *
- * @return computed result
- * @throws Exception if unable to compute a result
- */
- @Override
- public Void call() {
- try {
- offsetStore.persist();
- if (callback != null) {
- callback.onCompletion(null, null, null);
- }
- } catch (Exception error) {
- callback.onCompletion(error, null, null);
- }
- return null;
- }
- });
- }
-
-}
-
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
index c0ebb2c..056773b 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
@@ -24,6 +24,7 @@ import
org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.converter.RecordOffsetConverter;
import org.apache.rocketmq.connect.runtime.converter.RecordPartitionConverter;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.StagingMode;
import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
@@ -56,18 +57,24 @@ public class FilePositionManagementServiceImpl implements
PositionManagementServ
*/
private PositionUpdateListener positionUpdateListener;
+ /**
+ * No-arg constructor; this class is listed as a PositionManagementService provider
+ * under META-INF/services and java.util.ServiceLoader instantiates providers without
+ * arguments. The position store is created later by initialize(ConnectConfig).
+ */
+ public FilePositionManagementServiceImpl() {
- public FilePositionManagementServiceImpl(ConnectConfig connectConfig) {
+ }
+
+ /**
+ * Creates the file-backed position store at the position path derived from
+ * {@code connectConfig.getStorePathRootDir()}. Nothing is loaded here;
+ * {@code start()} calls {@code positionStore.load()}.
+ *
+ * @param connectConfig source of the store root directory
+ */
+ @Override public void initialize(ConnectConfig connectConfig) {
this.positionStore = new
FileBaseKeyValueStore<>(FilePathConfigUtil.getPositionPath(connectConfig.getStorePathRootDir()),
- new RecordPartitionConverter(),
- new RecordOffsetConverter());
+ new RecordPartitionConverter(),
+ new RecordOffsetConverter());
}
+ /**
+ * Declares this implementation as the one for standalone mode.
+ *
+ * @return {@link StagingMode#STANDALONE}
+ */
+ @Override public StagingMode getStagingMode() {
+ return StagingMode.STANDALONE;
+ }
@Override
public void start() {
executor = Executors.newFixedThreadPool(1,
ThreadUtils.newThreadFactory(
- this.getClass().getSimpleName() + "-%d", false));
+ this.getClass().getSimpleName() + "-%d", false));
positionStore.load();
}
@@ -84,7 +91,7 @@ public class FilePositionManagementServiceImpl implements
PositionManagementServ
}
if (!executor.shutdownNow().isEmpty()) {
throw new ConnectException("Failed to stop
MemoryOffsetManagementServiceImpl. Exiting without cleanly " +
- "shutting down pending tasks and/or callbacks.");
+ "shutting down pending tasks and/or callbacks.");
}
executor = null;
}
@@ -196,6 +203,5 @@ public class FilePositionManagementServiceImpl implements
PositionManagementServ
});
}
-
}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryClusterManagementServiceImpl.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryClusterManagementServiceImpl.java
index 5e14d70..8add05b 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryClusterManagementServiceImpl.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryClusterManagementServiceImpl.java
@@ -23,6 +23,7 @@ import
org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
import java.util.Collections;
import java.util.List;
+import org.apache.rocketmq.connect.runtime.service.StagingMode;
/**
* standalone cluster management service
@@ -31,8 +32,12 @@ public class MemoryClusterManagementServiceImpl implements
ClusterManagementServ
private StandaloneConfig config;
- public MemoryClusterManagementServiceImpl(ConnectConfig config) {
- this.configure(config);
+ /**
+ * No-arg constructor; this class is listed as a ClusterManagementService provider
+ * under META-INF/services and java.util.ServiceLoader instantiates providers without
+ * arguments. Configuration is applied later by initialize(ConnectConfig).
+ */
+ public MemoryClusterManagementServiceImpl() {
+
+ }
+
+ /**
+ * Applies the given configuration by delegating to {@code configure}.
+ * NOTE(review): configure(...) is outside this hunk; presumably it sets the
+ * {@code config} field read by getCurrentWorker() - confirm.
+ *
+ * @param connectConfig the connect runtime configuration
+ */
+ @Override public void initialize(ConnectConfig connectConfig) {
+ this.configure(connectConfig);
}
/**
@@ -94,4 +99,8 @@ public class MemoryClusterManagementServiceImpl implements
ClusterManagementServ
public String getCurrentWorker() {
return this.config.getWorkerId();
}
+
+ /**
+ * Declares this implementation as the one for standalone mode.
+ *
+ * @return {@link StagingMode#STANDALONE}
+ */
+ @Override public StagingMode getStagingMode() {
+ return StagingMode.STANDALONE;
+ }
}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
index 5e8bb5b..5f763ab 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
@@ -27,6 +27,7 @@ import
org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
import org.apache.rocketmq.connect.runtime.converter.JsonConverter;
import org.apache.rocketmq.connect.runtime.converter.ListConverter;
import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.service.StagingMode;
import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
@@ -61,18 +62,20 @@ public class MemoryConfigManagementServiceImpl implements
ConfigManagementServic
*/
private ConnectorConfigUpdateListener connectorConfigUpdateListener;
- private final Plugin plugin;
+ private Plugin plugin;
- public MemoryConfigManagementServiceImpl(ConnectConfig connectConfig,
Plugin plugin) {
+ /**
+ * No-arg constructor; this class is listed as a ConfigManagementService provider
+ * under META-INF/services and java.util.ServiceLoader instantiates providers without
+ * arguments. The stores and the plugin are set later by initialize(ConnectConfig, Plugin).
+ */
+ public MemoryConfigManagementServiceImpl() {
+ }
+ /**
+ * Creates the file-backed connector and task configuration stores under the
+ * paths derived from {@code connectConfig.getStorePathRootDir()} and keeps the
+ * given plugin for {@code getPlugin()}.
+ *
+ * @param connectConfig source of the store root directory
+ * @param plugin the plugin returned later by {@code getPlugin()}
+ */
+ @Override public void initialize(ConnectConfig connectConfig, Plugin
plugin) {
this.connectorKeyValueStore = new FileBaseKeyValueStore<>(
-
FilePathConfigUtil.getConnectorConfigPath(connectConfig.getStorePathRootDir()),
- new JsonConverter(),
- new JsonConverter(ConnectKeyValue.class));
+
FilePathConfigUtil.getConnectorConfigPath(connectConfig.getStorePathRootDir()),
+ new JsonConverter(),
+ new JsonConverter(ConnectKeyValue.class));
this.taskKeyValueStore = new FileBaseKeyValueStore<>(
-
FilePathConfigUtil.getTaskConfigPath(connectConfig.getStorePathRootDir()),
- new JsonConverter(),
- new ListConverter(ConnectKeyValue.class));
+
FilePathConfigUtil.getTaskConfigPath(connectConfig.getStorePathRootDir()),
+ new JsonConverter(),
+ new ListConverter(ConnectKeyValue.class));
this.plugin = plugin;
}
@@ -251,4 +254,8 @@ public class MemoryConfigManagementServiceImpl implements
ConfigManagementServic
public Plugin getPlugin() {
return this.plugin;
}
+
+ /**
+ * Declares this implementation as the one for standalone mode.
+ *
+ * @return {@link StagingMode#STANDALONE}
+ */
+ @Override public StagingMode getStagingMode() {
+ return StagingMode.STANDALONE;
+ }
}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/KeyValueStore.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/KeyValueStore.java
index ce9b210..2629ce2 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/KeyValueStore.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/KeyValueStore.java
@@ -92,4 +92,15 @@ public interface KeyValueStore<K, V> {
* Persist all data into the store.
*/
void persist();
+
+
+ /**
+ * Returns the stage this store declares.
+ * NOTE(review): no consumer of this value is visible in this change; confirm how
+ * callers are expected to use it.
+ *
+ * @return the stage of this store
+ */
+ Stage getStage();
+
+
+ /**
+ * Stage a store can declare through {@link #getStage()}.
+ * NOTE(review): presumably CONNECTOR, TASK and POSITION name the kind of data a
+ * store is dedicated to and UNIVERSAL marks a store usable for any of them -
+ * confirm against the implementations.
+ */
+ enum Stage {
+ CONNECTOR,
+ TASK,
+ POSITION,
+ UNIVERSAL
+ }
}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/MemoryBasedKeyValueStore.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/MemoryBasedKeyValueStore.java
index ea8a78b..c474985 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/MemoryBasedKeyValueStore.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/MemoryBasedKeyValueStore.java
@@ -72,4 +72,8 @@ public class MemoryBasedKeyValueStore<K, V> implements
KeyValueStore<K, V> {
public void persist() {
}
+
+ /**
+ * Declares this in-memory store as {@link Stage#UNIVERSAL}.
+ *
+ * @return {@link Stage#UNIVERSAL}
+ */
+ @Override public Stage getStage() {
+ return Stage.UNIVERSAL;
+ }
}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ServiceProviderUtil.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ServiceProviderUtil.java
new file mode 100644
index 0000000..db9e452
--- /dev/null
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ServiceProviderUtil.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.utils;
+
+import java.util.Iterator;
+import java.util.ServiceLoader;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.StagingMode;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Selects runtime service implementations discovered through the JDK {@link ServiceLoader}.
+ *
+ * <p>Every lookup prefers the provider whose {@code getStagingMode()} equals the requested mode
+ * and falls back to the first provider that reports {@link StagingMode#UNIVERSAL}. The returned
+ * instance is not initialized here; calling {@code initialize(...)} is left to the caller.
+ */
+public class ServiceProviderUtil {
+
+    /**
+     * Finds the {@link ClusterManagementService} provider for the given staging mode.
+     *
+     * @param stagingMode the mode the runtime is started in
+     * @return the provider declaring {@code stagingMode}, otherwise the first
+     *         {@link StagingMode#UNIVERSAL} provider
+     * @throws IllegalStateException if no provider declares either mode
+     */
+    @NotNull
+    public static ClusterManagementService getClusterManagementServices(StagingMode stagingMode) {
+        ClusterManagementService universalProvider = null;
+        Iterator<ClusterManagementService> providers = ServiceLoader.load(ClusterManagementService.class).iterator();
+        while (providers.hasNext()) {
+            ClusterManagementService candidate = providers.next();
+            StagingMode candidateMode = candidate.getStagingMode();
+            if (candidateMode == stagingMode) {
+                return candidate;
+            }
+            // Remember the fallback but keep scanning: an exact match may be registered later.
+            if (universalProvider == null && candidateMode == StagingMode.UNIVERSAL) {
+                universalProvider = candidate;
+            }
+        }
+        return requireProvider(universalProvider, ClusterManagementService.class, stagingMode);
+    }
+
+    /**
+     * Finds the {@link ConfigManagementService} provider for the given staging mode.
+     *
+     * @param stagingMode the mode the runtime is started in
+     * @return the provider declaring {@code stagingMode}, otherwise the first
+     *         {@link StagingMode#UNIVERSAL} provider
+     * @throws IllegalStateException if no provider declares either mode
+     */
+    @NotNull
+    public static ConfigManagementService getConfigManagementServices(StagingMode stagingMode) {
+        ConfigManagementService universalProvider = null;
+        Iterator<ConfigManagementService> providers = ServiceLoader.load(ConfigManagementService.class).iterator();
+        while (providers.hasNext()) {
+            ConfigManagementService candidate = providers.next();
+            StagingMode candidateMode = candidate.getStagingMode();
+            if (candidateMode == stagingMode) {
+                return candidate;
+            }
+            // Remember the fallback but keep scanning: an exact match may be registered later.
+            if (universalProvider == null && candidateMode == StagingMode.UNIVERSAL) {
+                universalProvider = candidate;
+            }
+        }
+        return requireProvider(universalProvider, ConfigManagementService.class, stagingMode);
+    }
+
+    /**
+     * Finds the {@link PositionManagementService} provider for the given staging mode.
+     *
+     * @param stagingMode the mode the runtime is started in
+     * @return the provider declaring {@code stagingMode}, otherwise the first
+     *         {@link StagingMode#UNIVERSAL} provider
+     * @throws IllegalStateException if no provider declares either mode
+     */
+    @NotNull
+    public static PositionManagementService getPositionManagementServices(StagingMode stagingMode) {
+        PositionManagementService universalProvider = null;
+        Iterator<PositionManagementService> providers = ServiceLoader.load(PositionManagementService.class).iterator();
+        while (providers.hasNext()) {
+            PositionManagementService candidate = providers.next();
+            StagingMode candidateMode = candidate.getStagingMode();
+            if (candidateMode == stagingMode) {
+                return candidate;
+            }
+            // Remember the fallback but keep scanning: an exact match may be registered later.
+            if (universalProvider == null && candidateMode == StagingMode.UNIVERSAL) {
+                universalProvider = candidate;
+            }
+        }
+        return requireProvider(universalProvider, PositionManagementService.class, stagingMode);
+    }
+
+    /**
+     * Enforces the {@code @NotNull} contract of the lookups: fails with a descriptive error
+     * instead of handing {@code null} to the caller when no provider was found.
+     */
+    private static <T> T requireProvider(T provider, Class<T> serviceType, StagingMode stagingMode) {
+        if (provider == null) {
+            throw new IllegalStateException("No " + serviceType.getName()
+                + " provider is registered for staging mode " + stagingMode
+                + " or " + StagingMode.UNIVERSAL);
+        }
+        return provider;
+    }
+}
+
diff --git
a/rocketmq-connect-runtime/src/main/resources/META-INF/services/org.apache.rocketmq.connect.runtime.service.ClusterManagementService
b/rocketmq-connect-runtime/src/main/resources/META-INF/services/org.apache.rocketmq.connect.runtime.service.ClusterManagementService
new file mode 100644
index 0000000..c977a12
--- /dev/null
+++
b/rocketmq-connect-runtime/src/main/resources/META-INF/services/org.apache.rocketmq.connect.runtime.service.ClusterManagementService
@@ -0,0 +1,2 @@
+org.apache.rocketmq.connect.runtime.service.ClusterManagementServiceImpl
+org.apache.rocketmq.connect.runtime.service.memory.MemoryClusterManagementServiceImpl
\ No newline at end of file
diff --git
a/rocketmq-connect-runtime/src/main/resources/META-INF/services/org.apache.rocketmq.connect.runtime.service.ConfigManagementService
b/rocketmq-connect-runtime/src/main/resources/META-INF/services/org.apache.rocketmq.connect.runtime.service.ConfigManagementService
new file mode 100644
index 0000000..7ad0adb
--- /dev/null
+++
b/rocketmq-connect-runtime/src/main/resources/META-INF/services/org.apache.rocketmq.connect.runtime.service.ConfigManagementService
@@ -0,0 +1,2 @@
+org.apache.rocketmq.connect.runtime.service.ConfigManagementServiceImpl
+org.apache.rocketmq.connect.runtime.service.memory.MemoryConfigManagementServiceImpl
\ No newline at end of file
diff --git
a/rocketmq-connect-runtime/src/main/resources/META-INF/services/org.apache.rocketmq.connect.runtime.service.PositionManagementService
b/rocketmq-connect-runtime/src/main/resources/META-INF/services/org.apache.rocketmq.connect.runtime.service.PositionManagementService
new file mode 100644
index 0000000..77cbdb7
--- /dev/null
+++
b/rocketmq-connect-runtime/src/main/resources/META-INF/services/org.apache.rocketmq.connect.runtime.service.PositionManagementService
@@ -0,0 +1,2 @@
+org.apache.rocketmq.connect.runtime.service.PositionManagementServiceImpl
+org.apache.rocketmq.connect.runtime.service.memory.FilePositionManagementServiceImpl
\ No newline at end of file
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestPositionManageServiceImpl.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestPositionManageServiceImpl.java
index 10884df..ecc9e25 100644
---
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestPositionManageServiceImpl.java
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestPositionManageServiceImpl.java
@@ -18,12 +18,12 @@
package org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl;
import io.openmessaging.connector.api.data.RecordOffset;
-import io.openmessaging.connector.api.data.RecordPartition;
-import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
-import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
-
import java.util.List;
import java.util.Map;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.StagingMode;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
public class TestPositionManageServiceImpl implements
PositionManagementService {
@@ -75,4 +75,12 @@ public class TestPositionManageServiceImpl implements
PositionManagementService
public void registerListener(PositionUpdateListener listener) {
}
+
+ // Test stub: intentionally does nothing with the configuration.
+ @Override public void initialize(ConnectConfig connectConfig) {
+
+ }
+
+ // Test stub: declares no staging mode.
+ @Override public StagingMode getStagingMode() {
+ return null;
+ }
}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
index 4af8f61..03b5e01 100644
---
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
@@ -122,8 +122,8 @@ public class ConfigManagementServiceImplTest {
}
}).when(producer).send(any(Message.class), any(SendCallback.class));
- configManagementService = new
ConfigManagementServiceImpl(connectConfig, plugin);
-
+ configManagementService = new ConfigManagementServiceImpl();
+ configManagementService.initialize(connectConfig, plugin);
final Field connectorKeyValueStoreField =
ConfigManagementServiceImpl.class.getDeclaredField("connectorKeyValueStore");
connectorKeyValueStoreField.setAccessible(true);
connectorKeyValueStore = (KeyValueStore<String, ConnectKeyValue>)
connectorKeyValueStoreField.get(configManagementService);
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
index 6aa55b8..990ec8c 100644
---
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
@@ -116,7 +116,8 @@ public class PositionManagementServiceImplTest {
}
}).when(producer).send(any(Message.class), any(SendCallback.class));
- positionManagementService = new
PositionManagementServiceImpl(connectConfig);
+ positionManagementService = new PositionManagementServiceImpl();
+ positionManagementService.initialize(connectConfig);
final Field dataSynchronizerField =
PositionManagementServiceImpl.class.getDeclaredField("dataSynchronizer");
dataSynchronizerField.setAccessible(true);