This is an automated email from the ASF dual-hosted git repository.
nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bf982f4995e [improve] configure whether function consumer should skip
to latest (#17214)
bf982f4995e is described below
commit bf982f4995e624659021191982f7fedc13fc3ba0
Author: Neng Lu <[email protected]>
AuthorDate: Thu Feb 23 18:44:09 2023 -0800
[improve] configure whether function consumer should skip to latest (#17214)
---
.../java/org/apache/pulsar/common/functions/FunctionConfig.java | 2 ++
.../src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java | 7 +++++++
.../org/apache/pulsar/functions/instance/JavaInstanceRunnable.java | 5 +++++
.../apache/pulsar/functions/source/MultiConsumerPulsarSource.java | 6 ++++++
.../org/apache/pulsar/functions/source/PulsarSourceConfig.java | 3 ++-
.../apache/pulsar/functions/source/SingleConsumerPulsarSource.java | 6 ++++++
pulsar-functions/proto/src/main/proto/Function.proto | 1 +
.../org/apache/pulsar/functions/utils/FunctionConfigUtils.java | 6 ++++++
8 files changed, 35 insertions(+), 1 deletion(-)
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index 0b26e7e93b5..e304f25d5d3 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -131,6 +131,8 @@ public class FunctionConfig {
private Integer maxPendingAsyncRequests;
// Whether the pulsar admin client exposed to function context, default is
disabled.
private Boolean exposePulsarAdminClientEnabled;
+ // Whether the consumer should skip to latest position in case of failure
recovery
+ private Boolean skipToLatest;
@Builder.Default
private SubscriptionInitialPosition subscriptionPosition =
SubscriptionInitialPosition.Latest;
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index bc2585bc67b..05bab9c6f19 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -325,6 +325,9 @@ public class CmdFunctions extends CmdBase {
@Parameter(names = "--subs-position", description = "Pulsar source
subscription position if user wants to "
+ "consume messages from the specified location #Java")
protected SubscriptionInitialPosition subsPosition;
+ @Parameter(names = "--skip-to-latest", description = "Whether or not
the consumer skip to latest message "
+ + "upon function instance restart", arity = 1)
+ protected Boolean skipToLatest;
@Parameter(names = "--parallelism", description = "The parallelism
factor of a Pulsar Function "
+ "(i.e. the number of function instances to run) #Java")
protected Integer parallelism;
@@ -548,6 +551,10 @@ public class CmdFunctions extends CmdBase {
functionConfig.setSubscriptionPosition(subsPosition);
}
+ if (null != skipToLatest) {
+ functionConfig.setSkipToLatest(skipToLatest);
+ }
+
if (null != userConfigString) {
Type type = new TypeToken<Map<String, Object>>() {}.getType();
Map<String, Object> userConfigMap = new
Gson().fromJson(userConfigString, type);
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 5f82e934700..0dbfa0945ca 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -789,7 +789,12 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
convertFromFunctionDetailsSubscriptionPosition(sourceSpec.getSubscriptionPosition())
);
+ pulsarSourceConfig.setSkipToLatest(
+ sourceSpec.getSkipToLatest()
+ );
+
Objects.requireNonNull(contextImpl.getSubscriptionType());
+
pulsarSourceConfig.setSubscriptionType(contextImpl.getSubscriptionType());
pulsarSourceConfig.setTypeClassName(sourceSpec.getTypeClassName());
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java
index 61f5bfacb35..533e8d42c11 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java
@@ -27,6 +27,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -66,6 +67,11 @@ public class MultiConsumerPulsarSource<T> extends
PushPulsarSource<T> implements
cb.messageListener(this);
Consumer<T> consumer = cb.subscribeAsync().join();
+
+ if (pulsarSourceConfig.getSkipToLatest() != null &&
pulsarSourceConfig.getSkipToLatest()) {
+ consumer.seek(MessageId.latest);
+ }
+
inputConsumers.add(consumer);
}
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
index 5315702d7c0..ad6ea5a877f 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
@@ -30,7 +30,8 @@ public abstract class PulsarSourceConfig {
SubscriptionType subscriptionType;
private String subscriptionName;
private SubscriptionInitialPosition subscriptionPosition;
- // Whether the subscriptions the functions created/used should be deleted
when the functions is deleted
+ // Whether to call consumer.seek(latest) to skip contents between the last
acked message and the latest message
+ private Boolean skipToLatest;
private Integer maxMessageRetries = -1;
private String deadLetterTopic;
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java
index 3e60111ddbe..426723804ca 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java
@@ -27,6 +27,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.Record;
@@ -73,6 +74,11 @@ public class SingleConsumerPulsarSource<T> extends
PulsarSource<T> {
ConsumerBuilder<T> cb = createConsumeBuilder(topic,
pulsarSourceConsumerConfig);
consumer = cb.subscribeAsync().join();
+
+ if (this.pulsarSourceConfig.getSkipToLatest() != null &&
this.pulsarSourceConfig.getSkipToLatest()) {
+ consumer.seek(MessageId.latest);
+ }
+
inputConsumers.add(consumer);
}
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto
b/pulsar-functions/proto/src/main/proto/Function.proto
index e67899abd22..101d45bc59c 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -165,6 +165,7 @@ message SourceSpec {
bool cleanupSubscription = 11;
SubscriptionPosition subscriptionPosition = 12;
uint64 negativeAckRedeliveryDelayMs = 13;
+ bool skipToLatest = 14;
}
message SinkSpec {
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 647210de33a..d02fe5f788b 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -198,6 +198,12 @@ public class FunctionConfigUtils {
sourceSpecBuilder.setSubscriptionPosition(subPosition);
}
+ if (functionConfig.getSkipToLatest() != null) {
+
sourceSpecBuilder.setSkipToLatest(functionConfig.getSkipToLatest());
+ } else {
+ sourceSpecBuilder.setSkipToLatest(false);
+ }
+
if (extractedDetails.getTypeArg0() != null) {
sourceSpecBuilder.setTypeClassName(extractedDetails.getTypeArg0());
} else if
(StringUtils.isNotEmpty(functionConfig.getInputTypeClassName())) {