This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9bd9b4afc1 [INLONG-9081][Sort] Pulsar connector in flink 1.15 should
running in exclusive mode (#9082)
9bd9b4afc1 is described below
commit 9bd9b4afc1cece9f04ac360d7515a77898c2bb45
Author: Sting <[email protected]>
AuthorDate: Fri Oct 20 14:09:39 2023 +0800
[INLONG-9081][Sort] Pulsar connector in flink 1.15 should running in
exclusive mode (#9082)
---
.../main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java | 6 +++---
.../java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java | 6 +++---
.../main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java | 6 +++---
.../org/apache/inlong/sort/pulsar/PulsarTableValidationUtils.java | 6 +++---
.../java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java | 5 +++--
5 files changed, 15 insertions(+), 14 deletions(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
index 9ca4ac4d6d..4adc2e64ba 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
@@ -64,13 +64,13 @@ import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SERVICE_URL;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SINK_CUSTOM_TOPIC_ROUTER;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SINK_MESSAGE_DELAY_INTERVAL;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SINK_TOPIC_ROUTING_MODE;
-import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_SUBSCRIPTION_NAME;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
+import static org.apache.inlong.sort.pulsar.PulsarTableOptions.STARTUP_MODE;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.TOPIC;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.VALUE_FORMAT;
import static
org.apache.inlong.sort.pulsar.PulsarTableValidationUtils.validatePrimaryKeyConstraints;
@@ -179,7 +179,7 @@ public class PulsarTableFactory implements
DynamicTableSourceFactory {
VALUE_FORMAT,
SOURCE_SUBSCRIPTION_NAME,
SOURCE_SUBSCRIPTION_TYPE,
- SOURCE_START_FROM_MESSAGE_ID,
+ STARTUP_MODE,
SOURCE_START_FROM_PUBLISH_TIME,
SOURCE_STOP_AT_MESSAGE_ID,
SOURCE_STOP_AFTER_MESSAGE_ID,
@@ -203,7 +203,7 @@ public class PulsarTableFactory implements
DynamicTableSourceFactory {
SERVICE_URL,
SOURCE_SUBSCRIPTION_TYPE,
SOURCE_SUBSCRIPTION_NAME,
- SOURCE_START_FROM_MESSAGE_ID,
+ STARTUP_MODE,
SOURCE_START_FROM_PUBLISH_TIME,
SOURCE_STOP_AT_MESSAGE_ID,
SOURCE_STOP_AFTER_MESSAGE_ID,
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
index 102ae75938..a495244776 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptionUtils.java
@@ -46,12 +46,12 @@ import java.util.stream.IntStream;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FIELDS;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FORMAT;
-import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
+import static org.apache.inlong.sort.pulsar.PulsarTableOptions.STARTUP_MODE;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.TOPIC;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.VALUE_FORMAT;
@@ -173,8 +173,8 @@ public class PulsarTableOptionUtils {
}
public static StartCursor getStartCursor(ReadableConfig tableOptions) {
- if
(tableOptions.getOptional(SOURCE_START_FROM_MESSAGE_ID).isPresent()) {
- return
parseMessageIdStartCursor(tableOptions.get(SOURCE_START_FROM_MESSAGE_ID));
+ if (tableOptions.getOptional(STARTUP_MODE).isPresent()) {
+ return parseMessageIdStartCursor(tableOptions.get(STARTUP_MODE));
} else if
(tableOptions.getOptional(SOURCE_START_FROM_PUBLISH_TIME).isPresent()) {
return
parsePublishTimeStartCursor(tableOptions.get(SOURCE_START_FROM_PUBLISH_TIME));
} else {
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
index cbbe812471..8aeb541380 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableOptions.java
@@ -79,7 +79,7 @@ public final class PulsarTableOptions {
* Copied because we want to have a default value for it.
*/
public static final ConfigOption<String> SOURCE_SUBSCRIPTION_NAME =
- ConfigOptions.key("source.subscription-name")
+ ConfigOptions.key("scan.startup.sub-name")
.stringType()
.noDefaultValue()
.withDescription(
@@ -88,8 +88,8 @@ public final class PulsarTableOptions {
"The subscription name of the
consumer that is used by the runtime [Pulsar DataStream source
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source).
This argument is required for constructing the consumer.")
.build());
- public static final ConfigOption<String> SOURCE_START_FROM_MESSAGE_ID =
- ConfigOptions.key("source.start.message-id")
+ public static final ConfigOption<String> STARTUP_MODE =
+ ConfigOptions.key("scan.startup.mode")
.stringType()
.noDefaultValue()
.withDescription(
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableValidationUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableValidationUtils.java
index 62440a462b..d49296ab9d 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableValidationUtils.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableValidationUtils.java
@@ -36,12 +36,12 @@ import java.util.Set;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.getValueDecodingFormat;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FIELDS;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.KEY_FORMAT;
-import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
+import static org.apache.inlong.sort.pulsar.PulsarTableOptions.STARTUP_MODE;
import static org.apache.inlong.sort.pulsar.PulsarTableOptions.TOPIC;
import static org.apache.pulsar.common.naming.TopicName.isValid;
@@ -91,12 +91,12 @@ public class PulsarTableValidationUtils {
}
protected static void validateStartCursorConfigs(ReadableConfig
tableOptions) {
- if (tableOptions.getOptional(SOURCE_START_FROM_MESSAGE_ID).isPresent()
+ if (tableOptions.getOptional(STARTUP_MODE).isPresent()
&&
tableOptions.getOptional(SOURCE_START_FROM_PUBLISH_TIME).isPresent()) {
throw new ValidationException(
String.format(
"Only one of %s and %s can be specified. Detected
both of them",
- SOURCE_START_FROM_MESSAGE_ID,
SOURCE_START_FROM_PUBLISH_TIME));
+ STARTUP_MODE, SOURCE_START_FROM_PUBLISH_TIME));
}
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
index 9b3bf70382..f177eb7543 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
@@ -19,7 +19,6 @@ package org.apache.inlong.sort.pulsar.table;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.connector.pulsar.source.PulsarSource;
-import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
@@ -119,7 +118,9 @@ public class PulsarTableSource implements ScanTableSource,
SupportsReadingMetada
.setUnboundedStopCursor(stopCursor)
.setDeserializationSchema(deserializationSchema)
.setProperties(properties)
-
.setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true)
+ // only support exclusive since shared mode requires
pulsar with transaction enabled
+ // and supporting transaction consumes more resources
in pulsar broker
+ .setSubscriptionType(SubscriptionType.Exclusive)
.build();
return SourceProvider.of(source);
}