This is an automated email from the ASF dual-hosted git repository.
sunxiaojian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 3bfdef86 [ISSUES #530] fix order message bug (#532)
3bfdef86 is described below
commit 3bfdef8661d9acc420bb7665501651c395461e92
Author: onceicy <[email protected]>
AuthorDate: Thu May 23 23:22:49 2024 +0800
[ISSUES #530] fix order message bug (#532)
* [ISSUES #530] fix order message bug
* [ISSUES #530] add order message compatible with rocketmq 4.x config
* [ISSUES #530] add order message compatible with rocketmq 4.x config user guide
* [ISSUES #530] Change the configuration item from 'ordering.msg.compatible.v4' to 'ordering.msg.enable'
---------
Co-authored-by: 靖愉 <[email protected]>
---
README.md | 7 ++++---
.../connect/runtime/config/SourceConnectorConfig.java | 1 +
.../connect/runtime/connectorwrapper/WorkerSourceTask.java | 11 ++++++++++-
.../runtime/service/AbstractConfigManagementService.java | 6 ++++++
4 files changed, 21 insertions(+), 4 deletions(-)
diff --git a/README.md b/README.md
index 37ae98ce..49a87464 100644
--- a/README.md
+++ b/README.md
@@ -344,9 +344,10 @@ public interface Transform<R extends ConnectRecord> extends Component {
### Source Connector特殊配置
-| key | nullable | default | description |
-|-----------------------------|---------|---------|------------------------------------------------------|
-| connect.topicname | true | | 指定数据写入的topic,若不配置则直接取position中key为topic的值,若取不到则抛出异常 |
+| key | nullable | default | description |
+|---------------------|---------|---------|--------------------------------------------------------------|
+| connect.topicname | true | | 指定数据写入的topic,若不配置则直接取position中key为topic的值,若取不到则抛出异常 |
+| ordering.msg.enable | true | false | 当目标集群不为rocketmq 5.x时,顺序消息会乱序,若设置为true,才能支持顺序,但会降低connector性能 |
### Sink Connector特殊配置
| key | nullable | default | description |
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java
index 12012917..f05abf3c 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SourceConnectorConfig.java
@@ -26,6 +26,7 @@ import
org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
public class SourceConnectorConfig extends ConnectorConfig {
public static final String CONNECT_TOPICNAME = "connect.topicname";
+ public static final String ORDERING_MSG_ENABLE = "ordering.msg.enable";
public SourceConnectorConfig(ConnectKeyValue config) {
super(config);
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 79317bdb..1ac8f928 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -276,7 +276,16 @@ public class WorkerSourceTask extends WorkerTask {
} else {
// Partition message ordering,
// At the same time, ensure that the data is pulled in an
orderly manner, which needs to be guaranteed by sourceTask in the business
- producer.send(sourceMessage, new
SelectMessageQueueByHash(), sourceMessage.getKeys(), callback);
+ if
(taskConfig.getProperties().get(SourceConnectorConfig.ORDERING_MSG_ENABLE).equals("true"))
{
+ try {
+ SendResult result = producer.send(sourceMessage,
new SelectMessageQueueByHash(), sourceMessage.getKeys());
+ callback.onSuccess(result);
+ } catch (Exception e) {
+ callback.onException(e);
+ }
+ } else {
+ producer.send(sourceMessage, new
SelectMessageQueueByHash(), sourceMessage.getKeys(), callback);
+ }
}
} catch (RetriableException e) {
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
index 855c0824..2c51c4ff 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
@@ -340,6 +340,12 @@ public abstract class AbstractConfigManagementService
implements ConfigManagemen
if (configs.containsKey(SourceConnectorConfig.CONNECT_TOPICNAME)) {
newKeyValue.put(SourceConnectorConfig.CONNECT_TOPICNAME,
configs.getString(SourceConnectorConfig.CONNECT_TOPICNAME));
}
+ // put ordering msg enable config
+ if
(configs.containsKey(SourceConnectorConfig.ORDERING_MSG_ENABLE)) {
+ newKeyValue.put(SourceConnectorConfig.ORDERING_MSG_ENABLE,
configs.getString(SourceConnectorConfig.ORDERING_MSG_ENABLE));
+ } else {
+ newKeyValue.put(SourceConnectorConfig.ORDERING_MSG_ENABLE,
"false");
+ }
// sink consume topic
if (configs.containsKey(SinkConnectorConfig.CONNECT_TOPICNAMES)) {
newKeyValue.put(SinkConnectorConfig.CONNECT_TOPICNAMES,
configs.getString(SinkConnectorConfig.CONNECT_TOPICNAMES));