This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new fe893f1 [ISSUE #189]Upgrade rocketmq-replicator connector API to
v0.1.3 (#194)
fe893f1 is described below
commit fe893f1ca66c0d8fad1e960ee953c104a4025342
Author: xiaoyi <[email protected]>
AuthorDate: Thu Jul 14 13:48:18 2022 +0800
[ISSUE #189]Upgrade rocketmq-replicator connector API to v0.1.3 (#194)
* Fix debezium demecial type conversion problem #190
* Upgrade rocketmq-replicator API to v0.1.3 #189
---
connectors/rocketmq-replicator/pom.xml | 2 +-
.../apache/rocketmq/replicator/MetaSourceTask.java | 23 +--------------
.../rocketmq/replicator/RmqMetaReplicator.java | 15 ++--------
.../rocketmq/replicator/RmqSourceReplicator.java | 33 ++++++----------------
.../apache/rocketmq/replicator/RmqSourceTask.java | 24 ++--------------
5 files changed, 15 insertions(+), 82 deletions(-)
diff --git a/connectors/rocketmq-replicator/pom.xml
b/connectors/rocketmq-replicator/pom.xml
index 8d76224..95a72f3 100644
--- a/connectors/rocketmq-replicator/pom.xml
+++ b/connectors/rocketmq-replicator/pom.xml
@@ -68,7 +68,7 @@
<properties>
<rocketmq.version>4.7.0</rocketmq.version>
-
<openmessaging.connector.version>0.1.2</openmessaging.connector.version>
+
<openmessaging.connector.version>0.1.3</openmessaging.connector.version>
<junit.version>4.13.1</junit.version>
<mockito.version>3.2.4</mockito.version>
<assertj.version>2.6.0</assertj.version>
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
index 3f4f2da..831d9e0 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
@@ -62,19 +62,8 @@ public class MetaSourceTask extends SourceTask {
}
@Override
- public void validate(KeyValue config) {
-
- }
-
- @Override
- public void init(KeyValue config) {
+ public void start(KeyValue config) {
ConfigUtil.load(config, this.config);
- }
-
- @Override
- public void start(SourceTaskContext sourceTaskContext) {
- super.start(sourceTaskContext);
-
try {
this.srcMQAdminExt = Utils.startMQAdminTool(this.config);
} catch (MQClientException e) {
@@ -94,16 +83,6 @@ public class MetaSourceTask extends SourceTask {
srcMQAdminExt.shutdown();
}
- @Override
- public void pause() {
-
- }
-
- @Override
- public void resume() {
-
- }
-
@Override public List<ConnectRecord> poll() {
log.debug("polling...");
List<String> groups =
JSONObject.parseArray(this.config.getTaskGroupList(), String.class);
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
index e401b28..d3e3232 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
@@ -107,8 +107,9 @@ public class RmqMetaReplicator extends SourceConnector {
}
}
+
@Override
- public void init(KeyValue config) {
+ public void start(KeyValue config) {
try {
replicatorConfig.init(config);
} catch (IllegalArgumentException e) {
@@ -117,11 +118,7 @@ public class RmqMetaReplicator extends SourceConnector {
}
this.configValid = true;
this.prepare();
- }
- @Override
- public void start(ConnectorContext componentContext) {
- super.start(componentContext);
log.info("starting...");
executor.scheduleAtFixedRate(this::refreshConsumerGroups,
replicatorConfig.getRefreshInterval(), replicatorConfig.getRefreshInterval(),
TimeUnit.SECONDS);
executor.scheduleAtFixedRate(this::syncSubConfig,
replicatorConfig.getRefreshInterval(), replicatorConfig.getRefreshInterval(),
TimeUnit.SECONDS);
@@ -134,14 +131,6 @@ public class RmqMetaReplicator extends SourceConnector {
this.targetMQAdminExt.shutdown();
}
- @Override public void pause() {
-
- }
-
- @Override public void resume() {
-
- }
-
@Override public Class<? extends Task> taskClass() {
return MetaSourceTask.class;
}
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index e4966d1..e2554de 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -101,8 +101,15 @@ public class RmqSourceReplicator extends SourceConnector {
}
@Override
- public void start(ConnectorContext componentContext) {
- super.start(componentContext);
+ public void start(KeyValue config) {
+ try {
+ this.replicatorConfig.init(config);
+ } catch (IllegalArgumentException e) {
+ log.error("RmqSourceReplicator init config error.", e);
+ throw new IllegalArgumentException("RmqSourceReplicator init
config error.");
+ }
+ this.configValid = true;
+
try {
startMQAdminTools();
} catch (MQClientException e) {
@@ -157,17 +164,6 @@ public class RmqSourceReplicator extends SourceConnector {
return true;
}
- @Override
- public void init(KeyValue config) {
- try {
- this.replicatorConfig.init(config);
- } catch (IllegalArgumentException e) {
- log.error("RmqSourceReplicator init config error.", e);
- throw new IllegalArgumentException("RmqSourceReplicator init
config error.");
- }
- this.configValid = true;
- }
-
@Override
public void stop() {
executor.shutdown();
@@ -175,19 +171,8 @@ public class RmqSourceReplicator extends SourceConnector {
this.targetMQAdminExt.shutdown();
}
- @Override
- public void pause() {
-
- }
-
- @Override
- public void resume() {
-
- }
-
@Override
public Class<? extends Task> taskClass() {
-
return RmqSourceTask.class;
}
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index a8fc7d5..2a5be6d 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -87,8 +87,8 @@ public class RmqSourceTask extends SourceTask {
}
@Override
- public void start(SourceTaskContext sourceTaskContext) {
- super.start(sourceTaskContext);
+ public void start(KeyValue config) {
+ ConfigUtil.load(config, this.config);
RPCHook rpcHook = null;
if (this.config.isSrcAclEnable()) {
rpcHook = new AclClientRPCHook(new
SessionCredentials(this.config.getSrcAccessKey(),
this.config.getSrcSecretKey()));
@@ -126,16 +126,6 @@ public class RmqSourceTask extends SourceTask {
log.info("RocketMQ source task started");
}
- @Override
- public void validate(KeyValue config) {
-
- }
-
- @Override
- public void init(KeyValue config) {
- ConfigUtil.load(config, this.config);
- }
-
@Override
public void stop() {
@@ -147,16 +137,6 @@ public class RmqSourceTask extends SourceTask {
}
}
- @Override
- public void pause() {
-
- }
-
- @Override
- public void resume() {
-
- }
-
private List<ConnectRecord> pollCommonMessage() {
List<ConnectRecord> res = new ArrayList<>();