This is an automated email from the ASF dual-hosted git repository.

nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bf982f4995e [improve] configure whether function consumer should skip to latest (#17214)
bf982f4995e is described below

commit bf982f4995e624659021191982f7fedc13fc3ba0
Author: Neng Lu <[email protected]>
AuthorDate: Thu Feb 23 18:44:09 2023 -0800

    [improve] configure whether function consumer should skip to latest (#17214)
---
 .../java/org/apache/pulsar/common/functions/FunctionConfig.java    | 2 ++
 .../src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java    | 7 +++++++
 .../org/apache/pulsar/functions/instance/JavaInstanceRunnable.java | 5 +++++
 .../apache/pulsar/functions/source/MultiConsumerPulsarSource.java  | 6 ++++++
 .../org/apache/pulsar/functions/source/PulsarSourceConfig.java     | 3 ++-
 .../apache/pulsar/functions/source/SingleConsumerPulsarSource.java | 6 ++++++
 pulsar-functions/proto/src/main/proto/Function.proto               | 1 +
 .../org/apache/pulsar/functions/utils/FunctionConfigUtils.java     | 6 ++++++
 8 files changed, 35 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index 0b26e7e93b5..e304f25d5d3 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -131,6 +131,8 @@ public class FunctionConfig {
     private Integer maxPendingAsyncRequests;
     // Whether the pulsar admin client exposed to function context, default is 
disabled.
     private Boolean exposePulsarAdminClientEnabled;
+    // Whether the consumer should skip to the latest position in case of failure recovery
+    private Boolean skipToLatest;
 
     @Builder.Default
     private SubscriptionInitialPosition subscriptionPosition = 
SubscriptionInitialPosition.Latest;
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index bc2585bc67b..05bab9c6f19 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -325,6 +325,9 @@ public class CmdFunctions extends CmdBase {
         @Parameter(names = "--subs-position", description = "Pulsar source 
subscription position if user wants to "
                 + "consume messages from the specified location #Java")
         protected SubscriptionInitialPosition subsPosition;
+        @Parameter(names = "--skip-to-latest", description = "Whether or not 
the consumer skip to latest message "
+            + "upon function instance restart", arity = 1)
+        protected Boolean skipToLatest;
         @Parameter(names = "--parallelism", description = "The parallelism 
factor of a Pulsar Function "
                 + "(i.e. the number of function instances to run) #Java")
         protected Integer parallelism;
@@ -548,6 +551,10 @@ public class CmdFunctions extends CmdBase {
                 functionConfig.setSubscriptionPosition(subsPosition);
             }
 
+            if (null != skipToLatest) {
+                functionConfig.setSkipToLatest(skipToLatest);
+            }
+
             if (null != userConfigString) {
                 Type type = new TypeToken<Map<String, Object>>() {}.getType();
                 Map<String, Object> userConfigMap = new 
Gson().fromJson(userConfigString, type);
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 5f82e934700..0dbfa0945ca 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -789,7 +789,12 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
                     
convertFromFunctionDetailsSubscriptionPosition(sourceSpec.getSubscriptionPosition())
             );
 
+            pulsarSourceConfig.setSkipToLatest(
+                sourceSpec.getSkipToLatest()
+            );
+
             Objects.requireNonNull(contextImpl.getSubscriptionType());
+
             
pulsarSourceConfig.setSubscriptionType(contextImpl.getSubscriptionType());
 
             pulsarSourceConfig.setTypeClassName(sourceSpec.getTypeClassName());
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java
index 61f5bfacb35..533e8d42c11 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java
@@ -27,6 +27,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -66,6 +67,11 @@ public class MultiConsumerPulsarSource<T> extends 
PushPulsarSource<T> implements
             cb.messageListener(this);
 
             Consumer<T> consumer = cb.subscribeAsync().join();
+
+            if (pulsarSourceConfig.getSkipToLatest() != null && 
pulsarSourceConfig.getSkipToLatest()) {
+                consumer.seek(MessageId.latest);
+            }
+
             inputConsumers.add(consumer);
         }
     }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
index 5315702d7c0..ad6ea5a877f 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
@@ -30,7 +30,8 @@ public abstract class PulsarSourceConfig {
     SubscriptionType subscriptionType;
     private String subscriptionName;
     private SubscriptionInitialPosition subscriptionPosition;
-    // Whether the subscriptions the functions created/used should be deleted 
when the functions is deleted
+    // Whether to call consumer.seek(latest) to skip contents between the last acked message and the latest message
+    private Boolean skipToLatest;
     private Integer maxMessageRetries = -1;
     private String deadLetterTopic;
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java
index 3e60111ddbe..426723804ca 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java
@@ -27,6 +27,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.api.Record;
@@ -73,6 +74,11 @@ public class SingleConsumerPulsarSource<T> extends 
PulsarSource<T> {
 
         ConsumerBuilder<T> cb = createConsumeBuilder(topic, 
pulsarSourceConsumerConfig);
         consumer = cb.subscribeAsync().join();
+
+        if (this.pulsarSourceConfig.getSkipToLatest() != null && 
this.pulsarSourceConfig.getSkipToLatest()) {
+            consumer.seek(MessageId.latest);
+        }
+
         inputConsumers.add(consumer);
     }
 
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto 
b/pulsar-functions/proto/src/main/proto/Function.proto
index e67899abd22..101d45bc59c 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -165,6 +165,7 @@ message SourceSpec {
     bool cleanupSubscription = 11;
     SubscriptionPosition subscriptionPosition = 12;
     uint64 negativeAckRedeliveryDelayMs = 13;
+    bool skipToLatest = 14;
 }
 
 message SinkSpec {
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 647210de33a..d02fe5f788b 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -198,6 +198,12 @@ public class FunctionConfigUtils {
             sourceSpecBuilder.setSubscriptionPosition(subPosition);
         }
 
+        if (functionConfig.getSkipToLatest() != null) {
+            
sourceSpecBuilder.setSkipToLatest(functionConfig.getSkipToLatest());
+        } else {
+            sourceSpecBuilder.setSkipToLatest(false);
+        }
+
         if (extractedDetails.getTypeArg0() != null) {
             sourceSpecBuilder.setTypeClassName(extractedDetails.getTypeArg0());
         } else if 
(StringUtils.isNotEmpty(functionConfig.getInputTypeClassName())) {

Reply via email to