This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new fe893f1  [ISSUE #189]Upgrade rocketmq-replicator connector API to v0.1.3 (#194)
fe893f1 is described below

commit fe893f1ca66c0d8fad1e960ee953c104a4025342
Author: xiaoyi <[email protected]>
AuthorDate: Thu Jul 14 13:48:18 2022 +0800

    [ISSUE #189]Upgrade rocketmq-replicator connector API to v0.1.3 (#194)
    
    * Fix Debezium decimal type conversion problem #190
    
    * Upgrade rocketmq-replicator API to v0.1.3 #189
---
 connectors/rocketmq-replicator/pom.xml             |  2 +-
 .../apache/rocketmq/replicator/MetaSourceTask.java | 23 +--------------
 .../rocketmq/replicator/RmqMetaReplicator.java     | 15 ++--------
 .../rocketmq/replicator/RmqSourceReplicator.java   | 33 ++++++----------------
 .../apache/rocketmq/replicator/RmqSourceTask.java  | 24 ++--------------
 5 files changed, 15 insertions(+), 82 deletions(-)

diff --git a/connectors/rocketmq-replicator/pom.xml 
b/connectors/rocketmq-replicator/pom.xml
index 8d76224..95a72f3 100644
--- a/connectors/rocketmq-replicator/pom.xml
+++ b/connectors/rocketmq-replicator/pom.xml
@@ -68,7 +68,7 @@
 
     <properties>
         <rocketmq.version>4.7.0</rocketmq.version>
-        
<openmessaging.connector.version>0.1.2</openmessaging.connector.version>
+        
<openmessaging.connector.version>0.1.3</openmessaging.connector.version>
         <junit.version>4.13.1</junit.version>
         <mockito.version>3.2.4</mockito.version>
         <assertj.version>2.6.0</assertj.version>
diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
index 3f4f2da..831d9e0 100644
--- 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
@@ -62,19 +62,8 @@ public class MetaSourceTask extends SourceTask {
     }
 
     @Override
-    public void validate(KeyValue config) {
-
-    }
-
-    @Override
-    public void init(KeyValue config) {
+    public void start(KeyValue config) {
         ConfigUtil.load(config, this.config);
-    }
-
-    @Override
-    public void start(SourceTaskContext sourceTaskContext) {
-        super.start(sourceTaskContext);
-
         try {
             this.srcMQAdminExt = Utils.startMQAdminTool(this.config);
         } catch (MQClientException e) {
@@ -94,16 +83,6 @@ public class MetaSourceTask extends SourceTask {
         srcMQAdminExt.shutdown();
     }
 
-    @Override
-    public void pause() {
-
-    }
-
-    @Override
-    public void resume() {
-
-    }
-
     @Override public List<ConnectRecord> poll() {
         log.debug("polling...");
         List<String> groups = 
JSONObject.parseArray(this.config.getTaskGroupList(), String.class);
diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
index e401b28..d3e3232 100644
--- 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
@@ -107,8 +107,9 @@ public class RmqMetaReplicator extends SourceConnector {
         }
     }
 
+
     @Override
-    public void init(KeyValue config) {
+    public void start(KeyValue config) {
         try {
             replicatorConfig.init(config);
         } catch (IllegalArgumentException e) {
@@ -117,11 +118,7 @@ public class RmqMetaReplicator extends SourceConnector {
         }
         this.configValid = true;
         this.prepare();
-    }
 
-    @Override
-    public void start(ConnectorContext componentContext) {
-        super.start(componentContext);
         log.info("starting...");
         executor.scheduleAtFixedRate(this::refreshConsumerGroups, 
replicatorConfig.getRefreshInterval(), replicatorConfig.getRefreshInterval(), 
TimeUnit.SECONDS);
         executor.scheduleAtFixedRate(this::syncSubConfig, 
replicatorConfig.getRefreshInterval(), replicatorConfig.getRefreshInterval(), 
TimeUnit.SECONDS);
@@ -134,14 +131,6 @@ public class RmqMetaReplicator extends SourceConnector {
         this.targetMQAdminExt.shutdown();
     }
 
-    @Override public void pause() {
-
-    }
-
-    @Override public void resume() {
-
-    }
-
     @Override public Class<? extends Task> taskClass() {
         return MetaSourceTask.class;
     }
diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index e4966d1..e2554de 100644
--- 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -101,8 +101,15 @@ public class RmqSourceReplicator extends SourceConnector {
     }
 
     @Override
-    public void start(ConnectorContext componentContext) {
-        super.start(componentContext);
+    public void start(KeyValue config) {
+        try {
+            this.replicatorConfig.init(config);
+        } catch (IllegalArgumentException e) {
+            log.error("RmqSourceReplicator init config error.", e);
+            throw new IllegalArgumentException("RmqSourceReplicator init 
config error.");
+        }
+        this.configValid = true;
+
         try {
             startMQAdminTools();
         } catch (MQClientException e) {
@@ -157,17 +164,6 @@ public class RmqSourceReplicator extends SourceConnector {
         return true;
     }
 
-    @Override
-    public void init(KeyValue config) {
-        try {
-            this.replicatorConfig.init(config);
-        } catch (IllegalArgumentException e) {
-            log.error("RmqSourceReplicator init config error.", e);
-            throw new IllegalArgumentException("RmqSourceReplicator init 
config error.");
-        }
-        this.configValid = true;
-    }
-
     @Override
     public void stop() {
         executor.shutdown();
@@ -175,19 +171,8 @@ public class RmqSourceReplicator extends SourceConnector {
         this.targetMQAdminExt.shutdown();
     }
 
-    @Override
-    public void pause() {
-
-    }
-
-    @Override
-    public void resume() {
-
-    }
-
     @Override
     public Class<? extends Task> taskClass() {
-
         return RmqSourceTask.class;
     }
 
diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index a8fc7d5..2a5be6d 100644
--- 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -87,8 +87,8 @@ public class RmqSourceTask extends SourceTask {
     }
 
     @Override
-    public void start(SourceTaskContext sourceTaskContext) {
-        super.start(sourceTaskContext);
+    public void start(KeyValue config) {
+        ConfigUtil.load(config, this.config);
         RPCHook rpcHook = null;
         if (this.config.isSrcAclEnable()) {
             rpcHook = new AclClientRPCHook(new 
SessionCredentials(this.config.getSrcAccessKey(), 
this.config.getSrcSecretKey()));
@@ -126,16 +126,6 @@ public class RmqSourceTask extends SourceTask {
         log.info("RocketMQ source task started");
     }
 
-    @Override
-    public void validate(KeyValue config) {
-
-    }
-
-    @Override
-    public void init(KeyValue config) {
-        ConfigUtil.load(config, this.config);
-    }
-
     @Override
     public void stop() {
 
@@ -147,16 +137,6 @@ public class RmqSourceTask extends SourceTask {
         }
     }
 
-    @Override
-    public void pause() {
-
-    }
-
-    @Override
-    public void resume() {
-
-    }
-
     private List<ConnectRecord> pollCommonMessage() {
 
         List<ConnectRecord> res = new ArrayList<>();

Reply via email to