This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8e9f276961 [INLONG-10508][Sort] Fix pulsar connector flink 1.15 scan
start up mode parameter cannot keep consistent with flink 1.13 (#10509)
8e9f276961 is described below
commit 8e9f276961d1b45553422d9e16368c40238a660c
Author: ZiruiPeng <[email protected]>
AuthorDate: Tue Jun 25 16:48:14 2024 +0800
[INLONG-10508][Sort] Fix pulsar connector flink 1.15 scan start up mode
parameter cannot keep consistent with flink 1.13 (#10509)
* [INLONG-10508][Sort] Fix pulsar connector flink 1.15 scan start up mode
parameter cannot keep consistent with flink 1.13
* fix format
---
.../org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java | 9 ++++++++-
.../java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java | 1 +
2 files changed, 9 insertions(+), 1 deletion(-)
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
index 5b2488dd40..c438fe1394 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sort.pulsar;
+import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
+
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
@@ -175,7 +177,12 @@ public class PulsarTableOptionUtils {
public static StartCursor getStartCursor(ReadableConfig tableOptions) {
if (tableOptions.getOptional(STARTUP_MODE).isPresent()) {
- return parseMessageIdStartCursor(tableOptions.get(STARTUP_MODE));
+ String mode = tableOptions.getOptional(STARTUP_MODE).get();
+ // to keep consistent with pulsar connector in flink 1.13
+ if (mode.equals(PulsarScanStartupMode.EXTERNAL_SUBSCRIPTION.getValue())) {
+ return parseMessageIdStartCursor(tableOptions.get(SOURCE_START_FROM_MESSAGE_ID));
+ }
+ return parseMessageIdStartCursor(mode);
} else if (tableOptions.getOptional(SOURCE_START_FROM_MESSAGE_ID).isPresent()) {
return parseMessageIdStartCursor(tableOptions.get(SOURCE_START_FROM_MESSAGE_ID));
} else if (tableOptions.getOptional(SOURCE_START_FROM_PUBLISH_TIME).isPresent()) {
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
index 59e72201f8..6ef1450fb7 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
@@ -121,6 +121,7 @@ public final class PulsarTableOptions {
code("earliest"),
code("latest"),
code("ledgerId:entryId:partitionId"),
+ code("external-subscription"),
code("12:2:-1"))
.build());