This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new d52a79d3 [ISSUE #424] Adjust the order in which WorkerSourceTask topics are acquired (#426)
d52a79d3 is described below

commit d52a79d3e6e119151e4c8e38a08ce75ebb6821df
Author: 楚闯 <[email protected]>
AuthorDate: Fri Feb 17 11:25:53 2023 +0800

    [ISSUE #424] Adjust the order in which WorkerSourceTask topics are acquired (#426)
    
    Co-authored-by: cchu <[email protected]>
---
 .../connect/runtime/connectorwrapper/WorkerSourceTask.java         | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index f379b7fb..ec94b8e3 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -420,10 +420,11 @@ public class WorkerSourceTask extends WorkerTask {
      * @return
      */
     private String maybeCreateAndGetTopic(ConnectRecord record) {
-        String topic = overwriteTopicFromRecord(record);
+        // topic from config
+        String topic = 
taskConfig.getString(SourceConnectorConfig.CONNECT_TOPICNAME);
         if (StringUtils.isBlank(topic)) {
-            // topic from config
-            topic = 
taskConfig.getString(SourceConnectorConfig.CONNECT_TOPICNAME);
+            // fall back to topic from record
+            topic = overwriteTopicFromRecord(record);
         }
         if (StringUtils.isBlank(topic)) {
             throw new ConnectException("source connect lack of topic config");

Reply via email to