This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bd9b4afc1 [INLONG-9081][Sort] Pulsar connector in flink 1.15 should 
run in exclusive mode  (#9082)
9bd9b4afc1 is described below

commit 9bd9b4afc1cece9f04ac360d7515a77898c2bb45
Author: Sting <[email protected]>
AuthorDate: Fri Oct 20 14:09:39 2023 +0800

    [INLONG-9081][Sort] Pulsar connector in flink 1.15 should 
run in exclusive mode  (#9082)
---
 .../main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java | 6 +++---
 .../java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java  | 6 +++---
 .../main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java | 6 +++---
 .../org/apache/inlong/sort/pulsar/PulsarTableValidationUtils.java   | 6 +++---
 .../java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java | 5 +++--
 5 files changed, 15 insertions(+), 14 deletions(-)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
index 9ca4ac4d6d..4adc2e64ba 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
@@ -64,13 +64,13 @@ import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SERVICE_URL;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SINK_CUSTOM_TOPIC_ROUTER;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SINK_MESSAGE_DELAY_INTERVAL;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SINK_TOPIC_ROUTING_MODE;
-import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_SUBSCRIPTION_NAME;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
+import static org.apache.inlong.sort.pulsar.PulsarTableOptions.STARTUP_MODE;
 import static org.apache.inlong.sort.pulsar.PulsarTableOptions.TOPIC;
 import static org.apache.inlong.sort.pulsar.PulsarTableOptions.VALUE_FORMAT;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableValidationUtils.validatePrimaryKeyConstraints;
@@ -179,7 +179,7 @@ public class PulsarTableFactory implements 
DynamicTableSourceFactory {
                 VALUE_FORMAT,
                 SOURCE_SUBSCRIPTION_NAME,
                 SOURCE_SUBSCRIPTION_TYPE,
-                SOURCE_START_FROM_MESSAGE_ID,
+                STARTUP_MODE,
                 SOURCE_START_FROM_PUBLISH_TIME,
                 SOURCE_STOP_AT_MESSAGE_ID,
                 SOURCE_STOP_AFTER_MESSAGE_ID,
@@ -203,7 +203,7 @@ public class PulsarTableFactory implements 
DynamicTableSourceFactory {
                 SERVICE_URL,
                 SOURCE_SUBSCRIPTION_TYPE,
                 SOURCE_SUBSCRIPTION_NAME,
-                SOURCE_START_FROM_MESSAGE_ID,
+                STARTUP_MODE,
                 SOURCE_START_FROM_PUBLISH_TIME,
                 SOURCE_STOP_AT_MESSAGE_ID,
                 SOURCE_STOP_AFTER_MESSAGE_ID,
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
index 102ae75938..a495244776 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
@@ -46,12 +46,12 @@ import java.util.stream.IntStream;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FIELDS;
 import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FORMAT;
-import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
+import static org.apache.inlong.sort.pulsar.PulsarTableOptions.STARTUP_MODE;
 import static org.apache.inlong.sort.pulsar.PulsarTableOptions.TOPIC;
 import static org.apache.inlong.sort.pulsar.PulsarTableOptions.VALUE_FORMAT;
 
@@ -173,8 +173,8 @@ public class PulsarTableOptionUtils {
     }
 
     public static StartCursor getStartCursor(ReadableConfig tableOptions) {
-        if 
(tableOptions.getOptional(SOURCE_START_FROM_MESSAGE_ID).isPresent()) {
-            return 
parseMessageIdStartCursor(tableOptions.get(SOURCE_START_FROM_MESSAGE_ID));
+        if (tableOptions.getOptional(STARTUP_MODE).isPresent()) {
+            return parseMessageIdStartCursor(tableOptions.get(STARTUP_MODE));
         } else if 
(tableOptions.getOptional(SOURCE_START_FROM_PUBLISH_TIME).isPresent()) {
             return 
parsePublishTimeStartCursor(tableOptions.get(SOURCE_START_FROM_PUBLISH_TIME));
         } else {
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
index cbbe812471..8aeb541380 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
@@ -79,7 +79,7 @@ public final class PulsarTableOptions {
      * Copied because we want to have a default value for it.
      */
     public static final ConfigOption<String> SOURCE_SUBSCRIPTION_NAME =
-            ConfigOptions.key("source.subscription-name")
+            ConfigOptions.key("scan.startup.sub-name")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
@@ -88,8 +88,8 @@ public final class PulsarTableOptions {
                                             "The subscription name of the 
consumer that is used by the runtime [Pulsar DataStream source 
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source).
 This argument is required for constructing the consumer.")
                                     .build());
 
-    public static final ConfigOption<String> SOURCE_START_FROM_MESSAGE_ID =
-            ConfigOptions.key("source.start.message-id")
+    public static final ConfigOption<String> STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableValidationUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableValidationUtils.java
index 62440a462b..d49296ab9d 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableValidationUtils.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableValidationUtils.java
@@ -36,12 +36,12 @@ import java.util.Set;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.getValueDecodingFormat;
 import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FIELDS;
 import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FORMAT;
-import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
 import static 
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
+import static org.apache.inlong.sort.pulsar.PulsarTableOptions.STARTUP_MODE;
 import static org.apache.inlong.sort.pulsar.PulsarTableOptions.TOPIC;
 import static org.apache.pulsar.common.naming.TopicName.isValid;
 
@@ -91,12 +91,12 @@ public class PulsarTableValidationUtils {
     }
 
     protected static void validateStartCursorConfigs(ReadableConfig 
tableOptions) {
-        if (tableOptions.getOptional(SOURCE_START_FROM_MESSAGE_ID).isPresent()
+        if (tableOptions.getOptional(STARTUP_MODE).isPresent()
                 && 
tableOptions.getOptional(SOURCE_START_FROM_PUBLISH_TIME).isPresent()) {
             throw new ValidationException(
                     String.format(
                             "Only one of %s and %s can be specified. Detected 
both of them",
-                            SOURCE_START_FROM_MESSAGE_ID, 
SOURCE_START_FROM_PUBLISH_TIME));
+                            STARTUP_MODE, SOURCE_START_FROM_PUBLISH_TIME));
         }
     }
 
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
index 9b3bf70382..f177eb7543 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
@@ -19,7 +19,6 @@ package org.apache.inlong.sort.pulsar.table;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.connector.pulsar.source.PulsarSource;
-import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
@@ -119,7 +118,9 @@ public class PulsarTableSource implements ScanTableSource, 
SupportsReadingMetada
                         .setUnboundedStopCursor(stopCursor)
                         .setDeserializationSchema(deserializationSchema)
                         .setProperties(properties)
-                        
.setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true)
+                        // only support exclusive since shared mode requires 
pulsar with transaction enabled
+                        // and supporting transaction consumes more resources 
in pulsar broker
+                        .setSubscriptionType(SubscriptionType.Exclusive)
                         .build();
         return SourceProvider.of(source);
     }

Reply via email to