This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new fc1b8e1 [ISSUE #120] Standalone dev (#142)
fc1b8e1 is described below
commit fc1b8e1abd9d2d8a215fec664307bada3c50589a
Author: xiaoyi <[email protected]>
AuthorDate: Tue May 24 14:26:38 2022 +0800
[ISSUE #120] Standalone dev (#142)
* init standalone
* fixed
* complete standalone mode
* test standalone mode
* check code
* fixed
* Ensure the uniqueness of offset keys for different connectors #143
Co-authored-by: sunxiaojian <[email protected]>
---
connectors/rocketmq-connect-hudi/README.md | 2 +-
.../connect/jdbc/connector/JdbcSourceConfig.java | 7 +-
.../jdbc/source/offset/SourceOffsetCompute.java | 4 +-
.../{run_worker.sh => connect-distributed.sh} | 2 +-
.../{run_worker.sh => connect-standalone.sh} | 2 +-
.../connect/runtime/ConnectController.java | 256 ---------------------
...Startup.java => DistributedConnectStartup.java} | 50 +++-
...tStartup.java => StandaloneConnectStartup.java} | 64 ++++--
.../connect/runtime/connectorwrapper/Worker.java | 9 +-
.../runtime/connectorwrapper/WorkerDirectTask.java | 8 +-
.../runtime/connectorwrapper/WorkerSourceTask.java | 14 +-
.../controller/AbstractConnectController.java | 176 ++++++++++++++
.../runtime/controller/ConnectController.java | 35 +++
.../controller/distributed/DistributedConfig.java | 26 +++
.../distributed/DistributedConnectController.java | 118 ++++++++++
.../controller/standalone/StandaloneConfig.java | 26 +++
.../standalone/StandaloneConnectController.java | 65 ++++++
.../converter/RecordPartitionConverter.java | 10 +-
.../converter/RecordPositionMapConverter.java | 14 +-
.../rocketmq/connect/runtime/rest/RestHandler.java | 9 +-
.../runtime/service/ClusterManagementService.java | 11 +
.../runtime/service/ConfigManagementService.java | 13 ++
.../runtime/service/DefaultConnectorContext.java | 7 +-
.../service/OffsetManagementServiceImpl.java | 57 ++---
.../runtime/service/PositionManagementService.java | 25 +-
.../service/PositionManagementServiceImpl.java | 59 ++---
.../connect/runtime/service/RebalanceImpl.java | 18 +-
.../memory/FileOffsetManagementServiceImpl.java | 204 ++++++++++++++++
.../memory/FilePositionManagementServiceImpl.java | 201 ++++++++++++++++
.../memory/MemoryClusterManagementServiceImpl.java | 97 ++++++++
.../memory/MemoryConfigManagementServiceImpl.java | 254 ++++++++++++++++++++
.../service/memory/StandaloneRebalanceService.java | 90 ++++++++
.../DefaultAllocateConnAndTaskStrategy.java | 2 +-
.../runtime/store/ExtendRecordPartition.java | 63 +++++
.../runtime/store/PositionStorageReaderImpl.java | 15 +-
...eReaderImpl.java => PositionStorageWriter.java} | 31 +--
.../{connect.conf => connect-distributed.conf} | 0
.../{connect.conf => connect-standalone.conf} | 4 +-
.../runtime/connectorwrapper/WorkerTest.java | 4 +-
.../testimpl/TestPositionManageServiceImpl.java | 11 +-
.../connect/runtime/rest/RestHandlerTest.java | 4 +-
.../service/PositionManagementServiceImplTest.java | 33 +--
42 files changed, 1654 insertions(+), 446 deletions(-)
diff --git a/connectors/rocketmq-connect-hudi/README.md
b/connectors/rocketmq-connect-hudi/README.md
index cadc364..05fdfb9 100644
--- a/connectors/rocketmq-connect-hudi/README.md
+++ b/connectors/rocketmq-connect-hudi/README.md
@@ -72,6 +72,6 @@
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-hudi-connector-name}/
* **spark-submit 启动任务**
将connect-runtime打包后通过spark-submit提交任务
```
-nohup sh spark-submit --class
org.apache.rocketmq.connect.runtime.ConnectStartup --conf
"spark.driver.extraJavaOptions=-Dlogback.configurationFile=logback.xml" --files
/xxx/conf/connect.conf,/xxx/conf/log4j.properties --packages
org.apache.hudi:hudi-spark3-bundle_2.12:0.8.0,org.apache.spark:spark-avro_2.12:3.0.1,org.apache.hudi:hudi-java-client:0.8.0,org.apache.parquet:parquet-avro:1.10.1,org.apache.avro:avro:1.10.2,com.alibaba:fastjson:1.2.51,org.reflections:reflections:0.9.11,org.apa
[...]
+nohup sh spark-submit --class
org.apache.rocketmq.connect.runtime.DistributedConnectStartup --conf
"spark.driver.extraJavaOptions=-Dlogback.configurationFile=logback.xml" --files
/xxx/conf/connect.conf,/xxx/conf/log4j.properties --packages
org.apache.hudi:hudi-spark3-bundle_2.12:0.8.0,org.apache.spark:spark-avro_2.12:3.0.1,org.apache.hudi:hudi-java-client:0.8.0,org.apache.parquet:parquet-avro:1.10.1,org.apache.avro:avro:1.10.2,com.alibaba:fastjson:1.2.51,org.reflections:reflections:0.9
[...]
```
后续操作参考rocketmq-connect-hudi启动步骤
\ No newline at end of file
diff --git
a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConfig.java
b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConfig.java
index 3c34c5e..20f3aa7 100644
---
a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConfig.java
+++
b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConfig.java
@@ -207,7 +207,7 @@ public class JdbcSourceConfig extends AbstractConfig {
private String incrementingColumnName;
private List<String> timestampColumnNames;
private long timestampDelayIntervalMs;
- private Long timestampInitial;
+ private Long timestampInitial = TIMESTAMP_INITIAL_DEFAULT;
private Set<String> tableWhitelist;
private Set<String> tableBlacklist;
private String schemaPattern;
@@ -231,7 +231,10 @@ public class JdbcSourceConfig extends AbstractConfig {
this.incrementingColumnName =
config.getString(INCREMENTING_COLUMN_NAME_CONFIG);
this.timestampColumnNames = getList(config,
TIMESTAMP_COLUMN_NAME_CONFIG);
timestampDelayIntervalMs =
config.getLong(TIMESTAMP_DELAY_INTERVAL_MS_CONFIG);
- this.timestampInitial = config.containsKey(TIMESTAMP_INITIAL_CONFIG) ?
config.getLong(TIMESTAMP_INITIAL_CONFIG) : TIMESTAMP_INITIAL_DEFAULT;
+//
this.timestampInitial=config.getLong(TIMESTAMP_INITIAL_CONFIG,TIMESTAMP_INITIAL_DEFAULT);
+ if (config.containsKey(TIMESTAMP_INITIAL_CONFIG)){
+ this.timestampInitial=config.getLong(TIMESTAMP_INITIAL_CONFIG);
+ }
this.tableWhitelist = new HashSet<>(getList(config,
TABLE_WHITELIST_CONFIG));
this.tableBlacklist = new HashSet<>(getList(config,
TABLE_BLACKLIST_CONFIG));
this.schemaPattern = config.getString(SCHEMA_PATTERN_CONFIG);
diff --git
a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/offset/SourceOffsetCompute.java
b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/offset/SourceOffsetCompute.java
index 6b5411e..0ae2f2c 100644
---
a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/offset/SourceOffsetCompute.java
+++
b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/offset/SourceOffsetCompute.java
@@ -166,7 +166,7 @@ public class SourceOffsetCompute {
throw new ConnectException("Unexpected query mode: " +
queryMode);
}
Map<String, Object> offset = null;
- if (offsets != null && tablePartitionsToCheck != null) {
+ if (offsets != null && tablePartitionsToCheck != null &&
offsets.containsKey(tablePartitionsToCheck)) {
offset = (Map<String, Object>)
offsets.get(tablePartitionsToCheck).getOffset();
}
offset = computeInitialOffset(
@@ -271,7 +271,7 @@ public class SourceOffsetCompute {
log.info("No offsets found for '{}', so using configured
timestamp {}", tableOrQuery,
timestampInitial);
} else {
- if (queryMode != Querier.QueryMode.TABLE) {
+ if (queryMode != Querier.QueryMode.TABLE || timestampColumns
== null || timestampColumns.isEmpty()) {
return initialPartitionOffset;
}
try {
diff --git a/rocketmq-connect-runtime/run_worker.sh
b/rocketmq-connect-runtime/connect-distributed.sh
similarity index 91%
copy from rocketmq-connect-runtime/run_worker.sh
copy to rocketmq-connect-runtime/connect-distributed.sh
index d87a4bf..2901a07 100755
--- a/rocketmq-connect-runtime/run_worker.sh
+++ b/rocketmq-connect-runtime/connect-distributed.sh
@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
echo "run rumtime worker"
-cd target/distribution/ && java -cp .:./conf/:./lib/*
org.apache.rocketmq.connect.runtime.ConnectStartup -c conf/connect.conf
\ No newline at end of file
+cd target/distribution/ && java -cp .:./conf/:./lib/*
org.apache.rocketmq.connect.runtime.DistributedConnectStartup -c
conf/connect-distributed.conf
\ No newline at end of file
diff --git a/rocketmq-connect-runtime/run_worker.sh
b/rocketmq-connect-runtime/connect-standalone.sh
similarity index 91%
rename from rocketmq-connect-runtime/run_worker.sh
rename to rocketmq-connect-runtime/connect-standalone.sh
index d87a4bf..57daac4 100755
--- a/rocketmq-connect-runtime/run_worker.sh
+++ b/rocketmq-connect-runtime/connect-standalone.sh
@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
echo "run rumtime worker"
-cd target/distribution/ && java -cp .:./conf/:./lib/*
org.apache.rocketmq.connect.runtime.ConnectStartup -c conf/connect.conf
\ No newline at end of file
+cd target/distribution/ && java -cp .:./conf/:./lib/*
org.apache.rocketmq.connect.runtime.StandaloneConnectStartup -c
conf/connect-standalone.conf
\ No newline at end of file
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java
deleted file mode 100644
index 1c59287..0000000
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.connect.runtime;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
-import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
-import org.apache.rocketmq.connect.runtime.rest.RestHandler;
-import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
-import
org.apache.rocketmq.connect.runtime.service.ClusterManagementServiceImpl;
-import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
-import org.apache.rocketmq.connect.runtime.service.ConfigManagementServiceImpl;
-import org.apache.rocketmq.connect.runtime.service.OffsetManagementServiceImpl;
-import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
-import
org.apache.rocketmq.connect.runtime.service.PositionManagementServiceImpl;
-import org.apache.rocketmq.connect.runtime.service.RebalanceImpl;
-import org.apache.rocketmq.connect.runtime.service.RebalanceService;
-import
org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
-import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
-import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
-import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Connect controller to access and control all resource in runtime.
- */
-public class ConnectController {
-
- private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
-
- /**
- * Configuration of current runtime.
- */
- private final ConnectConfig connectConfig;
-
- /**
- * All the configurations of current running connectors and tasks in
cluster.
- */
- private final ConfigManagementService configManagementService;
-
- /**
- * Position management of source tasks.
- */
- private final PositionManagementService positionManagementService;
-
- /**
- * Offset management of sink tasks.
- */
- private final PositionManagementService offsetManagementService;
-
- /**
- * Manage the online info of the cluster.
- */
- private final ClusterManagementService clusterManagementService;
-
- /**
- * A worker to schedule all connectors and tasks assigned to current
process.
- */
- private final Worker worker;
-
- /**
- * A REST handler, interacting with user.
- */
- private final RestHandler restHandler;
-
- /**
- * Assign all connectors and tasks to all alive process in the cluster.
- */
- private final RebalanceImpl rebalanceImpl;
-
- /**
- * A scheduled task to rebalance all connectors and tasks in the cluster.
- */
- private final RebalanceService rebalanceService;
-
- /**
- * Thread pool to run schedule task.
- */
- private ScheduledExecutorService scheduledExecutorService;
-
- private final Plugin plugin;
-
- private ConnectStatsManager connectStatsManager;
-
- private final ConnectStatsService connectStatsService;
-
- public ConnectController(
- ConnectConfig connectConfig) {
-
- List<String> pluginPaths = new ArrayList<>(16);
- if (StringUtils.isNotEmpty(connectConfig.getPluginPaths())) {
- String[] strArr = connectConfig.getPluginPaths().split(",");
- for (String path : strArr) {
- if (StringUtils.isNotEmpty(path)) {
- pluginPaths.add(path);
- }
- }
- }
- plugin = new Plugin(pluginPaths);
- plugin.initPlugin();
- this.connectStatsManager = new ConnectStatsManager(connectConfig);
- this.connectStatsService = new ConnectStatsService();
- this.connectConfig = connectConfig;
- this.clusterManagementService = new
ClusterManagementServiceImpl(connectConfig);
- this.configManagementService = new
ConfigManagementServiceImpl(connectConfig, plugin);
- this.positionManagementService = new
PositionManagementServiceImpl(connectConfig);
- this.offsetManagementService = new
OffsetManagementServiceImpl(connectConfig);
- this.worker = new Worker(connectConfig, positionManagementService,
configManagementService, plugin, this);
- AllocateConnAndTaskStrategy strategy =
ConnectUtil.initAllocateConnAndTaskStrategy(connectConfig);
- this.rebalanceImpl = new RebalanceImpl(worker,
configManagementService, clusterManagementService, strategy, this);
- this.restHandler = new RestHandler(this);
- this.rebalanceService = new RebalanceService(rebalanceImpl,
configManagementService, clusterManagementService);
- }
-
- public void initialize() {
- this.scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor((Runnable r) -> new Thread(r,
"ConnectScheduledThread"));
- }
-
- public void start() {
-
- clusterManagementService.start();
- configManagementService.start();
- positionManagementService.start();
- offsetManagementService.start();
- worker.start();
- rebalanceService.start();
- connectStatsService.start();
-
- // Persist configurations of current connectors and tasks.
- this.scheduledExecutorService.scheduleAtFixedRate(() -> {
-
- try {
- ConnectController.this.configManagementService.persist();
- } catch (Exception e) {
- log.error("schedule persist config error.", e);
- }
- }, 1000, this.connectConfig.getConfigPersistInterval(),
TimeUnit.MILLISECONDS);
-
- // Persist position information of source tasks.
- this.scheduledExecutorService.scheduleAtFixedRate(() -> {
-
- try {
- ConnectController.this.positionManagementService.persist();
- } catch (Exception e) {
- log.error("schedule persist position error.", e);
- }
- }, 1000, this.connectConfig.getPositionPersistInterval(),
TimeUnit.MILLISECONDS);
-
- // Persist offset information of sink tasks.
- this.scheduledExecutorService.scheduleAtFixedRate(() -> {
-
- try {
- ConnectController.this.offsetManagementService.persist();
- } catch (Exception e) {
- log.error("schedule persist offset error.", e);
- }
- }, 1000, this.connectConfig.getOffsetPersistInterval(),
TimeUnit.MILLISECONDS);
- }
-
- public void shutdown() {
-
- if (worker != null) {
- worker.stop();
- }
-
- if (configManagementService != null) {
- configManagementService.stop();
- }
-
- if (positionManagementService != null) {
- positionManagementService.stop();
- }
-
- if (offsetManagementService != null) {
- offsetManagementService.stop();
- }
-
- if (clusterManagementService != null) {
- clusterManagementService.stop();
- }
-
- this.scheduledExecutorService.shutdown();
- try {
- this.scheduledExecutorService.awaitTermination(5000,
TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- log.error("shutdown scheduledExecutorService error.", e);
- }
-
- if (rebalanceService != null) {
- rebalanceService.stop();
- }
- }
-
- public ConnectConfig getConnectConfig() {
- return connectConfig;
- }
-
- public ConfigManagementService getConfigManagementService() {
- return configManagementService;
- }
-
- public PositionManagementService getPositionManagementService() {
- return positionManagementService;
- }
-
- public ClusterManagementService getClusterManagementService() {
- return clusterManagementService;
- }
-
- public Worker getWorker() {
- return worker;
- }
-
- public RestHandler getRestHandler() {
- return restHandler;
- }
-
- public RebalanceImpl getRebalanceImpl() {
- return rebalanceImpl;
- }
-
- public ConnectStatsManager getConnectStatsManager() {
- return connectStatsManager;
- }
-
- public void setConnectStatsManager(ConnectStatsManager
connectStatsManager) {
- this.connectStatsManager = connectStatsManager;
- }
-
- public ConnectStatsService getConnectStatsService() {
- return connectStatsService;
- }
-}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectStartup.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
similarity index 69%
copy from
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectStartup.java
copy to
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
index c31b7b8..dc38a86 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectStartup.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
@@ -20,15 +20,25 @@ package org.apache.rocketmq.connect.runtime;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import
org.apache.rocketmq.connect.runtime.controller.distributed.DistributedConfig;
+import
org.apache.rocketmq.connect.runtime.controller.distributed.DistributedConnectController;
+import
org.apache.rocketmq.connect.runtime.service.ClusterManagementServiceImpl;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementServiceImpl;
+import org.apache.rocketmq.connect.runtime.service.OffsetManagementServiceImpl;
+import
org.apache.rocketmq.connect.runtime.service.PositionManagementServiceImpl;
import org.apache.rocketmq.connect.runtime.utils.FileAndPropertyUtil;
+import org.apache.rocketmq.connect.runtime.utils.Plugin;
import org.apache.rocketmq.connect.runtime.utils.ServerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +46,7 @@ import org.slf4j.LoggerFactory;
/**
* Startup class of the runtime worker.
*/
-public class ConnectStartup {
+public class DistributedConnectStartup {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
@@ -50,7 +60,7 @@ public class ConnectStartup {
start(createConnectController(args));
}
- private static void start(ConnectController controller) {
+ private static void start(DistributedConnectController controller) {
try {
controller.start();
@@ -69,37 +79,53 @@ public class ConnectStartup {
* @param args
* @return
*/
- private static ConnectController createConnectController(String[] args) {
+ private static DistributedConnectController
createConnectController(String[] args) {
try {
// Build the command line options.
Options options = ServerUtil.buildCommandlineOptions(new
Options());
commandLine = ServerUtil.parseCmdLine("connect", args,
buildCommandlineOptions(options),
- new PosixParser());
+ new PosixParser());
if (null == commandLine) {
System.exit(-1);
}
// Load configs from command line.
- ConnectConfig connectConfig = new ConnectConfig();
+ DistributedConfig connectConfig = new DistributedConfig();
if (commandLine.hasOption('c')) {
- String file = commandLine.getOptionValue('c');
+ String file = commandLine.getOptionValue('c').trim();
if (file != null) {
configFile = file;
InputStream in = new BufferedInputStream(new
FileInputStream(file));
properties = new Properties();
properties.load(in);
-
FileAndPropertyUtil.properties2Object(properties,
connectConfig);
-
in.close();
}
}
+ List<String> pluginPaths = new ArrayList<>(16);
+ if (StringUtils.isNotEmpty(connectConfig.getPluginPaths())) {
+ String[] strArr = connectConfig.getPluginPaths().split(",");
+ for (String path : strArr) {
+ if (StringUtils.isNotEmpty(path)) {
+ pluginPaths.add(path);
+ }
+ }
+ }
+ Plugin plugin = new Plugin(pluginPaths);
+ plugin.initPlugin();
+
// Create controller and initialize.
- ConnectController controller = new
ConnectController(connectConfig);
- controller.initialize();
+
+ DistributedConnectController controller = new
DistributedConnectController(
+ plugin,
+ connectConfig,
+ new ClusterManagementServiceImpl(connectConfig),
+ new ConfigManagementServiceImpl(connectConfig, plugin),
+ new PositionManagementServiceImpl(connectConfig),
+ new OffsetManagementServiceImpl(connectConfig));
// Invoked when shutdown.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
@@ -133,7 +159,7 @@ public class ConnectStartup {
Option opt = new Option("c", "configFile", true, "connect config
properties file");
opt.setRequired(false);
options.addOption(opt);
-
return options;
+
}
}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectStartup.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
similarity index 68%
rename from
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectStartup.java
rename to
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
index c31b7b8..747e806 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectStartup.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
@@ -17,26 +17,36 @@
package org.apache.rocketmq.connect.runtime;
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
-import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import
org.apache.rocketmq.connect.runtime.controller.standalone.StandaloneConfig;
+import
org.apache.rocketmq.connect.runtime.controller.standalone.StandaloneConnectController;
+import
org.apache.rocketmq.connect.runtime.service.memory.MemoryClusterManagementServiceImpl;
+import
org.apache.rocketmq.connect.runtime.service.memory.MemoryConfigManagementServiceImpl;
+import
org.apache.rocketmq.connect.runtime.service.memory.FileOffsetManagementServiceImpl;
+import
org.apache.rocketmq.connect.runtime.service.memory.FilePositionManagementServiceImpl;
import org.apache.rocketmq.connect.runtime.utils.FileAndPropertyUtil;
+import org.apache.rocketmq.connect.runtime.utils.Plugin;
import org.apache.rocketmq.connect.runtime.utils.ServerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* Startup class of the runtime worker.
*/
-public class ConnectStartup {
+public class StandaloneConnectStartup {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
@@ -50,11 +60,11 @@ public class ConnectStartup {
start(createConnectController(args));
}
- private static void start(ConnectController controller) {
+ private static void start(StandaloneConnectController controller) {
try {
controller.start();
- String tip = "The worker [" +
controller.getClusterManagementService().getCurrentWorker() + "] boot success.";
+ String tip = "The standalone worker boot success.";
log.info(tip);
System.out.printf("%s%n", tip);
} catch (Throwable e) {
@@ -69,37 +79,51 @@ public class ConnectStartup {
* @param args
* @return
*/
- private static ConnectController createConnectController(String[] args) {
-
+ private static StandaloneConnectController
createConnectController(String[] args) {
try {
-
// Build the command line options.
Options options = ServerUtil.buildCommandlineOptions(new
Options());
commandLine = ServerUtil.parseCmdLine("connect", args,
buildCommandlineOptions(options),
- new PosixParser());
+ new PosixParser());
if (null == commandLine) {
System.exit(-1);
}
// Load configs from command line.
- ConnectConfig connectConfig = new ConnectConfig();
+ StandaloneConfig connectConfig = new StandaloneConfig();
if (commandLine.hasOption('c')) {
- String file = commandLine.getOptionValue('c');
+ String file = commandLine.getOptionValue('c').trim();
if (file != null) {
configFile = file;
InputStream in = new BufferedInputStream(new
FileInputStream(file));
properties = new Properties();
properties.load(in);
-
FileAndPropertyUtil.properties2Object(properties,
connectConfig);
-
in.close();
}
}
+ List<String> pluginPaths = new ArrayList<>(16);
+ if (StringUtils.isNotEmpty(connectConfig.getPluginPaths())) {
+ String[] strArr = connectConfig.getPluginPaths().split(",");
+ for (String path : strArr) {
+ if (StringUtils.isNotEmpty(path)) {
+ pluginPaths.add(path);
+ }
+ }
+ }
+ Plugin plugin = new Plugin(pluginPaths);
+ plugin.initPlugin();
+
// Create controller and initialize.
- ConnectController controller = new
ConnectController(connectConfig);
- controller.initialize();
+
+ StandaloneConnectController controller = new
StandaloneConnectController(
+ plugin,
+ connectConfig,
+ new MemoryClusterManagementServiceImpl(connectConfig),
+ new MemoryConfigManagementServiceImpl(connectConfig,
plugin),
+ new FilePositionManagementServiceImpl(connectConfig),
+ new FileOffsetManagementServiceImpl(connectConfig));
// Invoked when shutdown.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
@@ -133,7 +157,7 @@ public class ConnectStartup {
Option opt = new Option("c", "configFile", true, "connect config
properties file");
opt.setRequired(false);
options.addOption(opt);
-
return options;
+
}
}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index 9d48143..48bb04d 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -44,7 +44,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.connect.runtime.ConnectController;
+import
org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
@@ -131,8 +131,9 @@ public class Worker {
private final ConnectStatsService connectStatsService;
public Worker(ConnectConfig connectConfig,
- PositionManagementService positionManagementService,
ConfigManagementService configManagementService,
- Plugin plugin, ConnectController connectController) {
+ PositionManagementService positionManagementService,
+ ConfigManagementService configManagementService,
+ Plugin plugin, AbstractConnectController connectController) {
this.connectConfig = connectConfig;
this.taskExecutor = Executors.newCachedThreadPool(new
DefaultThreadFactory("task-Worker-Executor-"));
this.positionManagementService = positionManagementService;
@@ -159,7 +160,7 @@ public class Worker {
* @throws Exception
*/
public synchronized void startConnectors(Map<String, ConnectKeyValue>
connectorConfigs,
- ConnectController connectController) throws Exception {
+ AbstractConnectController connectController) throws Exception {
Set<WorkerConnector> stoppedConnector = new HashSet<>();
for (WorkerConnector workerConnector : workingConnectors) {
try {
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
index d6f3ed7..2a0e6c7 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
@@ -39,6 +39,7 @@ import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
import org.apache.rocketmq.connect.runtime.store.PositionStorageReaderImpl;
+import org.apache.rocketmq.connect.runtime.store.PositionStorageWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,6 +79,8 @@ public class WorkerDirectTask implements WorkerTask {
private final OffsetStorageReader positionStorageReader;
+ private final PositionStorageWriter positionStorageWriter;
+
private final AtomicReference<WorkerState> workerState;
public WorkerDirectTask(String connectorName,
@@ -91,7 +94,8 @@ public class WorkerDirectTask implements WorkerTask {
this.sinkTask = sinkTask;
this.taskConfig = taskConfig;
this.positionManagementService = positionManagementService;
- this.positionStorageReader = new
PositionStorageReaderImpl(positionManagementService);
+ this.positionStorageReader = new
PositionStorageReaderImpl(connectorName, positionManagementService);
+ this.positionStorageWriter = new PositionStorageWriter(connectorName,
positionManagementService);
this.state = new AtomicReference<>(WorkerTaskState.NEW);
this.workerState = workerState;
}
@@ -141,7 +145,7 @@ public class WorkerDirectTask implements WorkerTask {
sinkTask.put(sinkDataEntries);
try {
if (!MapUtils.isEmpty(map)) {
- map.forEach(positionManagementService::putPosition);
+ map.forEach(positionStorageWriter::putPosition);
}
} catch (Exception e) {
log.error("Source task save position info failed.", e);
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 4c7eee8..8e62ca3 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -54,6 +54,7 @@ import
org.apache.rocketmq.connect.runtime.service.PositionManagementService;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
import org.apache.rocketmq.connect.runtime.store.PositionStorageReaderImpl;
+import org.apache.rocketmq.connect.runtime.store.PositionStorageWriter;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,13 +88,18 @@ public class WorkerSourceTask implements WorkerTask {
*/
private AtomicReference<WorkerTaskState> state;
- private final PositionManagementService positionManagementService;
+
/**
* Used to read the position of source data source.
*/
private OffsetStorageReader offsetStorageReader;
+ /**
+ * Used to write the position of source data source.
+ */
+ private PositionStorageWriter positionStorageWriter;
+
/**
* A RocketMQ producer to send message to dest MQ.
*/
@@ -137,8 +143,8 @@ public class WorkerSourceTask implements WorkerTask {
this.connectorName = connectorName;
this.sourceTask = sourceTask;
this.taskConfig = taskConfig;
- this.positionManagementService = positionManagementService;
- this.offsetStorageReader = new
PositionStorageReaderImpl(positionManagementService);
+ this.offsetStorageReader = new
PositionStorageReaderImpl(connectorName, positionManagementService);
+ this.positionStorageWriter = new PositionStorageWriter(connectorName,
positionManagementService);
this.producer = producer;
this.recordConverter = recordConverter;
this.state = new AtomicReference<>(WorkerTaskState.NEW);
@@ -322,7 +328,7 @@ public class WorkerSourceTask implements WorkerTask {
if (null != partition && null != position) {
Map<String, String> offsetMap = (Map<String,
String>) offset.getOffset();
offsetMap.put(RuntimeConfigDefine.UPDATE_TIMESTAMP,
String.valueOf(sourceDataEntry.getTimestamp()));
-
positionManagementService.putPosition(partition, offset);
+ positionStorageWriter.putPosition(partition,
offset);
}
} catch (Exception e) {
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
new file mode 100644
index 0000000..73c259d
--- /dev/null
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.controller;
+
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
+import org.apache.rocketmq.connect.runtime.rest.RestHandler;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
+import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base implementation of {@link ConnectController}.
+ *
+ * <p>Holds the runtime configuration and the management services, creates the
+ * {@link Worker} and the {@link RestHandler}, and starts/stops the services in
+ * a fixed order. Concrete controllers extend this class.
+ */
+public abstract class AbstractConnectController implements ConnectController {
+
+ private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
+
+ /**
+ * Configuration of current runtime.
+ */
+ protected final ConnectConfig connectConfig;
+
+ /**
+ * All the configurations of current running connectors and tasks in
+ * the cluster.
+ */
+ protected final ConfigManagementService configManagementService;
+
+ /**
+ * Position management of source tasks.
+ */
+ protected final PositionManagementService positionManagementService;
+
+ /**
+ * Offset management of sink tasks.
+ */
+ protected final PositionManagementService offsetManagementService;
+
+ /**
+ * Manage the online info of the cluster.
+ */
+ protected final ClusterManagementService clusterManagementService;
+
+ /**
+ * A worker to schedule all connectors and tasks assigned to current
+ * process.
+ */
+ protected final Worker worker;
+
+ /**
+ * A REST handler, interacting with user.
+ */
+ protected final RestHandler restHandler;
+
+
+
+ /**
+ * Plugin instance supplied by the caller; it is also passed to the
+ * {@link Worker} created in the constructor.
+ */
+ protected final Plugin plugin;
+
+ /**
+ * Stats manager, created from the runtime configuration in the constructor.
+ */
+ protected final ConnectStatsManager connectStatsManager;
+
+ /**
+ * Stats service, created in the constructor and started by {@link #start()}.
+ */
+ protected final ConnectStatsService connectStatsService;
+
+ /**
+ * Initializes the controller: stores the supplied collaborators, creates the
+ * stats objects, then creates the {@link Worker} and the {@link RestHandler}.
+ *
+ * @param plugin plugin instance, forwarded to the worker
+ * @param connectConfig configuration of the current runtime
+ * @param clusterManagementService manages the online info of the cluster
+ * @param configManagementService manages connector and task configurations
+ * @param positionManagementService position management of source tasks; also
+ * forwarded to the worker
+ * @param offsetManagementService offset management of sink tasks
+ */
+ public AbstractConnectController(
+ Plugin plugin,
+ ConnectConfig connectConfig,
+ ClusterManagementService clusterManagementService,
+ ConfigManagementService configManagementService,
+ PositionManagementService positionManagementService,
+ PositionManagementService offsetManagementService
+ ) {
+ // set config
+ this.connectConfig = connectConfig;
+ // set plugin
+ this.plugin = plugin;
+ // set metrics
+ this.connectStatsManager = new ConnectStatsManager(connectConfig);
+ this.connectStatsService = new ConnectStatsService();
+
+ this.clusterManagementService = clusterManagementService;
+ this.configManagementService = configManagementService;
+ this.positionManagementService = positionManagementService;
+ this.offsetManagementService = offsetManagementService;
+ // NOTE: 'this' is handed to Worker and RestHandler before the constructor
+ // of any subclass has finished, so subclass fields are not yet set here.
+ this.worker = new Worker(connectConfig, positionManagementService,
configManagementService, plugin, this);
+ this.restHandler = new RestHandler(this);
+ }
+
+
+ /**
+ * Starts, in this order: cluster management, config management, position
+ * management, offset management, the worker and the stats service.
+ */
+ @Override
+ public void start() {
+ clusterManagementService.start();
+ configManagementService.start();
+ positionManagementService.start();
+ offsetManagementService.start();
+ worker.start();
+ connectStatsService.start();
+ }
+
+ /**
+ * Stops, in this order and skipping any that is null: the worker, config
+ * management, position management, offset management and cluster management.
+ *
+ * <p>NOTE(review): connectStatsService is started in start() but is not
+ * stopped here - confirm whether that is intentional.
+ */
+ @Override
+ public void shutdown() {
+
+ if (worker != null) {
+ worker.stop();
+ }
+
+ if (configManagementService != null) {
+ configManagementService.stop();
+ }
+
+ if (positionManagementService != null) {
+ positionManagementService.stop();
+ }
+
+ if (offsetManagementService != null) {
+ offsetManagementService.stop();
+ }
+
+ if (clusterManagementService != null) {
+ clusterManagementService.stop();
+ }
+
+ }
+
+ /** Returns the configuration of the current runtime. */
+ public ConnectConfig getConnectConfig() {
+ return connectConfig;
+ }
+
+ /** Returns the config management service. */
+ public ConfigManagementService getConfigManagementService() {
+ return configManagementService;
+ }
+
+ /** Returns the position management service of source tasks. */
+ public PositionManagementService getPositionManagementService() {
+ return positionManagementService;
+ }
+
+ /** Returns the cluster management service. */
+ public ClusterManagementService getClusterManagementService() {
+ return clusterManagementService;
+ }
+
+ /** Returns the worker created by this controller. */
+ public Worker getWorker() {
+ return worker;
+ }
+
+ /** Returns the stats manager created by this controller. */
+ public ConnectStatsManager getConnectStatsManager() {
+ return connectStatsManager;
+ }
+
+ /** Returns the stats service created by this controller. */
+ public ConnectStatsService getConnectStatsService() {
+ return connectStatsService;
+ }
+
+}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/ConnectController.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/ConnectController.java
new file mode 100644
index 0000000..1b2049d
--- /dev/null
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/ConnectController.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.controller;
+
+/**
+ * Lifecycle contract of a connect controller: it can be started and shut down.
+ */
+public interface ConnectController {
+
+ /**
+ * Starts the controller.
+ */
+ void start();
+
+ /**
+ * Shuts the controller down.
+ */
+ void shutdown();
+
+}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConfig.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConfig.java
new file mode 100644
index 0000000..4784b21
--- /dev/null
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConfig.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.controller.distributed;
+
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+
+/**
+ * Configuration type of a worker running in distributed mode.
+ *
+ * <p>Declares no members of its own; everything is inherited from
+ * {@link ConnectConfig}.
+ */
+public class DistributedConfig extends ConnectConfig {
+}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
new file mode 100644
index 0000000..0dd048a
--- /dev/null
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.controller.distributed;
+
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import
org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.RebalanceImpl;
+import org.apache.rocketmq.connect.runtime.service.RebalanceService;
+import
org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
+import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
+import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Connect controller for distributed mode.
+ *
+ * <p>On top of the services managed by {@link AbstractConnectController}, this
+ * controller runs a {@link RebalanceService} and a single-threaded scheduler that
+ * periodically persists connector configs, source-task positions and sink-task
+ * offsets.
+ */
+public class DistributedConnectController extends AbstractConnectController {
+
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
+
+    private final RebalanceImpl rebalanceImpl;
+
+    /**
+     * Thread pool to run schedule task. It is created in {@link #start()}, so it is
+     * {@code null} until the controller has been started.
+     */
+    protected ScheduledExecutorService scheduledExecutorService;
+
+    /**
+     * A scheduled task to rebalance all connectors and tasks in the cluster.
+     */
+    private final RebalanceService rebalanceService;
+
+    /**
+     * Creates the controller and wires the rebalance components.
+     *
+     * @param plugin plugin instance, forwarded to the base controller
+     * @param connectConfig distributed runtime configuration; also used to pick the
+     *                      connector/task allocation strategy
+     * @param clusterManagementService manages the online info of the cluster
+     * @param configManagementService manages connector and task configurations
+     * @param positionManagementService position management of source tasks
+     * @param offsetManagementService offset management of sink tasks
+     */
+    public DistributedConnectController(Plugin plugin,
+                                        DistributedConfig connectConfig,
+                                        ClusterManagementService clusterManagementService,
+                                        ConfigManagementService configManagementService,
+                                        PositionManagementService positionManagementService,
+                                        PositionManagementService offsetManagementService) {
+
+        super(plugin, connectConfig, clusterManagementService, configManagementService,
+            positionManagementService, offsetManagementService);
+        AllocateConnAndTaskStrategy strategy = ConnectUtil.initAllocateConnAndTaskStrategy(connectConfig);
+        this.rebalanceImpl = new RebalanceImpl(worker, configManagementService,
+            clusterManagementService, strategy, this);
+        this.rebalanceService = new RebalanceService(rebalanceImpl, configManagementService,
+            clusterManagementService);
+    }
+
+    /**
+     * Creates the scheduler, starts the base services and the rebalance service, then
+     * schedules three periodic persist jobs (config, position, offset). Each job starts
+     * after 1000 ms and repeats at the interval read from the runtime configuration.
+     * Exceptions thrown by a job are logged so that the job keeps being rescheduled.
+     */
+    @Override
+    public void start() {
+        this.scheduledExecutorService =
+            Executors.newSingleThreadScheduledExecutor((Runnable r) -> new Thread(r, "ConnectScheduledThread"));
+        super.start();
+        rebalanceService.start();
+        // Persist configurations of current connectors and tasks.
+        scheduledExecutorService.scheduleAtFixedRate(() -> {
+            try {
+                this.configManagementService.persist();
+            } catch (Exception e) {
+                log.error("schedule persist config error.", e);
+            }
+        }, 1000, this.connectConfig.getConfigPersistInterval(), TimeUnit.MILLISECONDS);
+
+        // Persist position information of source tasks.
+        scheduledExecutorService.scheduleAtFixedRate(() -> {
+            try {
+                this.positionManagementService.persist();
+            } catch (Exception e) {
+                log.error("schedule persist position error.", e);
+            }
+        }, 1000, this.connectConfig.getPositionPersistInterval(), TimeUnit.MILLISECONDS);
+
+        // Persist offset information of sink tasks.
+        scheduledExecutorService.scheduleAtFixedRate(() -> {
+            try {
+                this.offsetManagementService.persist();
+            } catch (Exception e) {
+                log.error("schedule persist offset error.", e);
+            }
+        }, 1000, this.connectConfig.getOffsetPersistInterval(), TimeUnit.MILLISECONDS);
+
+    }
+
+    /**
+     * Stops the base services and the rebalance service, then shuts the scheduler down
+     * and waits up to 5000 ms for running persist jobs to finish.
+     */
+    @Override
+    public void shutdown() {
+        super.shutdown();
+        if (rebalanceService != null) {
+            rebalanceService.stop();
+        }
+        // The scheduler only exists after start(); shutdown() can be reached without it
+        // (for example when startup failed early), so there is nothing to stop then.
+        if (scheduledExecutorService == null) {
+            return;
+        }
+        scheduledExecutorService.shutdown();
+        try {
+            scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            log.error("shutdown scheduledExecutorService error.", e);
+            // Restore the interrupt flag so the calling thread can observe the interruption.
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConfig.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConfig.java
new file mode 100644
index 0000000..967c72e
--- /dev/null
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConfig.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.controller.standalone;
+
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+
+/**
+ * Configuration type of a worker running in standalone mode.
+ *
+ * <p>Declares no members of its own; everything is inherited from
+ * {@link ConnectConfig}.
+ */
+public class StandaloneConfig extends ConnectConfig {
+}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectController.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectController.java
new file mode 100644
index 0000000..45b6061
--- /dev/null
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectController.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.controller.standalone;
+
+import
org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.RebalanceImpl;
+import
org.apache.rocketmq.connect.runtime.service.memory.StandaloneRebalanceService;
+import
org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
+import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
+import org.apache.rocketmq.connect.runtime.utils.Plugin;
+
+
+/**
+ * Connect controller for standalone mode.
+ *
+ * <p>Adds a {@link StandaloneRebalanceService} on top of the services managed by
+ * {@link AbstractConnectController}.
+ */
+public class StandaloneConnectController extends AbstractConnectController {
+
+ private final RebalanceImpl rebalanceImpl;
+ private final StandaloneRebalanceService rebalanceService;
+ /**
+ * Creates the controller and wires the rebalance components.
+ *
+ * @param plugin plugin instance, forwarded to the base controller
+ * @param connectConfig standalone runtime configuration; also used to pick the
+ * connector/task allocation strategy
+ * @param clusterManagementService manages the online info of the cluster
+ * @param configManagementService manages connector and task configurations
+ * @param positionManagementService position management of source tasks
+ * @param offsetManagementService offset management of sink tasks
+ */
+ public StandaloneConnectController(Plugin plugin,
+ StandaloneConfig connectConfig,
+ ClusterManagementService
clusterManagementService,
+ ConfigManagementService
configManagementService,
+ PositionManagementService
positionManagementService,
+ PositionManagementService
offsetManagementService) {
+
+ super(plugin, connectConfig, clusterManagementService,
configManagementService, positionManagementService, offsetManagementService);
+ AllocateConnAndTaskStrategy strategy =
ConnectUtil.initAllocateConnAndTaskStrategy(connectConfig);
+ this.rebalanceImpl = new RebalanceImpl(worker,
configManagementService, clusterManagementService, strategy, this);
+ this.rebalanceService = new StandaloneRebalanceService(rebalanceImpl,
configManagementService, clusterManagementService);
+
+ }
+
+ /**
+ * Starts the base services first, then the rebalance service.
+ */
+ @Override
+ public void start() {
+ super.start();
+ rebalanceService.start();
+ }
+
+ /**
+ * Stops the base services first, then the rebalance service if it is set.
+ */
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ if (rebalanceService != null) {
+ rebalanceService.stop();
+ }
+ }
+}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/RecordPartitionConverter.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/RecordPartitionConverter.java
index 625d14a..0ed4960 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/RecordPartitionConverter.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/RecordPartitionConverter.java
@@ -19,21 +19,21 @@ package org.apache.rocketmq.connect.runtime.converter;
import com.alibaba.fastjson.JSON;
import io.openmessaging.connector.api.data.Converter;
-import io.openmessaging.connector.api.data.RecordPartition;
import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* ByteBuffer converter.
*/
-public class RecordPartitionConverter implements Converter<RecordPartition> {
+public class RecordPartitionConverter implements
Converter<ExtendRecordPartition> {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
@Override
- public byte[] objectToByte(RecordPartition recordPartition) {
+ public byte[] objectToByte(ExtendRecordPartition recordPartition) {
try {
String json = JSON.toJSONString(recordPartition);
return json.getBytes("UTF-8");
@@ -44,10 +44,10 @@ public class RecordPartitionConverter implements
Converter<RecordPartition> {
}
@Override
- public RecordPartition byteToObject(byte[] bytes) {
+ public ExtendRecordPartition byteToObject(byte[] bytes) {
try {
String text = new String(bytes, "UTF-8");
- RecordPartition res = JSON.parseObject(text,
RecordPartition.class);
+ ExtendRecordPartition res = JSON.parseObject(text,
ExtendRecordPartition.class);
return res;
} catch (UnsupportedEncodingException e) {
log.error("JsonConverter#byteToObject failed", e);
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/RecordPositionMapConverter.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/RecordPositionMapConverter.java
index f0a6fc8..f0a3bde 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/RecordPositionMapConverter.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/RecordPositionMapConverter.java
@@ -20,28 +20,28 @@ package org.apache.rocketmq.connect.runtime.converter;
import com.alibaba.fastjson.JSON;
import io.openmessaging.connector.api.data.Converter;
import io.openmessaging.connector.api.data.RecordOffset;
-import io.openmessaging.connector.api.data.RecordPartition;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Byte Map to byte[].
*/
-public class RecordPositionMapConverter implements
Converter<Map<RecordPartition, RecordOffset>> {
+public class RecordPositionMapConverter implements
Converter<Map<ExtendRecordPartition, RecordOffset>> {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
@Override
- public byte[] objectToByte(Map<RecordPartition, RecordOffset> map) {
+ public byte[] objectToByte(Map<ExtendRecordPartition, RecordOffset> map) {
try {
Map<String, String> resultMap = new HashMap<>();
- for (Map.Entry<RecordPartition, RecordOffset> entry :
map.entrySet()) {
+ for (Map.Entry<ExtendRecordPartition, RecordOffset> entry :
map.entrySet()) {
String jsonKey = JSON.toJSONString(entry.getKey());
jsonKey.getBytes("UTF-8");
String jsonValue = JSON.toJSONString(entry.getValue());
@@ -56,14 +56,14 @@ public class RecordPositionMapConverter implements
Converter<Map<RecordPartition
}
@Override
- public Map<RecordPartition, RecordOffset> byteToObject(byte[] bytes) {
+ public Map<ExtendRecordPartition, RecordOffset> byteToObject(byte[] bytes)
{
- Map<RecordPartition, RecordOffset> resultMap = new HashMap<>();
+ Map<ExtendRecordPartition, RecordOffset> resultMap = new HashMap<>();
try {
String rawString = new String(bytes, "UTF-8");
Map<String, String> map = JSON.parseObject(rawString, Map.class);
for (String key : map.keySet()) {
- RecordPartition recordPartition = JSON.parseObject(key,
RecordPartition.class);
+ ExtendRecordPartition recordPartition = JSON.parseObject(key,
ExtendRecordPartition.class);
RecordOffset recordOffset = JSON.parseObject(map.get(key),
RecordOffset.class);
resultMap.put(recordPartition, recordOffset);
}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
index b2a3a5b..58e4e7e 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
@@ -25,7 +25,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.rocketmq.connect.runtime.ConnectController;
+
+import
org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerConnector;
@@ -40,13 +41,13 @@ public class RestHandler {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
- private final ConnectController connectController;
+ private final AbstractConnectController connectController;
private static final String CONNECTOR_CONFIGS = "connectorConfigs";
private static final String TASK_CONFIGS = "taskConfigs";
- public RestHandler(ConnectController connectController) {
+ public RestHandler(AbstractConnectController connectController) {
this.connectController = connectController;
Javalin app = Javalin.create();
app.enableCaseSensitiveUrls();
@@ -74,9 +75,7 @@ public class RestHandler {
private void getAllocatedConnectors(Context context) {
-
Set<WorkerConnector> workerConnectors =
connectController.getWorker().getWorkingConnectors();
- Set<Runnable> workerTasks =
connectController.getWorker().getWorkingTasks();
Map<String, ConnectKeyValue> connectors = new HashMap<>();
for (WorkerConnector workerConnector : workerConnectors) {
connectors.put(workerConnector.getConnectorName(),
workerConnector.getKeyValue());
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
index 0b0a1fe..9a07d56 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
@@ -17,6 +17,8 @@
package org.apache.rocketmq.connect.runtime.service;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+
import java.util.List;
/**
@@ -36,6 +38,15 @@ public interface ClusterManagementService {
*/
void stop();
+ /**
+ * Configures this service from the given runtime configuration.
+ *
+ * <p>The default implementation does nothing.
+ *
+ * @param config can be DistributedConfig or StandaloneConfig
+ */
+ default void configure(ConnectConfig config) {
+
+ }
+
/**
* Check if Cluster Store Topic exists.
*
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java
index 59c697b..5d6d6c9 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java
@@ -18,9 +18,12 @@
package org.apache.rocketmq.connect.runtime.service;
import io.openmessaging.connector.api.component.connector.Connector;
+
import java.util.List;
import java.util.Map;
+
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.utils.Plugin;
/**
@@ -29,6 +32,7 @@ import org.apache.rocketmq.connect.runtime.utils.Plugin;
*/
public interface ConfigManagementService {
+
/**
* Start the config manager.
*/
@@ -39,6 +43,15 @@ public interface ConfigManagementService {
*/
void stop();
+ /**
+ * Configures this service from the given runtime configuration.
+ *
+ * <p>The default implementation does nothing.
+ *
+ * @param config can be DistributedConfig or StandaloneConfig
+ */
+ default void configure(ConnectConfig config) {
+
+ }
+
/**
* Get all connector configs from the cluster filter out DELETE set to 1
*
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/DefaultConnectorContext.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/DefaultConnectorContext.java
index 8dabdfe..d6254cd 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/DefaultConnectorContext.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/DefaultConnectorContext.java
@@ -20,7 +20,8 @@ package org.apache.rocketmq.connect.runtime.service;
import io.openmessaging.connector.api.component.connector.Connector;
import io.openmessaging.connector.api.component.connector.ConnectorContext;
import java.util.Set;
-import org.apache.rocketmq.connect.runtime.ConnectController;
+
+import
org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
@@ -32,11 +33,11 @@ public class DefaultConnectorContext implements
ConnectorContext {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
- private final ConnectController controller;
+ private final AbstractConnectController controller;
private final String connectorName;
- public DefaultConnectorContext(String connectorName, ConnectController
connectController) {
+ public DefaultConnectorContext(String connectorName,
AbstractConnectController connectController) {
this.controller = connectController;
this.connectorName = connectorName;
}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
index 792b24b..49860be 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
@@ -19,12 +19,13 @@ package org.apache.rocketmq.connect.runtime.service;
import io.netty.util.internal.ConcurrentSet;
import io.openmessaging.connector.api.data.RecordOffset;
-import io.openmessaging.connector.api.data.RecordPartition;
+
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
@@ -32,6 +33,7 @@ import
org.apache.rocketmq.connect.runtime.converter.JsonConverter;
import org.apache.rocketmq.connect.runtime.converter.RecordOffsetConverter;
import org.apache.rocketmq.connect.runtime.converter.RecordPartitionConverter;
import
org.apache.rocketmq.connect.runtime.converter.RecordPositionMapConverter;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
@@ -48,18 +50,18 @@ public class OffsetManagementServiceImpl implements
PositionManagementService {
/**
* Current offset info in store.
*/
- private KeyValueStore<RecordPartition, RecordOffset> offsetStore;
+ private KeyValueStore<ExtendRecordPartition, RecordOffset> offsetStore;
/**
* The updated partition of the task in the current instance.
*/
- private Set<RecordPartition> needSyncPartition;
+ private Set<ExtendRecordPartition> needSyncPartition;
/**
* Synchronize data with other workers.
*/
- private DataSynchronizer<String, Map<RecordPartition, RecordOffset>>
dataSynchronizer;
+ private DataSynchronizer<String, Map<ExtendRecordPartition, RecordOffset>>
dataSynchronizer;
private final String offsetManagePrefix = "OffsetManage";
@@ -71,14 +73,14 @@ public class OffsetManagementServiceImpl implements
PositionManagementService {
public OffsetManagementServiceImpl(ConnectConfig connectConfig) {
this.offsetStore = new
FileBaseKeyValueStore<>(FilePathConfigUtil.getOffsetPath(connectConfig.getStorePathRootDir()),
- new RecordPartitionConverter(),
- new RecordOffsetConverter());
+ new RecordPartitionConverter(),
+ new RecordOffsetConverter());
this.dataSynchronizer = new BrokerBasedLog(connectConfig,
- connectConfig.getOffsetStoreTopic(),
- ConnectUtil.createGroupName(offsetManagePrefix,
connectConfig.getWorkerId()),
- new OffsetChangeCallback(),
- new JsonConverter(),
- new RecordPositionMapConverter());
+ connectConfig.getOffsetStoreTopic(),
+ ConnectUtil.createGroupName(offsetManagePrefix,
connectConfig.getWorkerId()),
+ new OffsetChangeCallback(),
+ new JsonConverter(),
+ new RecordPositionMapConverter());
this.offsetUpdateListener = new HashSet<>();
this.needSyncPartition = new ConcurrentSet<>();
this.prepare(connectConfig);
@@ -120,7 +122,8 @@ public class OffsetManagementServiceImpl implements
PositionManagementService {
offsetStore.persist();
}
- @Override public void load() {
+ @Override
+ public void load() {
offsetStore.load();
}
@@ -131,38 +134,38 @@ public class OffsetManagementServiceImpl implements
PositionManagementService {
}
@Override
- public Map<RecordPartition, RecordOffset> getPositionTable() {
+ public Map<ExtendRecordPartition, RecordOffset> getPositionTable() {
return offsetStore.getKVMap();
}
@Override
- public RecordOffset getPosition(RecordPartition partition) {
+ public RecordOffset getPosition(ExtendRecordPartition partition) {
return offsetStore.get(partition);
}
@Override
- public void putPosition(Map<RecordPartition, RecordOffset> offsets) {
+ public void putPosition(Map<ExtendRecordPartition, RecordOffset> offsets) {
offsetStore.putAll(offsets);
needSyncPartition.addAll(offsets.keySet());
}
@Override
- public void putPosition(RecordPartition partition, RecordOffset position) {
+ public void putPosition(ExtendRecordPartition partition, RecordOffset
position) {
offsetStore.put(partition, position);
needSyncPartition.add(partition);
}
@Override
- public void removePosition(List<RecordPartition> offsets) {
+ public void removePosition(List<ExtendRecordPartition> offsets) {
if (null == offsets) {
return;
}
- for (RecordPartition offset : offsets) {
+ for (ExtendRecordPartition offset : offsets) {
needSyncPartition.remove(offset);
offsetStore.remove(offset);
}
@@ -182,11 +185,11 @@ public class OffsetManagementServiceImpl implements
PositionManagementService {
private void sendNeedSynchronizeOffset() {
- Set<RecordPartition> needSyncPartitionTmp = needSyncPartition;
+ Set<ExtendRecordPartition> needSyncPartitionTmp = needSyncPartition;
needSyncPartition = new ConcurrentSet<>();
- Map<RecordPartition, RecordOffset> needSyncOffset =
offsetStore.getKVMap().entrySet().stream()
- .filter(entry -> needSyncPartitionTmp.contains(entry.getKey()))
- .collect(Collectors.toMap(entry -> entry.getKey(), entry ->
entry.getValue()));
+ Map<ExtendRecordPartition, RecordOffset> needSyncOffset =
offsetStore.getKVMap().entrySet().stream()
+ .filter(entry -> needSyncPartitionTmp.contains(entry.getKey()))
+ .collect(Collectors.toMap(entry -> entry.getKey(), entry ->
entry.getValue()));
dataSynchronizer.send(OffsetChangeEnum.OFFSET_CHANG_KEY.name(),
needSyncOffset);
}
@@ -196,10 +199,10 @@ public class OffsetManagementServiceImpl implements
PositionManagementService {
dataSynchronizer.send(OffsetChangeEnum.OFFSET_CHANG_KEY.name(),
offsetStore.getKVMap());
}
- private class OffsetChangeCallback implements
DataSynchronizerCallback<String, Map<RecordPartition, RecordOffset>> {
+ private class OffsetChangeCallback implements
DataSynchronizerCallback<String, Map<ExtendRecordPartition, RecordOffset>> {
@Override
- public void onCompletion(Throwable error, String key,
Map<RecordPartition, RecordOffset> result) {
+ public void onCompletion(Throwable error, String key,
Map<ExtendRecordPartition, RecordOffset> result) {
boolean changed = false;
switch (OffsetChangeEnum.valueOf(key)) {
@@ -232,16 +235,16 @@ public class OffsetManagementServiceImpl implements
PositionManagementService {
* @param result
* @return
*/
- private boolean mergeOffsetInfo(Map<RecordPartition, RecordOffset> result)
{
+ private boolean mergeOffsetInfo(Map<ExtendRecordPartition, RecordOffset>
result) {
boolean changed = false;
if (null == result || 0 == result.size()) {
return changed;
}
- for (Map.Entry<RecordPartition, RecordOffset> newEntry :
result.entrySet()) {
+ for (Map.Entry<ExtendRecordPartition, RecordOffset> newEntry :
result.entrySet()) {
boolean find = false;
- for (Map.Entry<RecordPartition, RecordOffset> existedEntry :
offsetStore.getKVMap().entrySet()) {
+ for (Map.Entry<ExtendRecordPartition, RecordOffset> existedEntry :
offsetStore.getKVMap().entrySet()) {
if (newEntry.getKey().equals(existedEntry.getKey())) {
find = true;
if (!newEntry.getValue().equals(existedEntry.getValue())) {
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementService.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementService.java
index 8f01605..bd3faa7 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementService.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementService.java
@@ -19,7 +19,9 @@
package org.apache.rocketmq.connect.runtime.service;
import io.openmessaging.connector.api.data.RecordOffset;
-import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
+
import java.util.List;
import java.util.Map;
@@ -38,6 +40,15 @@ public interface PositionManagementService {
*/
void stop();
+ /**
+ * Configure class with the given key-value pairs.
+ *
+ * <p>The default implementation does nothing, so implementations that need no
+ * configuration step are not required to override it.
+ *
+ * @param config can be DistributedConfig or StandaloneConfig
+ */
+ default void configure(ConnectConfig config) {
+
+ }
+
/**
* Persist position info in a persist store.
*/
@@ -50,7 +61,7 @@ public interface PositionManagementService {
/**
* Synchronize to other nodes.
- * */
+ */
void synchronize();
/**
@@ -58,23 +69,23 @@ public interface PositionManagementService {
*
* @return
*/
- Map<RecordPartition, RecordOffset> getPositionTable();
+ Map<ExtendRecordPartition, RecordOffset> getPositionTable();
- RecordOffset getPosition(RecordPartition partition);
+ RecordOffset getPosition(ExtendRecordPartition partition);
/**
* Put a position info.
*/
- void putPosition(Map<RecordPartition, RecordOffset> positions);
+ void putPosition(Map<ExtendRecordPartition, RecordOffset> positions);
- void putPosition(RecordPartition partition, RecordOffset position);
+ void putPosition(ExtendRecordPartition partition, RecordOffset position);
/**
* Remove a position info.
*
* @param partitions
*/
- void removePosition(List<RecordPartition> partitions);
+ void removePosition(List<ExtendRecordPartition> partitions);
/**
* Register a listener.
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
index 2c2c217..893809e 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
@@ -19,12 +19,13 @@ package org.apache.rocketmq.connect.runtime.service;
import io.netty.util.internal.ConcurrentSet;
import io.openmessaging.connector.api.data.RecordOffset;
-import io.openmessaging.connector.api.data.RecordPartition;
+
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
@@ -32,6 +33,7 @@ import
org.apache.rocketmq.connect.runtime.converter.JsonConverter;
import org.apache.rocketmq.connect.runtime.converter.RecordOffsetConverter;
import org.apache.rocketmq.connect.runtime.converter.RecordPartitionConverter;
import
org.apache.rocketmq.connect.runtime.converter.RecordPositionMapConverter;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
@@ -48,17 +50,17 @@ public class PositionManagementServiceImpl implements
PositionManagementService
/**
* Current position info in store.
*/
- private KeyValueStore<RecordPartition, RecordOffset> positionStore;
+ private KeyValueStore<ExtendRecordPartition, RecordOffset> positionStore;
/**
* The updated partition of the task in the current instance.
- * */
- private Set<RecordPartition> needSyncPartition;
+ */
+ private Set<ExtendRecordPartition> needSyncPartition;
/**
* Synchronize data with other workers.
*/
- private DataSynchronizer<String, Map<RecordPartition, RecordOffset>>
dataSynchronizer;
+ private DataSynchronizer<String, Map<ExtendRecordPartition, RecordOffset>>
dataSynchronizer;
/**
* Listeners.
@@ -70,14 +72,14 @@ public class PositionManagementServiceImpl implements
PositionManagementService
public PositionManagementServiceImpl(ConnectConfig connectConfig) {
this.positionStore = new
FileBaseKeyValueStore<>(FilePathConfigUtil.getPositionPath(connectConfig.getStorePathRootDir()),
- new RecordPartitionConverter(),
- new RecordOffsetConverter());
+ new RecordPartitionConverter(),
+ new RecordOffsetConverter());
this.dataSynchronizer = new BrokerBasedLog(connectConfig,
- connectConfig.getPositionStoreTopic(),
- ConnectUtil.createGroupName(positionManagePrefix,
connectConfig.getWorkerId()),
- new PositionChangeCallback(),
- new JsonConverter(),
- new RecordPositionMapConverter());
+ connectConfig.getPositionStoreTopic(),
+ ConnectUtil.createGroupName(positionManagePrefix,
connectConfig.getWorkerId()),
+ new PositionChangeCallback(),
+ new JsonConverter(),
+ new RecordPositionMapConverter());
this.positionUpdateListener = new HashSet<>();
this.needSyncPartition = new ConcurrentSet<>();
this.prepare(connectConfig);
@@ -119,7 +121,8 @@ public class PositionManagementServiceImpl implements
PositionManagementService
positionStore.persist();
}
- @Override public void load() {
+ @Override
+ public void load() {
positionStore.load();
}
@@ -130,39 +133,39 @@ public class PositionManagementServiceImpl implements
PositionManagementService
}
@Override
- public Map<RecordPartition, RecordOffset> getPositionTable() {
+ public Map<ExtendRecordPartition, RecordOffset> getPositionTable() {
return positionStore.getKVMap();
}
@Override
- public RecordOffset getPosition(RecordPartition partition) {
+ public RecordOffset getPosition(ExtendRecordPartition partition) {
return positionStore.get(partition);
}
@Override
- public void putPosition(Map<RecordPartition, RecordOffset> positions) {
+ public void putPosition(Map<ExtendRecordPartition, RecordOffset>
positions) {
positionStore.putAll(positions);
needSyncPartition.addAll(positions.keySet());
}
@Override
- public void putPosition(RecordPartition partition, RecordOffset position) {
+ public void putPosition(ExtendRecordPartition partition, RecordOffset
position) {
positionStore.put(partition, position);
needSyncPartition.add(partition);
}
@Override
- public void removePosition(List<RecordPartition> partitions) {
+ public void removePosition(List<ExtendRecordPartition> partitions) {
if (null == partitions) {
return;
}
- for (RecordPartition partition : partitions) {
+ for (ExtendRecordPartition partition : partitions) {
needSyncPartition.remove(partition);
positionStore.remove(partition);
}
@@ -182,11 +185,11 @@ public class PositionManagementServiceImpl implements
PositionManagementService
private void sendNeedSynchronizePosition() {
- Set<RecordPartition> needSyncPartitionTmp = needSyncPartition;
+ Set<ExtendRecordPartition> needSyncPartitionTmp = needSyncPartition;
needSyncPartition = new ConcurrentSet<>();
- Map<RecordPartition, RecordOffset> needSyncPosition =
positionStore.getKVMap().entrySet().stream()
- .filter(entry -> needSyncPartitionTmp.contains(entry.getKey()))
- .collect(Collectors.toMap(entry -> entry.getKey(), entry ->
entry.getValue()));
+ Map<ExtendRecordPartition, RecordOffset> needSyncPosition =
positionStore.getKVMap().entrySet().stream()
+ .filter(entry -> needSyncPartitionTmp.contains(entry.getKey()))
+ .collect(Collectors.toMap(entry -> entry.getKey(), entry ->
entry.getValue()));
dataSynchronizer.send(PositionChangeEnum.POSITION_CHANG_KEY.name(),
needSyncPosition);
}
@@ -196,10 +199,10 @@ public class PositionManagementServiceImpl implements
PositionManagementService
dataSynchronizer.send(PositionChangeEnum.POSITION_CHANG_KEY.name(),
positionStore.getKVMap());
}
- private class PositionChangeCallback implements
DataSynchronizerCallback<String, Map<RecordPartition, RecordOffset>> {
+ private class PositionChangeCallback implements
DataSynchronizerCallback<String, Map<ExtendRecordPartition, RecordOffset>> {
@Override
- public void onCompletion(Throwable error, String key,
Map<RecordPartition, RecordOffset> result) {
+ public void onCompletion(Throwable error, String key,
Map<ExtendRecordPartition, RecordOffset> result) {
boolean changed = false;
switch (PositionChangeEnum.valueOf(key)) {
@@ -232,16 +235,16 @@ public class PositionManagementServiceImpl implements
PositionManagementService
* @param result
* @return
*/
- private boolean mergePositionInfo(Map<RecordPartition, RecordOffset>
result) {
+ private boolean mergePositionInfo(Map<ExtendRecordPartition, RecordOffset>
result) {
boolean changed = false;
if (null == result || 0 == result.size()) {
return changed;
}
- for (Map.Entry<RecordPartition, RecordOffset> newEntry :
result.entrySet()) {
+ for (Map.Entry<ExtendRecordPartition, RecordOffset> newEntry :
result.entrySet()) {
boolean find = false;
- for (Map.Entry<RecordPartition, RecordOffset> existedEntry :
positionStore.getKVMap().entrySet()) {
+ for (Map.Entry<ExtendRecordPartition, RecordOffset> existedEntry :
positionStore.getKVMap().entrySet()) {
if (newEntry.getKey().equals(existedEntry.getKey())) {
find = true;
if (!newEntry.getValue().equals(existedEntry.getValue())) {
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
index 8b1ed6d..7d8cbad 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
@@ -19,13 +19,15 @@ package org.apache.rocketmq.connect.runtime.service;
import java.util.List;
import java.util.Map;
+
import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.connect.runtime.ConnectController;
+import
org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
import org.apache.rocketmq.connect.runtime.common.ConnAndTaskConfigs;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
+import
org.apache.rocketmq.connect.runtime.service.memory.MemoryClusterManagementServiceImpl;
import
org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
import org.slf4j.Logger;
@@ -58,10 +60,10 @@ public class RebalanceImpl {
*/
private AllocateConnAndTaskStrategy allocateConnAndTaskStrategy;
- private final ConnectController connectController;
+ private final AbstractConnectController connectController;
- public RebalanceImpl(Worker worker, ConfigManagementService
configManagementService,
- ClusterManagementService clusterManagementService,
AllocateConnAndTaskStrategy strategy, ConnectController connectController) {
+ public RebalanceImpl(Worker worker,
+ ConfigManagementService configManagementService,
ClusterManagementService clusterManagementService, AllocateConnAndTaskStrategy
strategy, AbstractConnectController connectController) {
this.worker = worker;
this.configManagementService = configManagementService;
@@ -86,8 +88,14 @@ public class RebalanceImpl {
public void doRebalance() {
List<String> curAliveWorkers =
clusterManagementService.getAllAliveWorkers();
if (curAliveWorkers != null) {
- log.info("Current Alive workers : " + curAliveWorkers.size());
+ if (clusterManagementService instanceof
ClusterManagementServiceImpl) {
+ log.info("Current Alive workers : " + curAliveWorkers.size());
+ } else if (clusterManagementService instanceof
MemoryClusterManagementServiceImpl) {
+ log.info("Current alive worker : " +
curAliveWorkers.iterator().next());
+ }
}
+
+
Map<String, ConnectKeyValue> curConnectorConfigs =
configManagementService.getConnectorConfigs();
log.info("Current ConnectorConfigs : " + curConnectorConfigs);
Map<String, List<ConnectKeyValue>> curTaskConfigs =
configManagementService.getTaskConfigs();
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FileOffsetManagementServiceImpl.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FileOffsetManagementServiceImpl.java
new file mode 100644
index 0000000..f445d58
--- /dev/null
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FileOffsetManagementServiceImpl.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.service.memory;
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.converter.RecordOffsetConverter;
+import org.apache.rocketmq.connect.runtime.converter.RecordPartitionConverter;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
+import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
+import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
+import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
+import
org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * File-backed offset management service.
+ *
+ * <p>Offsets are held in a {@link FileBaseKeyValueStore}. Every mutation first notifies the
+ * registered {@link PositionUpdateListener} (if any) and then schedules a persist of the store on
+ * a single-threaded executor. {@link #synchronize()} is a no-op in this implementation.
+ *
+ * <p>The executor is created in {@link #start()} and released in {@link #stop()}; the mutating
+ * methods submit work to it and therefore must only be called between those two calls.
+ */
+public class FileOffsetManagementServiceImpl implements PositionManagementService {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
+
+    /**
+     * Single-threaded executor that runs the asynchronous persist tasks; {@code null} outside
+     * of the start/stop window.
+     */
+    protected ExecutorService executor;
+
+    /**
+     * Current offset info in store.
+     */
+    private final KeyValueStore<ExtendRecordPartition, RecordOffset> offsetStore;
+
+    /**
+     * Listener notified on every offset mutation; only the most recently registered one is kept.
+     */
+    private PositionUpdateListener offsetUpdateListener;
+
+    /**
+     * Creates the service with a file store located under the configured store root directory.
+     *
+     * @param connectConfig runtime configuration supplying the store root directory
+     */
+    public FileOffsetManagementServiceImpl(ConnectConfig connectConfig) {
+        this.offsetStore = new FileBaseKeyValueStore<>(
+            FilePathConfigUtil.getOffsetPath(connectConfig.getStorePathRootDir()),
+            new RecordPartitionConverter(),
+            new RecordOffsetConverter());
+    }
+
+    /**
+     * Creates the persist executor and loads the offsets from the file store.
+     */
+    @Override
+    public void start() {
+        executor = Executors.newFixedThreadPool(1, ThreadUtils.newThreadFactory(
+            this.getClass().getSimpleName() + "-%d", false));
+        offsetStore.load();
+    }
+
+    /**
+     * Shuts the persist executor down and writes the store one last time.
+     *
+     * <p>The executor is drained before the final persist so that the final write does not run
+     * concurrently with queued persist tasks, and the final persist sits in a {@code finally}
+     * block so that it happens even when the shutdown fails.
+     *
+     * @throws ConnectException if tasks were still pending after the 30 second grace period
+     */
+    @Override
+    public void stop() {
+        try {
+            if (executor != null) {
+                executor.shutdown();
+                // Best effort wait for any pending persist tasks (and their callbacks) to complete.
+                try {
+                    executor.awaitTermination(30, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+                if (!executor.shutdownNow().isEmpty()) {
+                    throw new ConnectException("Failed to stop " + this.getClass().getSimpleName()
+                        + ". Exiting without cleanly shutting down pending tasks and/or callbacks.");
+                }
+                executor = null;
+            }
+        } finally {
+            offsetStore.persist();
+        }
+    }
+
+    @Override
+    public void persist() {
+        offsetStore.persist();
+    }
+
+    @Override
+    public void load() {
+        offsetStore.load();
+    }
+
+    /**
+     * No-op: this implementation does not send offsets anywhere.
+     */
+    @Override
+    public void synchronize() {
+    }
+
+    @Override
+    public Map<ExtendRecordPartition, RecordOffset> getPositionTable() {
+        return offsetStore.getKVMap();
+    }
+
+    @Override
+    public RecordOffset getPosition(ExtendRecordPartition partition) {
+        return offsetStore.get(partition);
+    }
+
+    /**
+     * Stores all given offsets, then notifies the listener and schedules a persist.
+     *
+     * @param offsets offsets to store, keyed by partition
+     */
+    @Override
+    public void putPosition(Map<ExtendRecordPartition, RecordOffset> offsets) {
+        offsetStore.putAll(offsets);
+        this.triggerListener(persistLoggingCallback("Successed to persist offsets to storage: {}", offsets));
+    }
+
+    /**
+     * Stores a single offset, then notifies the listener and schedules a persist.
+     *
+     * @param partition partition the offset belongs to
+     * @param position  offset to store
+     */
+    @Override
+    public void putPosition(ExtendRecordPartition partition, RecordOffset position) {
+        offsetStore.put(partition, position);
+        this.triggerListener(
+            persistLoggingCallback("Successed to persist offsets to storage: {}, {}", partition, position));
+    }
+
+    /**
+     * Removes the given partitions from the store, then notifies the listener and schedules a
+     * persist. A {@code null} list is ignored.
+     *
+     * @param offsets partitions whose offsets should be removed
+     */
+    @Override
+    public void removePosition(List<ExtendRecordPartition> offsets) {
+        if (null == offsets) {
+            return;
+        }
+        for (ExtendRecordPartition offset : offsets) {
+            offsetStore.remove(offset);
+        }
+        this.triggerListener(persistLoggingCallback("Successed to persist offsets to storage: {}", offsets));
+    }
+
+    /**
+     * Registers the listener to notify on offset changes, replacing any previous one.
+     *
+     * @param listener listener to notify
+     */
+    @Override
+    public void registerListener(PositionUpdateListener listener) {
+        this.offsetUpdateListener = listener;
+    }
+
+    /**
+     * Builds the callback that logs the outcome of an asynchronous persist.
+     *
+     * @param successFormat SLF4J format string logged at trace level on success
+     * @param successArgs   arguments for {@code successFormat}
+     * @return callback logging either the failure (with stack trace) or the success message
+     */
+    private DataSynchronizerCallback<Void, Void> persistLoggingCallback(final String successFormat,
+        final Object... successArgs) {
+        return new DataSynchronizerCallback<Void, Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void key, Void result) {
+                if (error != null) {
+                    // The throwable is passed as the exception argument, so no placeholder is used.
+                    log.error("Failed to persist offsets to storage", error);
+                } else {
+                    log.trace(successFormat, successArgs);
+                }
+            }
+        };
+    }
+
+    /**
+     * Notifies the registered listener (on the calling thread) and submits a persist of the
+     * store to the executor.
+     *
+     * @param callback invoked exactly once on the executor thread with the persist outcome;
+     *                 may be {@code null}
+     * @return future completing once the persist task and the callback have run
+     */
+    private Future<Void> triggerListener(final DataSynchronizerCallback<Void, Void> callback) {
+        if (offsetUpdateListener != null) {
+            offsetUpdateListener.onPositionUpdate();
+        }
+
+        return executor.submit(new Callable<Void>() {
+            @Override
+            public Void call() {
+                try {
+                    offsetStore.persist();
+                } catch (Exception error) {
+                    if (callback != null) {
+                        callback.onCompletion(error, null, null);
+                    }
+                    return null;
+                }
+                // Reported outside the try block so that a throwing callback is not
+                // invoked a second time with its own exception.
+                if (callback != null) {
+                    callback.onCompletion(null, null, null);
+                }
+                return null;
+            }
+        });
+    }
+
+}
+
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
new file mode 100644
index 0000000..c0ebb2c
--- /dev/null
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.service.memory;
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.converter.RecordOffsetConverter;
+import org.apache.rocketmq.connect.runtime.converter.RecordPartitionConverter;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
+import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
+import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
+import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
+import
org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * File-backed {@link PositionManagementService} used in standalone mode.
+ *
+ * <p>Positions live in a {@link FileBaseKeyValueStore}. Every mutation notifies the
+ * registered listener (if any) and queues a persist of the store on a single-thread
+ * executor that is created in {@link #start()}; the mutating methods therefore
+ * require {@link #start()} to have been called first.
+ */
+public class FilePositionManagementServiceImpl implements PositionManagementService {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
+
+    /**
+     * Runs the queued persist tasks; created in {@link #start()}, cleared in {@link #stop()}.
+     */
+    protected ExecutorService executor;
+    /**
+     * Current position info in store.
+     */
+    private final KeyValueStore<ExtendRecordPartition, RecordOffset> positionStore;
+    /**
+     * Listener notified on every position change; may be {@code null}.
+     */
+    private PositionUpdateListener positionUpdateListener;
+
+
+    /**
+     * Creates the store under the position path derived from the configured store root dir.
+     *
+     * @param connectConfig worker config supplying {@code storePathRootDir}
+     */
+    public FilePositionManagementServiceImpl(ConnectConfig connectConfig) {
+        this.positionStore = new FileBaseKeyValueStore<>(
+            FilePathConfigUtil.getPositionPath(connectConfig.getStorePathRootDir()),
+            new RecordPartitionConverter(),
+            new RecordOffsetConverter());
+    }
+
+
+    /**
+     * Creates the single-thread persist executor and loads the store.
+     */
+    @Override
+    public void start() {
+        executor = Executors.newFixedThreadPool(1, ThreadUtils.newThreadFactory(
+            this.getClass().getSimpleName() + "-%d", false));
+        positionStore.load();
+    }
+
+    /**
+     * Persists the store and shuts the executor down, waiting up to 30 seconds for
+     * queued persist tasks.
+     *
+     * @throws ConnectException if tasks were still pending after the wait
+     */
+    @Override
+    public void stop() {
+        positionStore.persist();
+        if (executor != null) {
+            executor.shutdown();
+            // Best effort wait for queued persist tasks (and caller's callbacks) to complete.
+            try {
+                executor.awaitTermination(30, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+            if (!executor.shutdownNow().isEmpty()) {
+                throw new ConnectException("Failed to stop FilePositionManagementServiceImpl. Exiting without cleanly " +
+                    "shutting down pending tasks and/or callbacks.");
+            }
+            executor = null;
+        }
+    }
+
+    @Override
+    public void persist() {
+        positionStore.persist();
+    }
+
+    @Override
+    public void load() {
+        positionStore.load();
+    }
+
+    /**
+     * No-op in this implementation.
+     */
+    @Override
+    public void synchronize() {
+    }
+
+    @Override
+    public Map<ExtendRecordPartition, RecordOffset> getPositionTable() {
+        return positionStore.getKVMap();
+    }
+
+    @Override
+    public RecordOffset getPosition(ExtendRecordPartition partition) {
+        return positionStore.get(partition);
+    }
+
+    /**
+     * Stores all given positions, then notifies the listener and queues a persist.
+     *
+     * @param positions partition-to-offset entries to store
+     */
+    @Override
+    public void putPosition(Map<ExtendRecordPartition, RecordOffset> positions) {
+        positionStore.putAll(positions);
+        this.triggerListener(new DataSynchronizerCallback<Void, Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void key, Void result) {
+                if (error != null) {
+                    // A trailing Throwable is logged as the exception by SLF4J, so no "{}" for it.
+                    log.error("Failed to persist positions to storage", error);
+                } else {
+                    log.trace("Successed to persist positions to storage: {} ", positions);
+                }
+            }
+        });
+    }
+
+    /**
+     * Stores one position, then notifies the listener and queues a persist.
+     *
+     * @param partition partition key
+     * @param position  offset to store for the partition
+     */
+    @Override
+    public void putPosition(ExtendRecordPartition partition, RecordOffset position) {
+        positionStore.put(partition, position);
+        this.triggerListener(new DataSynchronizerCallback<Void, Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void key, Void result) {
+                if (error != null) {
+                    log.error("Failed to persist positions to storage", error);
+                } else {
+                    log.trace("Successed to persist positions to storage: {} , {} ", partition, position);
+                }
+            }
+        });
+    }
+
+    /**
+     * Removes the given partitions, then notifies the listener and queues a persist.
+     *
+     * @param partitions partitions to remove; a {@code null} list is ignored
+     */
+    @Override
+    public void removePosition(List<ExtendRecordPartition> partitions) {
+        if (null == partitions) {
+            return;
+        }
+        for (ExtendRecordPartition partition : partitions) {
+            positionStore.remove(partition);
+        }
+        this.triggerListener(new DataSynchronizerCallback<Void, Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void key, Void result) {
+                if (error != null) {
+                    log.error("Failed to persist positions to storage", error);
+                } else {
+                    log.trace("Successed to persist positions to storage: {}", partitions);
+                }
+            }
+        });
+    }
+
+    @Override
+    public void registerListener(PositionUpdateListener listener) {
+        this.positionUpdateListener = listener;
+    }
+
+    /**
+     * Notifies the registered listener (if any) and queues a persist of the store.
+     *
+     * @param callback invoked once after the persist attempt: with a {@code null}
+     *                 error on success, or with the caught exception on failure;
+     *                 may be {@code null}
+     * @return future of the queued persist task
+     */
+    private Future<Void> triggerListener(DataSynchronizerCallback<Void, Void> callback) {
+        if (this.positionUpdateListener != null) {
+            positionUpdateListener.onPositionUpdate();
+        }
+
+        return executor.submit(new Callable<Void>() {
+            @Override
+            public Void call() {
+                Throwable failure = null;
+                try {
+                    positionStore.persist();
+                } catch (Exception error) {
+                    failure = error;
+                }
+                // Report outside the try block so a throwing callback is not reported again
+                // as a persist failure; the null check also covers the failure path.
+                if (callback != null) {
+                    callback.onCompletion(failure, null, null);
+                }
+                return null;
+            }
+        });
+    }
+
+
+}
+
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryClusterManagementServiceImpl.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryClusterManagementServiceImpl.java
new file mode 100644
index 0000000..5e14d70
--- /dev/null
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryClusterManagementServiceImpl.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.service.memory;
+
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import
org.apache.rocketmq.connect.runtime.controller.standalone.StandaloneConfig;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Cluster management service for standalone mode: the only worker it ever
+ * reports is the one named by the configured worker id.
+ */
+public class MemoryClusterManagementServiceImpl implements
ClusterManagementService {
+
+ private StandaloneConfig config;
+
+ public MemoryClusterManagementServiceImpl(ConnectConfig config) {
+ this.configure(config);
+ }
+
+ /**
+ * Stores the given config for later worker-id lookups.
+ *
+ * @param config must be a StandaloneConfig; any other ConnectConfig makes the
+ * cast below fail with a ClassCastException
+ */
+ @Override
+ public void configure(ConnectConfig config) {
+ this.config = (StandaloneConfig) config;
+ }
+
+ /**
+ * Start the cluster manager. No-op in this implementation.
+ */
+ @Override
+ public void start() {
+
+ }
+
+ /**
+ * Stop the cluster manager. No-op in this implementation.
+ */
+ @Override
+ public void stop() {
+
+ }
+
+ /**
+ * Check if Cluster Store Topic exists.
+ *
+ * @return always false in this implementation
+ */
+ @Override
+ public boolean hasClusterStoreTopic() {
+ return false;
+ }
+
+ /**
+ * Get all alive workers in the cluster.
+ *
+ * @return a single-element list holding the configured worker id
+ */
+ @Override
+ public List<String> getAllAliveWorkers() {
+ return Collections.singletonList(this.config.getWorkerId());
+ }
+
+ /**
+ * Register a worker status listener to listen the change of alive workers.
+ * This implementation ignores the listener: it is neither stored nor invoked.
+ *
+ * @param listener ignored
+ */
+ @Override
+ public void registerListener(WorkerStatusListener listener) {
+ }
+
+ /**
+ * Get the id of the current worker.
+ *
+ * @return the configured worker id
+ */
+ @Override
+ public String getCurrentWorker() {
+ return this.config.getWorkerId();
+ }
+}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
new file mode 100644
index 0000000..f0736c4
--- /dev/null
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.service.memory;
+
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.connector.Connector;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
+import org.apache.rocketmq.connect.runtime.converter.JsonConverter;
+import org.apache.rocketmq.connect.runtime.converter.ListConverter;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
+import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
+import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
+import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * memory config management service impl for standalone
+ */
+public class MemoryConfigManagementServiceImpl implements
ConfigManagementService {
+ private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
+
+ /**
+ * Current connector configs in the store.
+ */
+ private KeyValueStore<String, ConnectKeyValue> connectorKeyValueStore;
+
+ /**
+ * Current task configs in the store.
+ */
+ private KeyValueStore<String, List<ConnectKeyValue>> taskKeyValueStore;
+
+ /**
+ * All listeners to trigger while config change.
+ */
+ private ConnectorConfigUpdateListener connectorConfigUpdateListener;
+
+ private final Plugin plugin;
+
+ public MemoryConfigManagementServiceImpl(ConnectConfig connectConfig,
Plugin plugin) {
+
+ this.connectorKeyValueStore = new FileBaseKeyValueStore<>(
+
FilePathConfigUtil.getConnectorConfigPath(connectConfig.getStorePathRootDir()),
+ new JsonConverter(),
+ new JsonConverter(ConnectKeyValue.class));
+ this.taskKeyValueStore = new FileBaseKeyValueStore<>(
+
FilePathConfigUtil.getTaskConfigPath(connectConfig.getStorePathRootDir()),
+ new JsonConverter(),
+ new ListConverter(ConnectKeyValue.class));
+ this.plugin = plugin;
+ }
+
+ /**
+ * Loads the connector config store and the task config store.
+ */
+ @Override
+ public void start() {
+ connectorKeyValueStore.load();
+ taskKeyValueStore.load();
+ }
+
+ /**
+ * Persists the connector config store and the task config store.
+ */
+ @Override
+ public void stop() {
+ connectorKeyValueStore.persist();
+ taskKeyValueStore.persist();
+ }
+
+    /**
+     * Returns a copy of the stored connector configs, leaving out every connector
+     * whose {@code CONFIG_DELETED} value is non-zero.
+     *
+     * @return new map of connector name to config for connectors not marked deleted
+     */
+    @Override
+    public Map<String, ConnectKeyValue> getConnectorConfigs() {
+        Map<String, ConnectKeyValue> enabled = new HashMap<>();
+        for (Map.Entry<String, ConnectKeyValue> entry : connectorKeyValueStore.getKVMap().entrySet()) {
+            ConnectKeyValue connectorConfig = entry.getValue();
+            if (connectorConfig.getInt(RuntimeConfigDefine.CONFIG_DELETED) == 0) {
+                enabled.put(entry.getKey(), connectorConfig);
+            }
+        }
+        return enabled;
+    }
+
+    /**
+     * Returns a copy of every stored connector config, including those marked deleted.
+     *
+     * @return new map of connector name to config
+     */
+    @Override
+    public Map<String, ConnectKeyValue> getConnectorConfigsIncludeDeleted() {
+        return new HashMap<>(connectorKeyValueStore.getKVMap());
+    }
+
+    /**
+     * Validates and stores a connector config, then recomputes its task configs.
+     *
+     * <p>The stored update timestamp (if any) is copied into {@code configs} first so
+     * that an otherwise unchanged config compares equal to the stored one.
+     *
+     * @param connectorName name under which the config is stored
+     * @param configs       connector config; its update timestamp is overwritten
+     * @return empty string on success, otherwise a message explaining why nothing
+     *         was stored (identical config already present, or a required key missing)
+     * @throws Exception if the connector class cannot be loaded or instantiated;
+     *                   exceptions from the connector's validate/init calls also propagate
+     */
+    @Override
+    public String putConnectorConfig(String connectorName, ConnectKeyValue configs) throws Exception {
+        ConnectKeyValue exist = connectorKeyValueStore.get(connectorName);
+        if (null != exist) {
+            Long updateTimestamp = exist.getLong(RuntimeConfigDefine.UPDATE_TIMESTAMP);
+            if (null != updateTimestamp) {
+                configs.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, updateTimestamp);
+            }
+        }
+        if (configs.equals(exist)) {
+            return "Connector with same config already exist.";
+        }
+
+        Long currentTimestamp = System.currentTimeMillis();
+        configs.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
+        for (String requireConfig : RuntimeConfigDefine.REQUEST_CONFIG) {
+            if (!configs.containsKey(requireConfig)) {
+                return "Request config key: " + requireConfig;
+            }
+        }
+
+        String connectorClass = configs.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
+        // Prefer the plugin class loader when one is registered for this connector class.
+        ClassLoader classLoader = plugin.getPluginClassLoader(connectorClass);
+        Class<?> clazz;
+        if (null != classLoader) {
+            clazz = Class.forName(connectorClass, true, classLoader);
+        } else {
+            clazz = Class.forName(connectorClass);
+        }
+        final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance();
+        connector.validate(configs);
+        connector.init(configs);
+        connectorKeyValueStore.put(connectorName, configs);
+        recomputeTaskConfigs(connectorName, connector, currentTimestamp, configs);
+        return "";
+    }
+
+ /**
+ * Asks the connector for its task configs, decorates each one with runtime
+ * keys, stores the resulting list under the connector name and notifies the
+ * config listener.
+ *
+ * @param connectorName name of the connector whose task configs are rebuilt;
+ * NOTE(review): its config is read from the connector store without a null
+ * check, so it presumably must already be stored - confirm against callers
+ * @param connector connector instance that supplies the task configs and task class
+ * @param currentTimestamp value written as the update timestamp of every task config
+ * @param configs connector config supplying max task count, topic names and transform keys
+ */
+ @Override
+ public void recomputeTaskConfigs(String connectorName, Connector
connector, Long currentTimestamp, ConnectKeyValue configs) {
+ // Max task count defaults to 1 when the key is absent.
+ int maxTask = configs.getInt(RuntimeConfigDefine.MAX_TASK, 1);
+ ConnectKeyValue connectConfig =
connectorKeyValueStore.get(connectorName);
+ boolean directEnable =
Boolean.parseBoolean(connectConfig.getString(RuntimeConfigDefine.CONNECTOR_DIRECT_ENABLE));
+ List<KeyValue> taskConfigs = connector.taskConfigs(maxTask);
+ List<ConnectKeyValue> converterdConfigs = new ArrayList<>();
+ for (KeyValue keyValue : taskConfigs) {
+ // Copy the connector-supplied task config entry by entry as strings.
+ ConnectKeyValue newKeyValue = new ConnectKeyValue();
+ for (String key : keyValue.keySet()) {
+ newKeyValue.put(key, keyValue.getString(key));
+ }
+ // Direct mode: mark the task as DIRECT and carry over the source/sink task classes.
+ if (directEnable) {
+ newKeyValue.put(RuntimeConfigDefine.TASK_TYPE,
Worker.TaskType.DIRECT.name());
+ newKeyValue.put(RuntimeConfigDefine.SOURCE_TASK_CLASS,
connectConfig.getString(RuntimeConfigDefine.SOURCE_TASK_CLASS));
+ newKeyValue.put(RuntimeConfigDefine.SINK_TASK_CLASS,
connectConfig.getString(RuntimeConfigDefine.SINK_TASK_CLASS));
+ }
+ newKeyValue.put(RuntimeConfigDefine.TASK_CLASS,
connector.taskClass().getName());
+ newKeyValue.put(RuntimeConfigDefine.UPDATE_TIMESTAMP,
currentTimestamp);
+
+ newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAME,
configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAME));
+ newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAMES,
configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAMES));
+ // Propagate every connector-level key that starts with the transforms prefix.
+ Set<String> connectConfigKeySet = configs.keySet();
+ for (String connectConfigKey : connectConfigKeySet) {
+ if
(connectConfigKey.startsWith(RuntimeConfigDefine.TRANSFORMS)) {
+ newKeyValue.put(connectConfigKey,
configs.getString(connectConfigKey));
+ }
+ }
+ converterdConfigs.add(newKeyValue);
+ }
+ putTaskConfigs(connectorName, converterdConfigs);
+ triggerListener();
+ }
+
+    /**
+     * Marks a connector as deleted: sets {@code CONFIG_DELETED} to 1 and refreshes the
+     * update timestamp on its config, re-stores it, appends it to the connector's task
+     * config list and notifies the config listener.
+     *
+     * <p>An unknown connector name is logged and ignored instead of failing with a
+     * {@link NullPointerException}.
+     *
+     * @param connectorName name of the connector to mark as deleted
+     */
+    @Override
+    public void removeConnectorConfig(String connectorName) {
+        ConnectKeyValue config = connectorKeyValueStore.get(connectorName);
+        if (null == config) {
+            // Nothing is stored under this name, so there is nothing to mark as deleted.
+            log.warn("Connector {} does not exist, skip removing its config.", connectorName);
+            return;
+        }
+        config.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, System.currentTimeMillis());
+        config.put(RuntimeConfigDefine.CONFIG_DELETED, 1);
+        List<ConnectKeyValue> taskConfigList = taskKeyValueStore.get(connectorName);
+        if (null == taskConfigList) {
+            // A connector may have no stored task configs yet; start from an empty list.
+            taskConfigList = new ArrayList<>();
+        }
+        taskConfigList.add(config);
+        connectorKeyValueStore.put(connectorName, config);
+        putTaskConfigs(connectorName, taskConfigList);
+        log.info("[ISSUE #2027] After removal The configs are:\n" + getConnectorConfigs().toString());
+        triggerListener();
+    }
+
+    /**
+     * Returns the stored task configs of every connector that is not marked deleted.
+     *
+     * @return new map of connector name to its task config list
+     */
+    @Override
+    public Map<String, List<ConnectKeyValue>> getTaskConfigs() {
+        Map<String, List<ConnectKeyValue>> allTaskConfigs = taskKeyValueStore.getKVMap();
+        Map<String, ConnectKeyValue> enabledConnectors = getConnectorConfigs();
+        Map<String, List<ConnectKeyValue>> visible = new HashMap<>();
+        for (Map.Entry<String, List<ConnectKeyValue>> entry : allTaskConfigs.entrySet()) {
+            if (enabledConnectors.containsKey(entry.getKey())) {
+                visible.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return visible;
+    }
+
+    /**
+     * Stores the task configs of a connector, first removing a previously stored
+     * non-empty list for the same name.
+     *
+     * @param connectorName connector the task configs belong to
+     * @param configs       task configs to store
+     */
+    private void putTaskConfigs(String connectorName, List<ConnectKeyValue> configs) {
+        List<ConnectKeyValue> previous = taskKeyValueStore.get(connectorName);
+        boolean hasPrevious = previous != null && previous.size() > 0;
+        if (hasPrevious) {
+            taskKeyValueStore.remove(connectorName);
+        }
+        taskKeyValueStore.put(connectorName, configs);
+    }
+
+ /**
+ * Persists the connector config store and the task config store.
+ */
+ @Override
+ public void persist() {
+ this.connectorKeyValueStore.persist();
+ this.taskKeyValueStore.persist();
+ }
+
+ /**
+ * Stores the listener to notify on config changes. Only one listener is kept;
+ * a later call replaces the previous one.
+ *
+ * @param listener listener to notify; when null, no notification is sent
+ */
+ @Override
+ public void registerListener(ConnectorConfigUpdateListener listener) {
+ this.connectorConfigUpdateListener = listener;
+ }
+
+    /**
+     * Notifies the registered config listener, if one is registered.
+     */
+    private void triggerListener() {
+        if (this.connectorConfigUpdateListener != null) {
+            connectorConfigUpdateListener.onConfigUpdate();
+        }
+    }
+
+ /**
+ * Returns the plugin instance passed to the constructor.
+ *
+ * @return the plugin used to look up connector class loaders
+ */
+ @Override
+ public Plugin getPlugin() {
+ return this.plugin;
+ }
+}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/StandaloneRebalanceService.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/StandaloneRebalanceService.java
new file mode 100644
index 0000000..dbe39e2
--- /dev/null
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/StandaloneRebalanceService.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.service.memory;
+
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.service.RebalanceImpl;
+import org.apache.rocketmq.connect.runtime.utils.ServiceThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Rebalance service for standalone mode: a service thread that calls
+ * {@link RebalanceImpl#doRebalance()} in a loop, waiting between runs, and that is
+ * woken up whenever the connector config changes.
+ */
+public class StandaloneRebalanceService extends ServiceThread {
+
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
+
+    /**
+     * Wait between two rebalance runs, read once from the system property
+     * {@code rocketmq.runtime.cluster.rebalance.waitInterval} (default 20000).
+     */
+    private static final long WAIT_INTERVAL =
+        Long.parseLong(System.getProperty("rocketmq.runtime.cluster.rebalance.waitInterval", "20000"));
+
+    /**
+     * Assign all connectors and tasks to all alive process in the cluster.
+     */
+    private final RebalanceImpl rebalanceImpl;
+
+    /**
+     * ConfigManagementService to access current config info.
+     */
+    private final ConfigManagementService configManagementService;
+
+    /**
+     * ClusterManagementService to access current cluster info.
+     */
+    private final ClusterManagementService clusterManagementService;
+
+    /**
+     * Stores the collaborators and registers a config listener that wakes this
+     * service on every config update.
+     *
+     * @param rebalanceImpl            performs the actual rebalance
+     * @param configManagementService  source of config-change notifications
+     * @param clusterManagementService cluster info service; only stored here
+     */
+    public StandaloneRebalanceService(RebalanceImpl rebalanceImpl,
+        ConfigManagementService configManagementService,
+        ClusterManagementService clusterManagementService) {
+        this.rebalanceImpl = rebalanceImpl;
+        this.configManagementService = configManagementService;
+        this.clusterManagementService = clusterManagementService;
+        this.configManagementService.registerListener(new ConnectorConnectorConfigChangeListenerImpl());
+    }
+
+    /**
+     * Loops until the service is stopped: waits, then rebalances. A failed rebalance
+     * is logged and the loop continues.
+     */
+    @Override
+    public void run() {
+        log.info(this.getServiceName() + " service started");
+        while (!this.isStopped()) {
+            try {
+                this.waitForRunning(WAIT_INTERVAL);
+                this.rebalanceImpl.doRebalance();
+            } catch (Exception e) {
+                log.error("rebalance exception", e);
+            }
+        }
+
+        log.info(this.getServiceName() + " service end");
+    }
+
+    @Override
+    public String getServiceName() {
+        return StandaloneRebalanceService.class.getSimpleName();
+    }
+
+    /**
+     * Config listener that wakes the enclosing service so it rebalances without
+     * waiting for the interval to elapse.
+     */
+    class ConnectorConnectorConfigChangeListenerImpl implements ConfigManagementService.ConnectorConfigUpdateListener {
+
+        /**
+         * When config change.
+         */
+        @Override
+        public void onConfigUpdate() {
+            StandaloneRebalanceService.this.wakeup();
+        }
+    }
+}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/DefaultAllocateConnAndTaskStrategy.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/DefaultAllocateConnAndTaskStrategy.java
index 029a35b..a61982d 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/DefaultAllocateConnAndTaskStrategy.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/DefaultAllocateConnAndTaskStrategy.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
*/
public class DefaultAllocateConnAndTaskStrategy implements
AllocateConnAndTaskStrategy {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
-
+
@Override
public ConnAndTaskConfigs allocate(List<String> allWorker, String
curWorker,
Map<String, ConnectKeyValue> connectorConfigs,
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/ExtendRecordPartition.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/ExtendRecordPartition.java
new file mode 100644
index 0000000..6190a9d
--- /dev/null
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/ExtendRecordPartition.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.store;
+
+import io.openmessaging.connector.api.data.RecordPartition;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A {@link RecordPartition} qualified by a namespace, so that identical partition
+ * maps stored under different namespaces are distinct keys.
+ */
+public class ExtendRecordPartition extends RecordPartition {
+    /**
+     * connect name
+     */
+    private final String namespace;
+
+    /**
+     * @param namespace namespace the partition belongs to; may be {@code null}
+     * @param partition partition description handed to {@link RecordPartition}
+     */
+    public ExtendRecordPartition(String namespace, Map<String, ?> partition) {
+        super(partition);
+        this.namespace = namespace;
+    }
+
+    public String getNamespace() {
+        return namespace;
+    }
+
+    /**
+     * Two instances are equal when the superclass considers them equal and their
+     * namespaces are equal; a {@code null} namespace only equals another {@code null}.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof ExtendRecordPartition)) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        ExtendRecordPartition that = (ExtendRecordPartition) o;
+        // Null-safe, matching hashCode() which already tolerates a null namespace.
+        return Objects.equals(this.namespace, that.namespace);
+    }
+
+    @Override
+    public String toString() {
+        return "ExtendRecordPartition{" +
+            "namespace='" + namespace + '\'' +
+            "} " + super.toString();
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), namespace);
+    }
+}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageReaderImpl.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageReaderImpl.java
index 84c5365..09980df 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageReaderImpl.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageReaderImpl.java
@@ -29,22 +29,25 @@ import
org.apache.rocketmq.connect.runtime.service.PositionManagementService;
public class PositionStorageReaderImpl implements OffsetStorageReader {
private PositionManagementService positionManagementService;
+ private final String namespace;
- public PositionStorageReaderImpl(PositionManagementService
positionManagementService) {
-
+ public PositionStorageReaderImpl(String namespace,
PositionManagementService positionManagementService) {
+ this.namespace = namespace;
this.positionManagementService = positionManagementService;
}
@Override public RecordOffset readOffset(RecordPartition partition) {
- return positionManagementService.getPositionTable().get(partition);
+ ExtendRecordPartition extendRecordPartition = new
ExtendRecordPartition(namespace, partition.getPartition());
+ return
positionManagementService.getPositionTable().get(extendRecordPartition);
}
@Override public Map<RecordPartition, RecordOffset>
readOffsets(Collection<RecordPartition> partitions) {
Map<RecordPartition, RecordOffset> result = new HashMap<>();
- Map<RecordPartition, RecordOffset> allData =
positionManagementService.getPositionTable();
+ Map<ExtendRecordPartition, RecordOffset> allData =
positionManagementService.getPositionTable();
for (RecordPartition key : partitions) {
- if (allData.containsKey(key)) {
- result.put(key, allData.get(key));
+ ExtendRecordPartition extendRecordPartition = new
ExtendRecordPartition(namespace, key.getPartition());
+ if (allData.containsKey(extendRecordPartition)) {
+ result.put(key, allData.get(extendRecordPartition));
}
}
return result;
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageReaderImpl.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriter.java
similarity index 56%
copy from
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageReaderImpl.java
copy to
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriter.java
index 84c5365..c5fa1fa 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageReaderImpl.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriter.java
@@ -18,35 +18,26 @@
package org.apache.rocketmq.connect.runtime.store;
+
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
-import io.openmessaging.connector.api.storage.OffsetStorageReader;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
-public class PositionStorageReaderImpl implements OffsetStorageReader {
+/**
+ * position storage writer
+ */
+public class PositionStorageWriter {
private PositionManagementService positionManagementService;
+ private final String namespace;
- public PositionStorageReaderImpl(PositionManagementService
positionManagementService) {
-
+ public PositionStorageWriter(String namespace, PositionManagementService
positionManagementService) {
+ this.namespace = namespace;
this.positionManagementService = positionManagementService;
}
- @Override public RecordOffset readOffset(RecordPartition partition) {
- return positionManagementService.getPositionTable().get(partition);
- }
-
- @Override public Map<RecordPartition, RecordOffset>
readOffsets(Collection<RecordPartition> partitions) {
- Map<RecordPartition, RecordOffset> result = new HashMap<>();
- Map<RecordPartition, RecordOffset> allData =
positionManagementService.getPositionTable();
- for (RecordPartition key : partitions) {
- if (allData.containsKey(key)) {
- result.put(key, allData.get(key));
- }
- }
- return result;
+ public void putPosition(RecordPartition partition, RecordOffset position) {
+ ExtendRecordPartition extendRecordPartition = new
ExtendRecordPartition(namespace, partition.getPartition());
+ positionManagementService.putPosition(extendRecordPartition, position);
}
}
diff --git a/rocketmq-connect-runtime/src/main/resources/connect.conf
b/rocketmq-connect-runtime/src/main/resources/connect-distributed.conf
similarity index 100%
copy from rocketmq-connect-runtime/src/main/resources/connect.conf
copy to rocketmq-connect-runtime/src/main/resources/connect-distributed.conf
diff --git a/rocketmq-connect-runtime/src/main/resources/connect.conf
b/rocketmq-connect-runtime/src/main/resources/connect-standalone.conf
similarity index 95%
rename from rocketmq-connect-runtime/src/main/resources/connect.conf
rename to rocketmq-connect-runtime/src/main/resources/connect-standalone.conf
index 449728d..1b476b2 100644
--- a/rocketmq-connect-runtime/src/main/resources/connect.conf
+++ b/rocketmq-connect-runtime/src/main/resources/connect-standalone.conf
@@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-workerId=DEFAULT_WORKER_1
-storePathRootDir=/xxx/storeRoot
+workerId=standalone-worker
+storePathRootDir=/tmp/storeRoot
## Http port for user to access REST API
httpPort=8082
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
index a9c51ed..3a04098 100644
---
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
@@ -30,7 +30,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.connect.runtime.ConnectController;
+import
org.apache.rocketmq.connect.runtime.controller.distributed.DistributedConnectController;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
@@ -79,7 +79,7 @@ public class WorkerTest {
private ConnectorContext connectorContext;
@Mock
- private ConnectController connectController;
+ private DistributedConnectController connectController;
@Mock
private ConnectStatsManager connectStatsManager;
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestPositionManageServiceImpl.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestPositionManageServiceImpl.java
index 9cf5c09..10884df 100644
---
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestPositionManageServiceImpl.java
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestPositionManageServiceImpl.java
@@ -20,6 +20,7 @@ package
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
import java.util.List;
import java.util.Map;
@@ -50,23 +51,23 @@ public class TestPositionManageServiceImpl implements
PositionManagementService
}
- @Override public Map<RecordPartition, RecordOffset> getPositionTable() {
+ @Override public Map<ExtendRecordPartition, RecordOffset>
getPositionTable() {
return null;
}
- @Override public RecordOffset getPosition(RecordPartition partition) {
+ @Override public RecordOffset getPosition(ExtendRecordPartition partition)
{
return null;
}
- @Override public void putPosition(Map<RecordPartition, RecordOffset>
positions) {
+ @Override public void putPosition(Map<ExtendRecordPartition, RecordOffset>
positions) {
}
- @Override public void putPosition(RecordPartition partition, RecordOffset
position) {
+ @Override public void putPosition(ExtendRecordPartition partition,
RecordOffset position) {
}
- @Override public void removePosition(List<RecordPartition> partitions) {
+ @Override public void removePosition(List<ExtendRecordPartition>
partitions) {
}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
index c3f5e56..b452e6c 100644
---
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
@@ -40,7 +40,7 @@ import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.connect.runtime.ConnectController;
+import
org.apache.rocketmq.connect.runtime.controller.distributed.DistributedConnectController;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
@@ -71,7 +71,7 @@ import static org.mockito.Mockito.when;
public class RestHandlerTest {
@Mock
- private ConnectController connectController;
+ private DistributedConnectController connectController;
@Mock
private ConfigManagementService configManagementService;
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
index cbf9366..6aa55b8 100644
---
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
@@ -20,7 +20,6 @@ package org.apache.rocketmq.connect.runtime.service;
import io.netty.util.internal.ConcurrentSet;
import io.openmessaging.Future;
import io.openmessaging.connector.api.data.RecordOffset;
-import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.producer.SendResult;
import java.io.File;
import java.lang.reflect.Field;
@@ -37,6 +36,7 @@ import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.connect.runtime.common.ConnAndTaskConfigs;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
import org.apache.rocketmq.connect.runtime.utils.TestUtils;
import org.apache.rocketmq.connect.runtime.utils.datasync.BrokerBasedLog;
@@ -74,16 +74,17 @@ public class PositionManagementServiceImplTest {
private PositionManagementServiceImpl positionManagementService;
- private Set<RecordPartition> needSyncPartition;
+ private Set<ExtendRecordPartition> needSyncPartition;
- private KeyValueStore<RecordPartition, RecordOffset> positionStore;
+ private KeyValueStore<ExtendRecordPartition, RecordOffset> positionStore;
- private RecordPartition sourcePartition;
+ private ExtendRecordPartition sourcePartition;
private RecordOffset sourcePosition;
- private Map<RecordPartition, RecordOffset> positions;
+ private Map<ExtendRecordPartition, RecordOffset> positions;
+ private final String namespace = "namespace";
@Before
public void init() throws Exception {
connectConfig = new ConnectConfig();
@@ -132,18 +133,18 @@ public class PositionManagementServiceImplTest {
Field positionStoreField =
PositionManagementServiceImpl.class.getDeclaredField("positionStore");
positionStoreField.setAccessible(true);
- positionStore = (KeyValueStore<RecordPartition, RecordOffset>)
positionStoreField.get(positionManagementService);
+ positionStore = (KeyValueStore<ExtendRecordPartition, RecordOffset>)
positionStoreField.get(positionManagementService);
Field needSyncPartitionField =
PositionManagementServiceImpl.class.getDeclaredField("needSyncPartition");
needSyncPartitionField.setAccessible(true);
- needSyncPartition = (ConcurrentSet<RecordPartition>)
needSyncPartitionField.get(positionManagementService);
+ needSyncPartition = (ConcurrentSet<ExtendRecordPartition>)
needSyncPartitionField.get(positionManagementService);
Map<String, String> map = Maps.newHashMap("ip_port", "127.0.0.13306");
- sourcePartition = new RecordPartition(map);
+ sourcePartition = new ExtendRecordPartition(namespace,map);
Map<String, String> map1 = Maps.newHashMap("binlog_file",
"binlogFilename");
map1.put("next_position", "100");
sourcePosition = new RecordOffset(map1);
- positions = new HashMap<RecordPartition, RecordOffset>() {
+ positions = new HashMap<ExtendRecordPartition, RecordOffset>() {
{
put(sourcePartition, sourcePosition);
}
@@ -158,7 +159,7 @@ public class PositionManagementServiceImplTest {
@Test
public void testGetPositionTable() {
- Map<RecordPartition, RecordOffset> positionTable =
positionManagementService.getPositionTable();
+ Map<ExtendRecordPartition, RecordOffset> positionTable =
positionManagementService.getPositionTable();
RecordOffset bytes = positionTable.get(sourcePartition);
assertNull(bytes);
@@ -190,7 +191,7 @@ public class PositionManagementServiceImplTest {
assertNotNull(bytes);
- List<RecordPartition> sourcePartitions = new
ArrayList<RecordPartition>(8) {
+ List<ExtendRecordPartition> sourcePartitions = new
ArrayList<ExtendRecordPartition>(8) {
{
add(sourcePartition);
}
@@ -209,7 +210,7 @@ public class PositionManagementServiceImplTest {
assertTrue(needSyncPartition.contains(sourcePartition));
- List<RecordPartition> sourcePartitions = new
ArrayList<RecordPartition>(8) {
+ List<ExtendRecordPartition> sourcePartitions = new
ArrayList<ExtendRecordPartition>(8) {
{
add(sourcePartition);
}
@@ -232,15 +233,15 @@ public class PositionManagementServiceImplTest {
positionManagementService.putPosition(positions);
Map<String, String> map = Maps.newHashMap("ip_port", "127.0.0.2:3306");
- RecordPartition sourcePartitionTmp = new RecordPartition(map);
+ ExtendRecordPartition sourcePartitionTmp = new
ExtendRecordPartition(namespace,map);
Map<String, String> map1 = Maps.newHashMap("binlog_file",
"binlogFilename");
map1.put("next_position", "100");
RecordOffset sourcePositionTmp = new RecordOffset(map1);
positionStore.put(sourcePartitionTmp, sourcePositionTmp);
- Set<RecordPartition> needSyncPartitionTmp = needSyncPartition;
+ Set<ExtendRecordPartition> needSyncPartitionTmp = needSyncPartition;
needSyncPartition = new ConcurrentSet<>();
- Map<RecordPartition, RecordOffset> needSyncPosition =
positionStore.getKVMap().entrySet().stream()
+ Map<ExtendRecordPartition, RecordOffset> needSyncPosition =
positionStore.getKVMap().entrySet().stream()
.filter(entry -> needSyncPartitionTmp.contains(entry.getKey()))
.collect(Collectors.toMap(entry -> entry.getKey(), entry ->
entry.getValue()));
@@ -252,7 +253,7 @@ public class PositionManagementServiceImplTest {
RecordOffset tmpBytes = needSyncPosition.get(sourcePartitionTmp);
assertNull(tmpBytes);
- List<RecordPartition> sourcePartitions = new
ArrayList<RecordPartition>(8) {
+ List<ExtendRecordPartition> sourcePartitions = new
ArrayList<ExtendRecordPartition>(8) {
{
add(sourcePartition);
add(sourcePartitionTmp);