This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new d52a79d3 [ISSUE #424] Adjust the order in which WorkerSourceTask
topics are acquired (#426)
d52a79d3 is described below
commit d52a79d3e6e119151e4c8e38a08ce75ebb6821df
Author: 楚闯 <[email protected]>
AuthorDate: Fri Feb 17 11:25:53 2023 +0800
[ISSUE #424] Adjust the order in which WorkerSourceTask topics are acquired
(#426)
Co-authored-by: cchu <[email protected]>
---
.../connect/runtime/connectorwrapper/WorkerSourceTask.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index f379b7fb..ec94b8e3 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -420,10 +420,11 @@ public class WorkerSourceTask extends WorkerTask {
* @return
*/
private String maybeCreateAndGetTopic(ConnectRecord record) {
- String topic = overwriteTopicFromRecord(record);
+ // topic from config
+ String topic =
taskConfig.getString(SourceConnectorConfig.CONNECT_TOPICNAME);
if (StringUtils.isBlank(topic)) {
- // topic from config
- topic =
taskConfig.getString(SourceConnectorConfig.CONNECT_TOPICNAME);
+ // try topic from config
+ topic = overwriteTopicFromRecord(record);
}
if (StringUtils.isBlank(topic)) {
throw new ConnectException("source connect lack of topic config");