This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d34a39cf50 [INLONG-10492][Sort] Fix init failure of pulsar connector
(#10493)
d34a39cf50 is described below
commit d34a39cf506f228d8680f2256ddddf316f6b23e4
Author: vernedeng <[email protected]>
AuthorDate: Mon Jun 24 19:26:23 2024 +0800
[INLONG-10492][Sort] Fix init failure of pulsar connector (#10493)
---
.../apache/inlong/sort/pulsar/PulsarTableFactory.java | 2 ++
.../inlong/sort/pulsar/PulsarTableOptionUtils.java | 3 +++
.../apache/inlong/sort/pulsar/PulsarTableOptions.java | 18 ++++++++++++++++++
3 files changed, 23 insertions(+)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
index 6cbfea2689..80ef5b6363 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
@@ -69,6 +69,7 @@ import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SERVICE_URL;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SINK_CUSTOM_TOPIC_ROUTER;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SINK_MESSAGE_DELAY_INTERVAL;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SINK_TOPIC_ROUTING_MODE;
+import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
@@ -198,6 +199,7 @@ public class PulsarTableFactory implements
DynamicTableSourceFactory {
SOURCE_SUBSCRIPTION_NAME,
SOURCE_SUBSCRIPTION_TYPE,
STARTUP_MODE,
+ SOURCE_START_FROM_MESSAGE_ID,
SOURCE_START_FROM_PUBLISH_TIME,
SOURCE_STOP_AT_MESSAGE_ID,
SOURCE_STOP_AFTER_MESSAGE_ID,
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
index a495244776..5b2488dd40 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
@@ -46,6 +46,7 @@ import java.util.stream.IntStream;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FIELDS;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FORMAT;
+import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
@@ -175,6 +176,8 @@ public class PulsarTableOptionUtils {
public static StartCursor getStartCursor(ReadableConfig tableOptions) {
if (tableOptions.getOptional(STARTUP_MODE).isPresent()) {
return parseMessageIdStartCursor(tableOptions.get(STARTUP_MODE));
+ } else if
(tableOptions.getOptional(SOURCE_START_FROM_MESSAGE_ID).isPresent()) {
+ return
parseMessageIdStartCursor(tableOptions.get(SOURCE_START_FROM_MESSAGE_ID));
} else if
(tableOptions.getOptional(SOURCE_START_FROM_PUBLISH_TIME).isPresent()) {
return
parsePublishTimeStartCursor(tableOptions.get(SOURCE_START_FROM_PUBLISH_TIME));
} else {
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
index 8aeb541380..59e72201f8 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
@@ -88,6 +88,24 @@ public final class PulsarTableOptions {
"The subscription name of the
consumer that is used by the runtime [Pulsar DataStream source
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source).
This argument is required for constructing the consumer.")
.build());
+ public static final ConfigOption<String> SOURCE_START_FROM_MESSAGE_ID =
+ ConfigOptions.key("scan.startup.sub-start-offset")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "(Optional) Message id that is
used to specify a consuming starting "
+ + "point for source. Use
%s, %s or pass in a message id "
+ + "representation in %s, "
+ + "such as %s. This option
takes precedence over "
+ +
"source.start.publish-time.",
+ code("earliest"),
+ code("latest"),
+
code("ledgerId:entryId:partitionId"),
+ code("12:2:-1"))
+ .build());
+
public static final ConfigOption<String> STARTUP_MODE =
ConfigOptions.key("scan.startup.mode")
.stringType()