This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push:
new 0a1a033 [ISSUE #105] Fix sink retryTimes does not work (#106)
0a1a033 is described below
commit 0a1a03367ca422f3f4d5011b368e8e0ab0f91db8
Author: Humkum <[email protected]>
AuthorDate: Wed Jan 10 17:31:20 2024 +0800
[ISSUE #105] Fix sink retryTimes does not work (#106)
---
.../rocketmq/sink/table/RocketMQDynamicTableSink.java | 1 +
.../rocketmq/sink/table/RocketMQDynamicTableSinkFactory.java | 11 ++++++++++-
2 files changed, 11 insertions(+), 1 deletion(-)
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSink.java
b/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSink.java
index 73e8af7..98a15e6 100644
---
a/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSink.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSink.java
@@ -239,6 +239,7 @@ public class RocketMQDynamicTableSink implements
DynamicTableSink, SupportsWriti
Properties producerProps = new Properties();
producerProps.setProperty(RocketMQConfig.PRODUCER_GROUP,
producerGroup);
producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR,
nameServerAddress);
+ producerProps.setProperty(RocketMQConfig.PRODUCER_RETRY_TIMES,
String.valueOf(retryTimes));
if (accessKey != null && secretKey != null) {
producerProps.setProperty(RocketMQConfig.ACCESS_KEY, accessKey);
producerProps.setProperty(RocketMQConfig.SECRET_KEY, secretKey);
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSinkFactory.java
b/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSinkFactory.java
index 03a89b1..9c1ca11 100644
---
a/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSinkFactory.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/sink/table/RocketMQDynamicTableSinkFactory.java
@@ -64,6 +64,8 @@ public class RocketMQDynamicTableSinkFactory implements
DynamicTableSinkFactory
Set<ConfigOption<?>> requiredOptions = new HashSet<>();
requiredOptions.add(TOPIC);
requiredOptions.add(PRODUCER_GROUP);
+ requiredOptions.add(ENDPOINTS);
+
// requiredOptions.add(PERSIST_OFFSET_INTERVAL);
return requiredOptions;
}
@@ -76,6 +78,13 @@ public class RocketMQDynamicTableSinkFactory implements
DynamicTableSinkFactory
optionalOptions.add(OPTIONAL_FIELD_DELIMITER);
optionalOptions.add(OPTIONAL_ACCESS_KEY);
optionalOptions.add(OPTIONAL_SECRET_KEY);
+ optionalOptions.add(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN);
+ optionalOptions.add(OPTIONAL_WRITE_RETRY_TIMES);
+ optionalOptions.add(OPTIONAL_WRITE_SLEEP_TIME_MS);
+ optionalOptions.add(OPTIONAL_WRITE_IS_DYNAMIC_TAG);
+ optionalOptions.add(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN_WRITE_INCLUDED);
+ optionalOptions.add(OPTIONAL_WRITE_KEYS_TO_BODY);
+ optionalOptions.add(OPTIONAL_WRITE_KEY_COLUMNS);
return optionalOptions;
}
@@ -122,8 +131,8 @@ public class RocketMQDynamicTableSinkFactory implements
DynamicTableSinkFactory
dynamicColumn,
fieldDelimiter,
encoding,
- sleepTimeMs,
retryTimes,
+ sleepTimeMs,
isDynamicTag,
isDynamicTagIncluded,
writeKeysToBody,