This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7ecb93cd3b9 [fix][io] Fix --retain[-key]-ordering not working error
for sink (#21060)
7ecb93cd3b9 is described below
commit 7ecb93cd3b94660412bbb968c0976bb0e0d75e71
Author: jiangpengcheng <[email protected]>
AuthorDate: Thu Aug 31 16:19:58 2023 +0800
[fix][io] Fix --retain[-key]-ordering not working error for sink (#21060)
---
.../java/org/apache/pulsar/functions/utils/SinkConfigUtils.java | 7 +++++++
.../org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java | 2 ++
2 files changed, 9 insertions(+)
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index c26df3b3f14..7919d697126 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -210,6 +210,13 @@ public class SinkConfigUtils {
functionDetailsBuilder.setSource(sourceSpecBuilder);
+ if (sinkConfig.getRetainKeyOrdering() != null) {
+
functionDetailsBuilder.setRetainKeyOrdering(sinkConfig.getRetainKeyOrdering());
+ }
+ if (sinkConfig.getRetainOrdering() != null) {
+
functionDetailsBuilder.setRetainOrdering(sinkConfig.getRetainOrdering());
+ }
+
if (sinkConfig.getMaxMessageRetries() != null &&
sinkConfig.getMaxMessageRetries() > 0) {
Function.RetryDetails.Builder retryDetails =
Function.RetryDetails.newBuilder();
retryDetails.setMaxMessageRetries(sinkConfig.getMaxMessageRetries());
diff --git
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 62d6f68d31b..8ac9b61e3f6 100644
---
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -151,6 +151,7 @@ public class SinkConfigUtilsTest {
SinkConfig sinkConfig = createSinkConfig();
sinkConfig.setRetainOrdering(testcase);
Function.FunctionDetails functionDetails =
SinkConfigUtils.convert(sinkConfig, new
SinkConfigUtils.ExtractedSinkDetails(null, null, null));
+ assertEquals(functionDetails.getRetainOrdering(), testcase != null
? testcase : Boolean.valueOf(false));
SinkConfig result =
SinkConfigUtils.convertFromDetails(functionDetails);
assertEquals(result.getRetainOrdering(), testcase != null ?
testcase : Boolean.valueOf(false));
}
@@ -163,6 +164,7 @@ public class SinkConfigUtilsTest {
SinkConfig sinkConfig = createSinkConfig();
sinkConfig.setRetainKeyOrdering(testcase);
Function.FunctionDetails functionDetails =
SinkConfigUtils.convert(sinkConfig, new
SinkConfigUtils.ExtractedSinkDetails(null, null, null));
+ assertEquals(functionDetails.getRetainKeyOrdering(), testcase !=
null ? testcase : Boolean.valueOf(false));
SinkConfig result =
SinkConfigUtils.convertFromDetails(functionDetails);
assertEquals(result.getRetainKeyOrdering(), testcase != null ?
testcase : Boolean.valueOf(false));
}