This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e14175  add auto ack and timeout configurable (#2503)
8e14175 is described below

commit 8e141752cbbbd038ecc183fd63b72ef19dfb32cc
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Tue Sep 18 21:19:05 2018 -0700

    add auto ack and timeout configurable (#2503)
    
    * add auto ack and timeout configurable
    
    * Fix test
---
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java    | 19 ++++++++++++++++++-
 .../org/apache/pulsar/admin/cli/TestCmdSinks.java     |  1 +
 .../org/apache/pulsar/functions/utils/SinkConfig.java |  3 +++
 site2/docs/reference-pulsar-admin.md                  |  4 ++++
 4 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 38a55bf..8f9eefe 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -291,6 +291,10 @@ public class CmdSinks extends CmdBase {
         protected String DEPRECATED_sinkConfigString;
         @Parameter(names = "--sink-config", description = "User defined configs key/values")
         protected String sinkConfigString;
+        @Parameter(names = "--auto-ack", description = "Whether or not the framework will automatically acknowleges messages", arity = 1)
+        protected boolean autoAck = true;
+        @Parameter(names = "--timeout-ms", description = "The message timeout in milliseconds")
+        protected Long timeoutMs;
 
         protected SinkConfig sinkConfig;
 
@@ -399,6 +403,15 @@ public class CmdSinks extends CmdBase {
                 sinkConfig.setConfigs(parseConfigs(sinkConfigString));
             }
 
+            sinkConfig.setAutoAck(autoAck);
+            if (timeoutMs != null) {
+                sinkConfig.setTimeoutMs(timeoutMs);
+            }
+            
+            if (null != sinkConfigString) {
+                sinkConfig.setConfigs(parseConfigs(sinkConfigString));
+            }
+            
             inferMissingArguments(sinkConfig);
         }
 
@@ -585,7 +598,11 @@ public class CmdSinks extends CmdBase {
                             : SubscriptionType.SHARED;
             sourceSpecBuilder.setSubscriptionType(subType);
 
-            functionDetailsBuilder.setAutoAck(true);
+            functionDetailsBuilder.setAutoAck(sinkConfig.isAutoAck());
+            if (sinkConfig.getTimeoutMs() != null) {
+                sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs());
+            }
+            
             functionDetailsBuilder.setSource(sourceSpecBuilder);
 
             // set up sink spec
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index e3ba70e..fb13d39 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -133,6 +133,7 @@ public class TestCmdSinks {
         sinkConfig.setTenant(TENANT);
         sinkConfig.setNamespace(NAMESPACE);
         sinkConfig.setName(NAME);
+        sinkConfig.setAutoAck(true);
 
         sinkConfig.setInputs(INPUTS_LIST);
         sinkConfig.setTopicToSerdeClassName(CUSTOM_SERDE_INPUT_MAP);
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
index be886c4..1132fa6 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfig.java
@@ -78,6 +78,9 @@ public class SinkConfig {
     private boolean retainOrdering;
     @isValidResources
     private Resources resources;
+    private boolean autoAck;
+    @isPositiveNumber
+    private Long timeoutMs;
 
     @isFileExists
     private String archive;
diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md
index 5b1fb41..42f6c9c 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -1014,6 +1014,8 @@ Options
 |`--sink-type`|The built-in sinks's connector provider||
 |`--topics-pattern`|TopicsPattern to consume from list of topics under a namespace that match the pattern.||
 |`--tenant`|The sink’s tenant||
+|`--auto-ack`|Let the functions framework manage acking||
+|`--timeout-ms`|The message timeout in milliseconds||
 
 
 ### `update`
@@ -1091,6 +1093,8 @@ Options
 |`--sink-type`|The built-in sinks's connector provider||
 |`--topics-pattern`|TopicsPattern to consume from list of topics under a namespace that match the pattern.||
 |`--tenant`|The sink’s tenant||
+|`--auto-ack`|Let the functions framework manage acking||
+|`--timeout-ms`|The message timeout in milliseconds||
 
 
 ### `available-sinks`

Reply via email to