This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d34a39cf50 [INLONG-10492][Sort] Fix init failure of pulsar connector (#10493)
d34a39cf50 is described below

commit d34a39cf506f228d8680f2256ddddf316f6b23e4
Author: vernedeng <[email protected]>
AuthorDate: Mon Jun 24 19:26:23 2024 +0800

    [INLONG-10492][Sort] Fix init failure of pulsar connector (#10493)
---
 .../apache/inlong/sort/pulsar/PulsarTableFactory.java  |  2 ++
 .../inlong/sort/pulsar/PulsarTableOptionUtils.java     |  3 +++
 .../apache/inlong/sort/pulsar/PulsarTableOptions.java  | 18 ++++++++++++++++++
 3 files changed, 23 insertions(+)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
index 6cbfea2689..80ef5b6363 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
@@ -69,6 +69,7 @@ import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SERVICE_URL;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SINK_CUSTOM_TOPIC_ROUTER;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SINK_MESSAGE_DELAY_INTERVAL;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SINK_TOPIC_ROUTING_MODE;
+import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
@@ -198,6 +199,7 @@ public class PulsarTableFactory implements 
DynamicTableSourceFactory {
                 SOURCE_SUBSCRIPTION_NAME,
                 SOURCE_SUBSCRIPTION_TYPE,
                 STARTUP_MODE,
+                SOURCE_START_FROM_MESSAGE_ID,
                 SOURCE_START_FROM_PUBLISH_TIME,
                 SOURCE_STOP_AT_MESSAGE_ID,
                 SOURCE_STOP_AFTER_MESSAGE_ID,
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
index a495244776..5b2488dd40 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
@@ -46,6 +46,7 @@ import java.util.stream.IntStream;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FIELDS;
 import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FORMAT;
+import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
@@ -175,6 +176,8 @@ public class PulsarTableOptionUtils {
     public static StartCursor getStartCursor(ReadableConfig tableOptions) {
         if (tableOptions.getOptional(STARTUP_MODE).isPresent()) {
             return parseMessageIdStartCursor(tableOptions.get(STARTUP_MODE));
+        } else if (tableOptions.getOptional(SOURCE_START_FROM_MESSAGE_ID).isPresent()) {
+            return parseMessageIdStartCursor(tableOptions.get(SOURCE_START_FROM_MESSAGE_ID));
         } else if (tableOptions.getOptional(SOURCE_START_FROM_PUBLISH_TIME).isPresent()) {
             return parsePublishTimeStartCursor(tableOptions.get(SOURCE_START_FROM_PUBLISH_TIME));
         } else {
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
index 8aeb541380..59e72201f8 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
@@ -88,6 +88,24 @@ public final class PulsarTableOptions {
                                             "The subscription name of the 
consumer that is used by the runtime [Pulsar DataStream source 
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source).
 This argument is required for constructing the consumer.")
                                     .build());
 
+    public static final ConfigOption<String> SOURCE_START_FROM_MESSAGE_ID =
+            ConfigOptions.key("scan.startup.sub-start-offset")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "(Optional) Message id that is used to specify a consuming starting "
+                                                    + "point for source. Use %s, %s or pass in a message id "
+                                                    + "representation in %s, "
+                                                    + "such as %s. This option takes precedence over "
+                                                    + "source.start.publish-time.",
+                                            code("earliest"),
+                                            code("latest"),
+                                            code("ledgerId:entryId:partitionId"),
+                                            code("12:2:-1"))
+                                    .build());
+
     public static final ConfigOption<String> STARTUP_MODE =
             ConfigOptions.key("scan.startup.mode")
                     .stringType()

Reply via email to