This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit dbddfe5282fa5c0d0ff5ec85db624672e4c9a603 Author: Rui Fu <[email protected]> AuthorDate: Wed Sep 15 08:53:56 2021 +0800 [pulsar-functions] Pass `SubscriptionPosition` from `FunctionDetails` to `FunctionConfig` / `SinkConfig` (#11831) * pass SubscriptionPosition from FunctionDetails to config * address comment * reduce code duplication * set subscriptionPosition in FunctionConfig, SinkConfig with init value * fix default values * fix CI * revert init data * fix CI * fix CI * fix CI (cherry picked from commit bfd6542ac17c24ff38134a843b3f372cad9246e8) --- .../pulsar/common/functions/FunctionConfig.java | 3 ++- .../org/apache/pulsar/common/io/SinkConfig.java | 3 ++- .../functions/instance/JavaInstanceRunnable.java | 12 ++++-------- .../pulsar/functions/utils/FunctionCommon.java | 10 ++++++++++ .../pulsar/functions/utils/FunctionConfigUtils.java | 21 ++++++++++++++------- .../pulsar/functions/utils/SinkConfigUtils.java | 6 ++++++ 6 files changed, 38 insertions(+), 17 deletions(-) diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java index 527f9b8..0fe539f 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java @@ -126,5 +126,6 @@ public class FunctionConfig { // Whether the pulsar admin client exposed to function context, default is disabled. private Boolean exposePulsarAdminClientEnabled; - private SubscriptionInitialPosition subscriptionPosition; + @Builder.Default + private SubscriptionInitialPosition subscriptionPosition = SubscriptionInitialPosition.Latest; } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java index 4a0d093..9bf171a 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java @@ -43,7 +43,8 @@ public class SinkConfig { private String name; private String className; private String sourceSubscriptionName; - private SubscriptionInitialPosition sourceSubscriptionPosition; + @Builder.Default + private SubscriptionInitialPosition sourceSubscriptionPosition = SubscriptionInitialPosition.Latest; private Collection<String> inputs; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 692902e..f6ca84d 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -19,6 +19,7 @@ package org.apache.pulsar.functions.instance; +import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Preconditions; @@ -669,14 +670,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { FunctionConfig.ProcessingGuarantees.valueOf( this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name())); - switch (sourceSpec.getSubscriptionPosition()) { - case EARLIEST: - pulsarSourceConfig.setSubscriptionPosition(SubscriptionInitialPosition.Earliest); - break; - default: - pulsarSourceConfig.setSubscriptionPosition(SubscriptionInitialPosition.Latest); - break; - } + pulsarSourceConfig.setSubscriptionPosition( + convertFromFunctionDetailsSubscriptionPosition(sourceSpec.getSubscriptionPosition()) + ); Preconditions.checkNotNull(contextImpl.getSubscriptionType()); pulsarSourceConfig.setSubscriptionType(contextImpl.getSubscriptionType()); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java index 2e433ad..a13695e 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java @@ -46,6 +46,7 @@ import lombok.extern.slf4j.Slf4j; import net.jodah.typetools.TypeResolver; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.common.functions.FunctionConfig; @@ -501,4 +502,13 @@ public class FunctionCommon { return false; } + + public static SubscriptionInitialPosition convertFromFunctionDetailsSubscriptionPosition( + org.apache.pulsar.functions.proto.Function.SubscriptionPosition subscriptionPosition) { + if (org.apache.pulsar.functions.proto.Function.SubscriptionPosition.EARLIEST.equals(subscriptionPosition)) { + return SubscriptionInitialPosition.Earliest; + } else { + return SubscriptionInitialPosition.Latest; + } + } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java index 13089e4..a2579b7 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java @@ -47,6 +47,7 @@ import static org.apache.commons.lang.StringUtils.isNotEmpty; import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.pulsar.common.functions.Utils.BUILTIN; import static org.apache.pulsar.common.util.ClassLoaderUtils.loadJar; +import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition; @Slf4j public class FunctionConfigUtils { @@ -158,15 +159,16 @@ public class FunctionConfigUtils { } // Set subscription position - Function.SubscriptionPosition subPosition; - if (functionConfig.getSubscriptionPosition() == SubscriptionInitialPosition.Earliest) { - subPosition = Function.SubscriptionPosition.EARLIEST; - } else { - subPosition = Function.SubscriptionPosition.LATEST; + if (functionConfig.getSubscriptionPosition() != null) { + Function.SubscriptionPosition subPosition = null; + if (SubscriptionInitialPosition.Earliest == functionConfig.getSubscriptionPosition()) { + subPosition = Function.SubscriptionPosition.EARLIEST; + } else { + subPosition = Function.SubscriptionPosition.LATEST; + } + sourceSpecBuilder.setSubscriptionPosition(subPosition); } - sourceSpecBuilder.setSubscriptionPosition(subPosition); - if (typeArgs != null) { sourceSpecBuilder.setTypeClassName(typeArgs[0].getName()); } @@ -367,6 +369,11 @@ public class FunctionConfigUtils { functionConfig.setCleanupSubscription(functionDetails.getSource().getCleanupSubscription()); functionConfig.setAutoAck(functionDetails.getAutoAck()); + + // Set subscription position + functionConfig.setSubscriptionPosition( + convertFromFunctionDetailsSubscriptionPosition(functionDetails.getSource().getSubscriptionPosition())); + if (functionDetails.getSource().getTimeoutMs() != 0) { functionConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs()); } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java index fd6f35c..a8180ab 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java @@ -55,6 +55,7 @@ import java.util.Map; import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition; import static org.apache.pulsar.functions.utils.FunctionCommon.convertProcessingGuarantee; import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType; @@ -284,6 +285,11 @@ public class SinkConfigUtils { sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE); } sinkConfig.setAutoAck(functionDetails.getAutoAck()); + + // Set subscription position + sinkConfig.setSourceSubscriptionPosition( + convertFromFunctionDetailsSubscriptionPosition(functionDetails.getSource().getSubscriptionPosition())); + if (functionDetails.getSource().getTimeoutMs() != 0) { sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs()); }
