This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ecb93cd3b9 [fix][io] Fix --retain[-key]-ordering not working error 
for sink (#21060)
7ecb93cd3b9 is described below

commit 7ecb93cd3b94660412bbb968c0976bb0e0d75e71
Author: jiangpengcheng <[email protected]>
AuthorDate: Thu Aug 31 16:19:58 2023 +0800

    [fix][io] Fix --retain[-key]-ordering not working error for sink (#21060)
---
 .../java/org/apache/pulsar/functions/utils/SinkConfigUtils.java    | 7 +++++++
 .../org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java     | 2 ++
 2 files changed, 9 insertions(+)

diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index c26df3b3f14..7919d697126 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -210,6 +210,13 @@ public class SinkConfigUtils {
 
         functionDetailsBuilder.setSource(sourceSpecBuilder);
 
+        if (sinkConfig.getRetainKeyOrdering() != null) {
+            
functionDetailsBuilder.setRetainKeyOrdering(sinkConfig.getRetainKeyOrdering());
+        }
+        if (sinkConfig.getRetainOrdering() != null) {
+            
functionDetailsBuilder.setRetainOrdering(sinkConfig.getRetainOrdering());
+        }
+
         if (sinkConfig.getMaxMessageRetries() != null && 
sinkConfig.getMaxMessageRetries() > 0) {
             Function.RetryDetails.Builder retryDetails = 
Function.RetryDetails.newBuilder();
             
retryDetails.setMaxMessageRetries(sinkConfig.getMaxMessageRetries());
diff --git 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 62d6f68d31b..8ac9b61e3f6 100644
--- 
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ 
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -151,6 +151,7 @@ public class SinkConfigUtilsTest {
             SinkConfig sinkConfig = createSinkConfig();
             sinkConfig.setRetainOrdering(testcase);
             Function.FunctionDetails functionDetails = 
SinkConfigUtils.convert(sinkConfig, new 
SinkConfigUtils.ExtractedSinkDetails(null, null, null));
+            assertEquals(functionDetails.getRetainOrdering(), testcase != null 
? testcase : Boolean.valueOf(false));
             SinkConfig result = 
SinkConfigUtils.convertFromDetails(functionDetails);
             assertEquals(result.getRetainOrdering(), testcase != null ? 
testcase : Boolean.valueOf(false));
         }
@@ -163,6 +164,7 @@ public class SinkConfigUtilsTest {
             SinkConfig sinkConfig = createSinkConfig();
             sinkConfig.setRetainKeyOrdering(testcase);
             Function.FunctionDetails functionDetails = 
SinkConfigUtils.convert(sinkConfig, new 
SinkConfigUtils.ExtractedSinkDetails(null, null, null));
+            assertEquals(functionDetails.getRetainKeyOrdering(), testcase != 
null ? testcase : Boolean.valueOf(false));
             SinkConfig result = 
SinkConfigUtils.convertFromDetails(functionDetails);
             assertEquals(result.getRetainKeyOrdering(), testcase != null ? 
testcase : Boolean.valueOf(false));
         }

Reply via email to