This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/dev-offline-sync by this push:
     new bf38d2a3aa [INLONG-10560][Sort] Support bounded pulsar source (#10569)
bf38d2a3aa is described below

commit bf38d2a3aa1f5d8663fb4797ab9bad9ca787a7ab
Author: AloysZhang <[email protected]>
AuthorDate: Fri Jul 5 16:21:03 2024 +0800

    [INLONG-10560][Sort] Support bounded pulsar source (#10569)
    
    Co-authored-by: Aloys Zhang <[email protected]>
---
 .../apache/inlong/common/bounded/Boundaries.java   | 30 ++++++++++++
 .../apache/inlong/common/bounded/BoundaryType.java | 56 ++++++++++++++++++++++
 .../inlong/sort/configuration/Constants.java       | 15 ++++++
 .../inlong/sort/protocol/node/ExtractNode.java     |  7 +++
 .../protocol/node/extract/PulsarExtractNode.java   | 29 +++++++++++
 .../main/java/org/apache/inlong/sort/Entrance.java | 42 ++++++++++++++++
 .../sort/pulsar/table/PulsarTableFactory.java      |  4 ++
 .../sort/pulsar/table/PulsarTableOptions.java      | 28 +++++++++++
 .../pulsar/table/source/PulsarTableSource.java     | 23 +++++----
 9 files changed, 225 insertions(+), 9 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/bounded/Boundaries.java 
b/inlong-common/src/main/java/org/apache/inlong/common/bounded/Boundaries.java
new file mode 100644
index 0000000000..d3302ab99c
--- /dev/null
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/bounded/Boundaries.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.bounded;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class Boundaries {
+
+    public String lowerBound;
+    public String upperBound;
+    public BoundaryType boundaryType;
+}
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/bounded/BoundaryType.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/bounded/BoundaryType.java
new file mode 100644
index 0000000000..1888bd3d91
--- /dev/null
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/bounded/BoundaryType.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.bounded;
+
+import lombok.Getter;
+
+/**
+ * Source boundary types.
+ * TIME is the common boundary type that should be supported by every bounded 
source.
+ * OFFSET is the boundary type for MQ-based bounded sources, such as an offset in Kafka or 
a messageId in Pulsar.
+ * */
+@Getter
+public enum BoundaryType {
+
+    TIME("time"),
+    OFFSET("offset");
+
+    private final String type;
+
+    BoundaryType(String boundaryType) {
+        this.type = boundaryType;
+    }
+
+    public static BoundaryType getInstance(String boundaryType) {
+        for (BoundaryType type : values()) {
+            if (type.getType().equalsIgnoreCase(boundaryType)) {
+                return type;
+            }
+        }
+        return null;
+    }
+
+    public static boolean isSupportBoundaryType(String boundaryType) {
+        for (BoundaryType source : values()) {
+            if (source.getType().equalsIgnoreCase(boundaryType)) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index a5717242a5..878b841076 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -299,6 +299,21 @@ public class Constants {
             .defaultValue("stream")
             .withDescription("The runtime execution mode of Flink, including 
stream and batch, default is stream");
 
+    // ------------------------------------------------------------------------
+    // Source boundary related
+    // ------------------------------------------------------------------------
+    public static final ConfigOption<String> SOURCE_BOUNDARY_TYPE = 
key("source.boundary.type")
+            .defaultValue("time")
+            .withDescription("The type of source boundary");
+
+    public static final ConfigOption<String> SOURCE_LOWER_BOUNDARY = 
key("source.lower.boundary")
+            .defaultValue("0")
+            .withDescription("The lower bound of source");
+
+    public static final ConfigOption<String> SOURCE_UPPER_BOUNDARY = 
key("source.upper.boundary")
+            .defaultValue("0")
+            .withDescription("The upper bound of source");
+
     // ------------------------------------------------------------------------
     // Metrics related
     // ------------------------------------------------------------------------
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
index fc68f0f356..55e7b57aae 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.protocol.node;
 
+import org.apache.inlong.common.bounded.Boundaries;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.node.extract.DorisExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.FileSystemExtractNode;
@@ -106,4 +107,10 @@ public abstract class ExtractNode implements Node {
         this.watermarkField = watermarkField;
         this.properties = properties;
     }
+
+    public void fillInBoundaries(Boundaries boundaries) {
+        Preconditions.checkNotNull(boundaries, "boundaries is null");
+        // each kind of extract node should override this method to fill in 
its own boundaries
+    }
+
 }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
index 9a2adcc8e3..ab90ba7d19 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.protocol.node.extract;
 
+import org.apache.inlong.common.bounded.Boundaries;
+import org.apache.inlong.common.bounded.BoundaryType;
 import org.apache.inlong.common.enums.MetaField;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.InlongMetric;
@@ -33,13 +35,17 @@ import org.apache.commons.lang3.StringUtils;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
 @EqualsAndHashCode(callSuper = true)
@@ -47,6 +53,7 @@ import java.util.Set;
 @Data
 public class PulsarExtractNode extends ExtractNode implements InlongMetric, 
Metadata {
 
+    private static final Logger log = 
LoggerFactory.getLogger(PulsarExtractNode.class);
     private static final long serialVersionUID = 1L;
 
     @Nonnull
@@ -89,6 +96,8 @@ public class PulsarExtractNode extends ExtractNode implements 
InlongMetric, Meta
     @JsonProperty("clientAuthParams")
     private String clientAuthParams;
 
+    Map<String, String> sourceBoundaryOptions = new HashMap<>();
+
     @JsonCreator
     public PulsarExtractNode(@JsonProperty("id") String id,
             @JsonProperty("name") String name,
@@ -146,11 +155,17 @@ public class PulsarExtractNode extends ExtractNode 
implements InlongMetric, Meta
             options.put("scan.startup.sub-name", scanStartupSubName);
             options.put("scan.startup.sub-start-offset", 
scanStartupSubStartOffset);
         }
+
         if (StringUtils.isNotBlank(clientAuthPluginClassName)
                 && StringUtils.isNotBlank(clientAuthParams)) {
             options.put("pulsar.client.authPluginClassName", 
clientAuthPluginClassName);
             options.put("pulsar.client.authParams", clientAuthParams);
         }
+
+        // add boundary options
+        if (!sourceBoundaryOptions.isEmpty()) {
+            options.putAll(sourceBoundaryOptions);
+        }
         return options;
     }
 
@@ -197,4 +212,18 @@ public class PulsarExtractNode extends ExtractNode 
implements InlongMetric, Meta
         return EnumSet.of(MetaField.AUDIT_DATA_TIME);
     }
 
+    @Override
+    public void fillInBoundaries(Boundaries boundaries) {
+        super.fillInBoundaries(boundaries);
+        BoundaryType boundaryType = boundaries.getBoundaryType();
+        String lowerBoundary = boundaries.getLowerBound();
+        String upperBoundary = boundaries.getUpperBound();
+        if (Objects.requireNonNull(boundaryType) == BoundaryType.TIME) {
+            sourceBoundaryOptions.put("source.start.publish-time", 
lowerBoundary);
+            sourceBoundaryOptions.put("source.stop.at-publish-time", 
upperBoundary);
+            log.info("Filled in source boundaries options");
+        } else {
+            log.warn("Not supported boundary type: {}", boundaryType);
+        }
+    }
 }
diff --git 
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java 
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
index 4aa3902cae..a9744f793c 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort;
 
+import org.apache.inlong.common.bounded.Boundaries;
+import org.apache.inlong.common.bounded.BoundaryType;
 import org.apache.inlong.sort.configuration.Configuration;
 import org.apache.inlong.sort.configuration.Constants;
 import org.apache.inlong.sort.parser.Parser;
@@ -24,6 +26,7 @@ import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
 import org.apache.inlong.sort.parser.impl.NativeFlinkSqlParser;
 import org.apache.inlong.sort.parser.result.ParseResult;
 import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.util.ParameterTool;
 
 import com.google.common.base.Preconditions;
@@ -33,13 +36,20 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 
+import static 
org.apache.inlong.sort.configuration.Constants.SOURCE_BOUNDARY_TYPE;
+import static 
org.apache.inlong.sort.configuration.Constants.SOURCE_LOWER_BOUNDARY;
+import static 
org.apache.inlong.sort.configuration.Constants.SOURCE_UPPER_BOUNDARY;
+
 public class Entrance {
 
+    private static final Logger log = LoggerFactory.getLogger(Entrance.class);
     public static final String BATCH_MODE = "batch";
 
     public static void main(String[] args) throws Exception {
@@ -75,6 +85,10 @@ public class Entrance {
                 
groupInfo.getProperties().putIfAbsent(Constants.METRICS_AUDIT_PROXY_HOSTS.key(),
                         config.getString(Constants.METRICS_AUDIT_PROXY_HOSTS));
             }
+
+            // fill in boundaries if needed
+            fillInSourceBoundariesIfNeeded(runtimeExecutionMode, groupInfo, 
config);
+
             parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
         } else {
             String statements = getStatementSetFromFile(sqlFile);
@@ -85,6 +99,34 @@ public class Entrance {
         parseResult.execute();
     }
 
+    private static void fillInSourceBoundariesIfNeeded(String 
runtimeExecutionMode, GroupInfo groupInfo,
+            Configuration configuration) {
+        if (!BATCH_MODE.equalsIgnoreCase(runtimeExecutionMode)) {
+            return;
+        }
+        String type = configuration.getString(SOURCE_BOUNDARY_TYPE);
+        String lowerBoundary = configuration.getString(SOURCE_LOWER_BOUNDARY);
+        String upperBoundary = configuration.getString(SOURCE_UPPER_BOUNDARY);
+
+        log.info("Filling in source boundaries for group: {}, with execution 
mode: {}, boundaryType: {}, "
+                + "lowerBoundary: {}, upperBoundary: {}",
+                groupInfo.getGroupId(), runtimeExecutionMode, type, 
lowerBoundary, upperBoundary);
+
+        BoundaryType boundaryType = BoundaryType.getInstance(type);
+        if (boundaryType == null) {
+            throw new RuntimeException("Unknown boundary type: " + type);
+        }
+        Boundaries boundaries = new Boundaries(lowerBoundary, upperBoundary, 
boundaryType);
+        // add source boundaries for bounded source
+        groupInfo.getStreams().forEach(streamInfo -> {
+            streamInfo.getNodes().forEach(node -> {
+                if (node instanceof ExtractNode) {
+                    ((ExtractNode) node).fillInBoundaries(boundaries);
+                }
+            });
+        });
+    }
+
     private static String getStatementSetFromFile(String fileName) throws 
IOException {
         return Files.asCharSource(new File(fileName), 
StandardCharsets.UTF_8).read();
     }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
index 4b982f4691..deb240c7ed 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
@@ -81,11 +81,13 @@ import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_CUSTOM
 import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_MESSAGE_DELAY_INTERVAL;
 import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_TOPIC_ROUTING_MODE;
 import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID_DEPRECATED;
 import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
 import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
 import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
 import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
 import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_SUBSCRIPTION_NAME;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_SUBSCRIPTION_NAME_DEPRECATED;
 import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
 import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.STARTUP_MODE;
 import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.TOPIC;
@@ -271,8 +273,10 @@ public class PulsarTableFactory implements 
DynamicTableSourceFactory, DynamicTab
                 ADMIN_URL,
                 STARTUP_MODE,
                 SOURCE_SUBSCRIPTION_NAME,
+                SOURCE_SUBSCRIPTION_NAME_DEPRECATED,
                 SOURCE_SUBSCRIPTION_TYPE,
                 SOURCE_START_FROM_MESSAGE_ID,
+                SOURCE_START_FROM_MESSAGE_ID_DEPRECATED,
                 SOURCE_START_FROM_PUBLISH_TIME,
                 SOURCE_STOP_AT_MESSAGE_ID,
                 SOURCE_STOP_AFTER_MESSAGE_ID,
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
index bc78dd2d98..93644fecfb 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
@@ -79,6 +79,16 @@ public final class PulsarTableOptions {
      * Copied because we want to have a default value for it.
      */
     public static final ConfigOption<String> SOURCE_SUBSCRIPTION_NAME =
+            ConfigOptions.key("source.subscription-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The subscription name of the 
consumer that is used by the runtime [Pulsar DataStream source 
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source).
 This argument is required for constructing the consumer.")
+                                    .build());
+
+    public static final ConfigOption<String> 
SOURCE_SUBSCRIPTION_NAME_DEPRECATED =
             ConfigOptions.key("scan.startup.sub-name")
                     .stringType()
                     .noDefaultValue()
@@ -89,6 +99,24 @@ public final class PulsarTableOptions {
                                     .build());
 
     public static final ConfigOption<String> SOURCE_START_FROM_MESSAGE_ID =
+            ConfigOptions.key("source.start.message-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "(Optional) Message id that is 
used to specify a consuming starting "
+                                                    + "point for source. Use 
%s, %s or pass in a message id "
+                                                    + "representation in %s, "
+                                                    + "such as %s. This option 
takes precedence over "
+                                                    + 
"source.start.publish-time.",
+                                            code("earliest"),
+                                            code("latest"),
+                                            
code("ledgerId:entryId:partitionId"),
+                                            code("12:2:-1"))
+                                    .build());
+
+    public static final ConfigOption<String> 
SOURCE_START_FROM_MESSAGE_ID_DEPRECATED =
             ConfigOptions.key("scan.startup.sub-start-offset")
                     .stringType()
                     .noDefaultValue()
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
index 281e3eb5e9..8864ca45c0 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarTableSource.java
@@ -19,8 +19,10 @@ package org.apache.inlong.sort.pulsar.table.source;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.connector.pulsar.source.PulsarSource;
+import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import 
org.apache.flink.connector.pulsar.source.enumerator.cursor.stop.NeverStopCursor;
 import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
 import org.apache.flink.connector.pulsar.table.source.PulsarReadableMetadata;
 import org.apache.flink.table.connector.ChangelogMode;
@@ -114,15 +116,18 @@ public class PulsarTableSource implements 
ScanTableSource, SupportsReadingMetada
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
         PulsarDeserializationSchema<RowData> deserializationSchema =
                 
deserializationSchemaFactory.createPulsarDeserialization(context);
-        PulsarSource<RowData> source =
-                PulsarSource.builder()
-                        .setTopics(topics)
-                        .setStartCursor(startCursor)
-                        .setUnboundedStopCursor(stopCursor)
-                        .setDeserializationSchema(deserializationSchema)
-                        .setProperties(properties)
-                        .build();
-        return SourceProvider.of(source);
+        PulsarSourceBuilder<RowData> sourceBuilder = PulsarSource.builder();
+        sourceBuilder
+                .setTopics(topics)
+                .setStartCursor(startCursor)
+                .setDeserializationSchema(deserializationSchema)
+                .setProperties(properties);
+        if (!(stopCursor instanceof NeverStopCursor)) {
+            sourceBuilder.setBoundedStopCursor(stopCursor);
+        } else {
+            sourceBuilder.setUnboundedStopCursor(stopCursor);
+        }
+        return SourceProvider.of(sourceBuilder.build());
     }
 
     /**

Reply via email to