This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4db62df903 [INLONG-10053][Sort] Support flink-connector-pulsar based
on flink 1.18 (#10063)
4db62df903 is described below
commit 4db62df9032d26ab9fca2c9bab0f7b7b0dc2a4a9
Author: AloysZhang <[email protected]>
AuthorDate: Thu Apr 25 09:49:19 2024 +0800
[INLONG-10053][Sort] Support flink-connector-pulsar based on flink 1.18
(#10063)
---
.github/workflows/ci_build.yml | 7 +
inlong-distribution/pom.xml | 1 +
.../src/main/assemblies/sort-connectors-v1.18.xml | 40 +++
inlong-sort/sort-flink/sort-flink-v1.18/pom.xml | 1 +
.../sort-flink-v1.18/{ => sort-connectors}/pom.xml | 20 +-
.../sort-connectors/pulsar/pom.xml | 121 ++++++++
.../sort/pulsar/table/PulsarTableFactory.java | 303 +++++++++++++++++++
.../sort/pulsar/table/PulsarTableOptionUtils.java | 321 +++++++++++++++++++++
.../sort/pulsar/table/PulsarTableOptions.java | 294 +++++++++++++++++++
.../pulsar/table/PulsarTableValidationUtils.java | 195 +++++++++++++
.../org.apache.flink.table.factories.Factory | 16 +
licenses/inlong-sort-connectors/LICENSE | 7 +
12 files changed, 1319 insertions(+), 7 deletions(-)
diff --git a/.github/workflows/ci_build.yml b/.github/workflows/ci_build.yml
index 0a579894f5..25a05e6bd3 100644
--- a/.github/workflows/ci_build.yml
+++ b/.github/workflows/ci_build.yml
@@ -121,5 +121,12 @@ jobs:
name: apache-inlong-${{ env.VERSION
}}-sort-connectors-flink-v1.15.tar.gz
path: ./inlong-distribution/target/apache-inlong-${{ env.VERSION
}}-sort-connectors-flink-v1.15.tar.gz
+ - name: Upload sort connectors package for flink v1.18
+ if: ${{ success() }}
+ uses: actions/upload-artifact@v3
+ with:
+ name: apache-inlong-${{ env.VERSION
}}-sort-connectors-flink-v1.18.tar.gz
+ path: ./inlong-distribution/target/apache-inlong-${{ env.VERSION
}}-sort-connectors-flink-v1.18.tar.gz
+
- name: Clean up build packages
run: mvn clean
diff --git a/inlong-distribution/pom.xml b/inlong-distribution/pom.xml
index 47751b9841..8a85518a0a 100644
--- a/inlong-distribution/pom.xml
+++ b/inlong-distribution/pom.xml
@@ -65,6 +65,7 @@
<descriptors>
<descriptor>src/main/assemblies/sort-connectors-v1.13.xml</descriptor>
<descriptor>src/main/assemblies/sort-connectors-v1.15.xml</descriptor>
+
<descriptor>src/main/assemblies/sort-connectors-v1.18.xml</descriptor>
</descriptors>
</configuration>
</execution>
diff --git a/inlong-distribution/src/main/assemblies/sort-connectors-v1.18.xml
b/inlong-distribution/src/main/assemblies/sort-connectors-v1.18.xml
new file mode 100644
index 0000000000..61465915fa
--- /dev/null
+++ b/inlong-distribution/src/main/assemblies/sort-connectors-v1.18.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<assembly>
+ <id>sort-connectors-flink-v1.18</id>
+
+ <formats>
+ <format>dir</format>
+ <format>tar.gz</format>
+ </formats>
+ <includeBaseDirectory>true</includeBaseDirectory>
+ <fileSets>
+ <!-- Flink 1.18 Pulsar connector jar, copied from the pulsar module's target dir into inlong-sort/connectors -->
+ <fileSet>
+
<directory>../inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/target</directory>
+ <outputDirectory>inlong-sort/connectors</outputDirectory>
+ <includes>
+
<include>sort-connector-pulsar-v1.18-${project.version}.jar</include>
+ </includes>
+ <fileMode>0644</fileMode>
+ </fileSet>
+
+ </fileSets>
+</assembly>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.18/pom.xml
index 48a76d5fe4..323c240ad7 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/pom.xml
@@ -32,6 +32,7 @@
<modules>
<module>sort-flink-dependencies</module>
+ <module>sort-connectors</module>
</modules>
<properties>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml
similarity index 71%
copy from inlong-sort/sort-flink/sort-flink-v1.18/pom.xml
copy to inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml
index 48a76d5fe4..a3fe402733 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml
@@ -22,22 +22,28 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.inlong</groupId>
- <artifactId>sort-flink</artifactId>
+ <artifactId>sort-flink-v1.18</artifactId>
<version>1.13.0-SNAPSHOT</version>
</parent>
- <artifactId>sort-flink-v1.18</artifactId>
+ <artifactId>sort-connectors-v1.18</artifactId>
<packaging>pom</packaging>
- <name>Apache InLong - Sort Flink v1.18</name>
+ <name>Apache InLong - Sort Connectors v1.18</name>
<modules>
- <module>sort-flink-dependencies</module>
+ <module>pulsar</module>
</modules>
<properties>
-
<inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir>
- <flink.version>1.18.1</flink.version>
- <flink.scala.binary.version>2.12</flink.scala.binary.version>
+
<inlong.root.dir>${project.parent.parent.parent.parent.basedir}</inlong.root.dir>
</properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-flink-dependencies-v1.18</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
</project>
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/pom.xml
new file mode 100644
index 0000000000..6721922c29
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/pom.xml
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connectors-v1.18</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>sort-connector-pulsar-v1.18</artifactId>
+ <packaging>jar</packaging>
+ <name>Apache InLong - Sort-connector-pulsar</name>
+
+ <properties>
+
<inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir>
+ <flink.connector.version>4.1.0-1.18</flink.connector.version>
+ <pulsar.version>3.0.2</pulsar.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-all</artifactId>
+ <version>${pulsar.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-pulsar</artifactId>
+ <version>${flink.connector.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <!-- Bundle the artifacts matched by artifactSet into one connector jar (no relocations are configured). NOTE(review): no io.streamnative dependency is declared in this pom, so those include patterns look stale. -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+ <artifactSet>
+ <includes>
+ <include>org.apache.inlong:*</include>
+
<include>io.streamnative.connectors:pulsar-flink-connector-origin*</include>
+
<include>io.streamnative.connectors:flink-protobuf</include>
+ <include>org.apache.pulsar:*</include>
+
<include>org.apache.flink:flink-connector-pulsar</include>
+ <include>com.google.protobuf:*</include>
+ <!-- BouncyCastle artifacts: one wildcard covers every org.bouncycastle* groupId -->
+ <include>org.bouncycastle*:*</include>
+ <include>javax.*:*</include>
+ <include>org.lz4*:*</include>
+ <include>org.slf4j:jul-to-slf4j</include>
+ <include>io.airlift:*</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+
<artifact>org.apache.inlong:sort-connector-*</artifact>
+ <includes>
+ <include>org/apache/inlong/**</include>
+
<include>META-INF/services/org.apache.flink.table.factories.Factory</include>
+ </includes>
+ </filter>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>log4j.properties</exclude>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"
/>
+ <transformer
implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer"
/>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
new file mode 100644
index 0000000000..a6e7caa00e
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkOptions;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import
org.apache.flink.connector.pulsar.table.sink.PulsarTableSerializationSchemaFactory;
+import org.apache.flink.connector.pulsar.table.sink.PulsarTableSink;
+import
org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchemaFactory;
+import org.apache.flink.connector.pulsar.table.source.PulsarTableSource;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
+import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.createKeyFormatProjection;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.createValueFormatProjection;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getKeyDecodingFormat;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getKeyEncodingFormat;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getMessageDelayMillis;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getPulsarProperties;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getStartCursor;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getStopCursor;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getSubscriptionType;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getTopicListFromOptions;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getTopicRouter;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getTopicRoutingMode;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getValueDecodingFormat;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getValueEncodingFormat;
+import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.ADMIN_URL;
+import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.EXPLICIT;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.KEY_FIELDS;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.KEY_FORMAT;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SERVICE_URL;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_CUSTOM_TOPIC_ROUTER;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_MESSAGE_DELAY_INTERVAL;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_TOPIC_ROUTING_MODE;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_SUBSCRIPTION_NAME;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.STARTUP_MODE;
+import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.TOPIC;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.VALUE_FORMAT;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableValidationUtils.validatePrimaryKeyConstraints;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableValidationUtils.validateTableSinkOptions;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableValidationUtils.validateTableSourceOptions;
+import static
org.apache.pulsar.shade.org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+
+/**
+ * Factory for creating {@link DynamicTableSource} and {@link DynamicTableSink} instances for the
+ * {@code pulsar-inlong} connector identifier, built on flink-connector-pulsar's
+ * {@link PulsarTableSource} and {@link PulsarTableSink}.
+ *
+ * <p>The main role of this class is to retrieve config options, validate them against each other
+ * and against the table schema, and fill in values for settings that are not exposed to users:
+ *
+ * <ul>
+ *   <li>upsert mode is always disabled ({@link #UPSERT_DISABLED});
+ *   <li>the source falls back to a random subscription name prefixed with
+ *       {@link #DEFAULT_SUBSCRIPTION_NAME_PREFIX} when none is configured;
+ *   <li>the sink always uses {@link DeliveryGuarantee#AT_LEAST_ONCE}.
+ * </ul>
+ *
+ * <p>NOTE(review): the InLong options AUDIT_KEYS, INLONG_METRIC and INLONG_AUDIT are accepted by
+ * {@link #optionalOptions()}, but nothing in this class reads them, so audit/metric reporting is
+ * not wired up here. Confirm whether that is intended.
+ */
+public class PulsarTableFactory implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
+
+ public static final String IDENTIFIER = "pulsar-inlong";
+
+ public static final String DEFAULT_SUBSCRIPTION_NAME_PREFIX =
"flink-sql-connector-pulsar-";
+
+ public static final boolean UPSERT_DISABLED = false;
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
+ // Format options should be retrieved before validation.
+ final DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat
=
+ getKeyDecodingFormat(helper);
+ final DecodingFormat<DeserializationSchema<RowData>>
valueDecodingFormat =
+ getValueDecodingFormat(helper);
+ ReadableConfig tableOptions = helper.getOptions();
+
+ // Validate that configs do not conflict, that each option is consumed, and that there are no
unwanted configs
+ // Options under the PulsarOptions, PulsarSourceOptions and PulsarSinkOptions prefixes are not
part of the validation.
+ helper.validateExcept(
+ PulsarOptions.CLIENT_CONFIG_PREFIX,
+ PulsarSourceOptions.SOURCE_CONFIG_PREFIX,
+ PulsarSourceOptions.CONSUMER_CONFIG_PREFIX,
+ PulsarSinkOptions.PRODUCER_CONFIG_PREFIX,
+ PulsarSinkOptions.SINK_CONFIG_PREFIX);
+
+ validatePrimaryKeyConstraints(
+ context.getObjectIdentifier(), context.getPrimaryKeyIndexes(),
helper);
+
+ validateTableSourceOptions(tableOptions);
+
+ // Retrieve configs
+ final List<String> topics = getTopicListFromOptions(tableOptions);
+ final StartCursor startCursor = getStartCursor(tableOptions);
+ final StopCursor stopCursor = getStopCursor(tableOptions);
+ final SubscriptionType subscriptionType =
getSubscriptionType(tableOptions);
+
+ // Build the source properties (via getPulsarProperties) and set the service URL
+ final Properties properties = getPulsarProperties(tableOptions);
+ properties.setProperty(PULSAR_SERVICE_URL.key(),
tableOptions.get(SERVICE_URL));
+ // Use the configured subscription name, or prefix + 5 random letters; NOTE(review): a new random name is generated on every call, so set one explicitly for stable consumption
+ properties.setProperty(
+ PULSAR_SUBSCRIPTION_NAME.key(),
+ tableOptions
+ .getOptional(SOURCE_SUBSCRIPTION_NAME)
+ .orElse(DEFAULT_SUBSCRIPTION_NAME_PREFIX +
randomAlphabetic(5)));
+ // Retrieve physical fields (not including computed or metadata
fields),
+ // and projections and create a schema factory based on such
information.
+ final DataType physicalDataType = context.getPhysicalRowDataType();
+
+ final int[] valueProjection =
createValueFormatProjection(tableOptions, physicalDataType);
+ final int[] keyProjection = createKeyFormatProjection(tableOptions,
physicalDataType);
+
+ final PulsarTableDeserializationSchemaFactory
deserializationSchemaFactory =
+ new PulsarTableDeserializationSchemaFactory(
+ physicalDataType,
+ keyDecodingFormat,
+ keyProjection,
+ valueDecodingFormat,
+ valueProjection,
+ UPSERT_DISABLED);
+
+ // Defaults not exposed to users: metadata pushdown and the changelog mode follow the value format.
+ final DecodingFormat<DeserializationSchema<RowData>>
decodingFormatForMetadataPushdown =
+ valueDecodingFormat;
+ final ChangelogMode changelogMode =
decodingFormatForMetadataPushdown.getChangelogMode();
+
+ return new PulsarTableSource(
+ deserializationSchemaFactory,
+ decodingFormatForMetadataPushdown,
+ changelogMode,
+ topics,
+ properties,
+ startCursor,
+ stopCursor,
+ subscriptionType);
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
+ // Format options should be retrieved before validation.
+ final EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat =
+ getKeyEncodingFormat(helper);
+ final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat
=
+ getValueEncodingFormat(helper);
+ ReadableConfig tableOptions = helper.getOptions();
+
+ // Validate that configs do not conflict, that each option is consumed, and that there are no
unwanted configs
+ // Options under the PulsarOptions, PulsarSourceOptions and PulsarSinkOptions prefixes are not
part of the validation.
+ helper.validateExcept(
+ PulsarOptions.CLIENT_CONFIG_PREFIX,
+ PulsarSourceOptions.SOURCE_CONFIG_PREFIX,
+ PulsarSourceOptions.CONSUMER_CONFIG_PREFIX,
+ PulsarSinkOptions.PRODUCER_CONFIG_PREFIX,
+ PulsarSinkOptions.SINK_CONFIG_PREFIX);
+
+ validatePrimaryKeyConstraints(
+ context.getObjectIdentifier(), context.getPrimaryKeyIndexes(),
helper);
+
+ validateTableSinkOptions(tableOptions);
+
+ // Retrieve configs
+ final TopicRouter<RowData> topicRouter =
+ getTopicRouter(tableOptions, context.getClassLoader());
+ final TopicRoutingMode topicRoutingMode =
getTopicRoutingMode(tableOptions);
+ final long messageDelayMillis = getMessageDelayMillis(tableOptions);
+
+ final List<String> topics = getTopicListFromOptions(tableOptions);
+
+ // Build the sink properties (via getPulsarProperties) and set the service URL
+ final Properties properties = getPulsarProperties(tableOptions);
+ properties.setProperty(PULSAR_SERVICE_URL.key(),
tableOptions.get(SERVICE_URL));
+
+ // Retrieve physical DataType (not including computed or metadata
fields)
+ final DataType physicalDataType = context.getPhysicalRowDataType();
+ final int[] keyProjection = createKeyFormatProjection(tableOptions,
physicalDataType);
+ final int[] valueProjection =
createValueFormatProjection(tableOptions, physicalDataType);
+
+ final PulsarTableSerializationSchemaFactory serializationSchemaFactory
=
+ new PulsarTableSerializationSchemaFactory(
+ physicalDataType,
+ keyEncodingFormat,
+ keyProjection,
+ valueEncodingFormat,
+ valueProjection,
+ UPSERT_DISABLED);
+
+ // Defaults not exposed to users: AT_LEAST_ONCE delivery; the changelog mode follows the value format.
+ final DeliveryGuarantee deliveryGuarantee =
DeliveryGuarantee.AT_LEAST_ONCE;
+ final ChangelogMode changelogMode =
valueEncodingFormat.getChangelogMode();
+
+ return new PulsarTableSink(
+ serializationSchemaFactory,
+ changelogMode,
+ topics,
+ properties,
+ deliveryGuarantee,
+ topicRouter,
+ topicRoutingMode,
+ messageDelayMillis);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ return Stream.of(TOPIC, SERVICE_URL).collect(Collectors.toSet());
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return Stream.of(
+ FactoryUtil.FORMAT,
+ VALUE_FORMAT,
+ ADMIN_URL,
+ STARTUP_MODE,
+ SOURCE_SUBSCRIPTION_NAME,
+ SOURCE_SUBSCRIPTION_TYPE,
+ SOURCE_START_FROM_MESSAGE_ID,
+ SOURCE_START_FROM_PUBLISH_TIME,
+ SOURCE_STOP_AT_MESSAGE_ID,
+ SOURCE_STOP_AFTER_MESSAGE_ID,
+ SOURCE_STOP_AT_PUBLISH_TIME,
+ SINK_CUSTOM_TOPIC_ROUTER,
+ SINK_TOPIC_ROUTING_MODE,
+ SINK_MESSAGE_DELAY_INTERVAL,
+ SINK_PARALLELISM,
+ KEY_FORMAT,
+ KEY_FIELDS,
+ EXPLICIT,
+ AUDIT_KEYS,
+ INLONG_METRIC,
+ INLONG_AUDIT)
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * Options forwarded to the runtime implementation without affecting the final execution
+ * topology (see {@code DynamicTableFactory#forwardOptions}). Format and delivery guarantee
+ * related options are not forward options.
+ */
+ @Override
+ public Set<ConfigOption<?>> forwardOptions() {
+ return Stream.of(
+ TOPIC,
+ SERVICE_URL,
+ STARTUP_MODE,
+ SOURCE_SUBSCRIPTION_TYPE,
+ SOURCE_SUBSCRIPTION_NAME,
+ SOURCE_START_FROM_MESSAGE_ID,
+ SOURCE_START_FROM_PUBLISH_TIME,
+ SOURCE_STOP_AT_MESSAGE_ID,
+ SOURCE_STOP_AFTER_MESSAGE_ID,
+ SOURCE_STOP_AT_PUBLISH_TIME,
+ SINK_CUSTOM_TOPIC_ROUTER,
+ SINK_TOPIC_ROUTING_MODE,
+ SINK_MESSAGE_DELAY_INTERVAL)
+ .collect(Collectors.toSet());
+ }
+}
\ No newline at end of file
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java
new file mode 100644
index 0000000000..12ef459280
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.KEY_FIELDS;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.KEY_FORMAT;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_CUSTOM_TOPIC_ROUTER;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_MESSAGE_DELAY_INTERVAL;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_TOPIC_ROUTING_MODE;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
+import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.TOPIC;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.VALUE_FORMAT;
+
+/**
+ * A util class for getting fields from config options, getting formats and other useful
+ * information.
+ *
+ * <p>It contains the following functionalities.
+ *
+ * <ul>
+ *   <li>Get Topics from configurations.
+ *   <li>Get source StartCursor and StopCursor from configurations.
+ *   <li>Get source SubscriptionType from configurations.
+ *   <li>Get sink messageDelayMillis from configurations.
+ *   <li>Get sink TopicRouter/TopicRoutingMode from configurations.
+ *   <li>Create key and value encoding/decoding format.
+ *   <li>Create key and value projection.
+ * </ul>
+ */
+public class PulsarTableOptionUtils {
+
+    /** Separator between topic names in the topic option, e.g. {@code topic-1;topic-2}. */
+    public static final String TOPIC_LIST_DELIMITER = ";";
+
+    private PulsarTableOptionUtils() {
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Decoding / Encoding and Projection
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Discovers the key decoding format configured through {@code KEY_FORMAT}.
+     *
+     * @param helper the table factory helper used for format discovery
+     * @return the key decoding format, or {@code null} when no key format is configured
+     */
+    @Nullable
+    public static DecodingFormat<DeserializationSchema<RowData>> getKeyDecodingFormat(
+            FactoryUtil.TableFactoryHelper helper) {
+        return helper.discoverOptionalDecodingFormat(DeserializationFormatFactory.class, KEY_FORMAT)
+                .orElse(null);
+    }
+
+    /**
+     * Discovers the key encoding format configured through {@code KEY_FORMAT}.
+     *
+     * @param helper the table factory helper used for format discovery
+     * @return the key encoding format, or {@code null} when no key format is configured
+     */
+    @Nullable
+    public static EncodingFormat<SerializationSchema<RowData>> getKeyEncodingFormat(
+            FactoryUtil.TableFactoryHelper helper) {
+        return helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, KEY_FORMAT)
+                .orElse(null);
+    }
+
+    /**
+     * Discovers the value decoding format. {@code FactoryUtil.FORMAT} is consulted first and
+     * {@code VALUE_FORMAT} is the fallback; the fallback uses the helper's required discovery, which
+     * fails validation when neither option is set.
+     *
+     * @param helper the table factory helper used for format discovery
+     * @return the value decoding format
+     */
+    public static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(
+            FactoryUtil.TableFactoryHelper helper) {
+        return helper.discoverOptionalDecodingFormat(
+                DeserializationFormatFactory.class, FactoryUtil.FORMAT)
+                .orElseGet(
+                        () -> helper.discoverDecodingFormat(
+                                DeserializationFormatFactory.class, VALUE_FORMAT));
+    }
+
+    /**
+     * Discovers the value encoding format. {@code FactoryUtil.FORMAT} is consulted first and
+     * {@code VALUE_FORMAT} is the fallback; the fallback uses the helper's required discovery, which
+     * fails validation when neither option is set.
+     *
+     * @param helper the table factory helper used for format discovery
+     * @return the value encoding format
+     */
+    public static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
+            FactoryUtil.TableFactoryHelper helper) {
+        return helper.discoverOptionalEncodingFormat(
+                SerializationFormatFactory.class, FactoryUtil.FORMAT)
+                .orElseGet(
+                        () -> helper.discoverEncodingFormat(
+                                SerializationFormatFactory.class, VALUE_FORMAT));
+    }
+
+    /**
+     * Creates an array of indices that determine which physical fields of the table schema to
+     * include in the key format and the order that those fields have in the key format.
+     *
+     * @param options the table options
+     * @param physicalDataType the physical row type of the table
+     * @return positions of the {@code KEY_FIELDS} columns in declaration order; empty when no key
+     *     format is configured
+     * @throws IllegalArgumentException if {@code physicalDataType} is not a ROW type
+     * @throws ValidationException if a key field does not name a physical column
+     */
+    public static int[] createKeyFormatProjection(
+            ReadableConfig options, DataType physicalDataType) {
+        final LogicalType physicalType = physicalDataType.getLogicalType();
+        checkArgument(physicalType.is(LogicalTypeRoot.ROW), "Row data type expected.");
+        final Optional<String> optionalKeyFormat = options.getOptional(KEY_FORMAT);
+        if (!optionalKeyFormat.isPresent()) {
+            return new int[0];
+        }
+
+        // get() falls back to the option's empty default list, so an unset key.fields yields an
+        // empty projection instead of a NoSuchElementException from Optional#get().
+        final List<String> keyFields = options.get(KEY_FIELDS);
+        final List<String> physicalFields = LogicalTypeChecks.getFieldNames(physicalType);
+        return keyFields.stream()
+                .mapToInt(
+                        keyField -> {
+                            final int pos = physicalFields.indexOf(keyField);
+                            // A key field must name an existing physical column.
+                            if (pos < 0) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "Could not find the field '%s' in the table schema for usage in the key format. "
+                                                        + "A key field must be a regular, physical column. "
+                                                        + "The following columns can be selected in the '%s' option: %s",
+                                                keyField, KEY_FIELDS.key(), physicalFields));
+                            }
+                            return pos;
+                        })
+                .toArray();
+    }
+
+    /**
+     * Creates the positions of all physical fields that are not part of the key projection, in
+     * ascending order.
+     *
+     * @param options the table options
+     * @param physicalDataType the physical row type of the table
+     * @return positions of the value fields
+     * @throws IllegalArgumentException if {@code physicalDataType} is not a ROW type
+     */
+    public static int[] createValueFormatProjection(
+            ReadableConfig options, DataType physicalDataType) {
+        final LogicalType physicalType = physicalDataType.getLogicalType();
+        checkArgument(physicalType.is(LogicalTypeRoot.ROW), "Row data type expected.");
+
+        final int physicalFieldCount = LogicalTypeChecks.getFieldCount(physicalType);
+        // Mark key positions once so the filter is O(fields) rather than O(fields * keys).
+        final boolean[] isKeyField = new boolean[physicalFieldCount];
+        for (int keyPos : createKeyFormatProjection(options, physicalDataType)) {
+            isKeyField[keyPos] = true;
+        }
+        return IntStream.range(0, physicalFieldCount)
+                .filter(pos -> !isKeyField[pos])
+                .toArray();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Table Source Option Utils
+    // --------------------------------------------------------------------------------------------
+
+    /** Returns the topic list configured through {@code TOPIC}. */
+    public static List<String> getTopicListFromOptions(ReadableConfig tableOptions) {
+        return tableOptions.get(TOPIC);
+    }
+
+    /**
+     * Collects every option whose key starts with {@code "pulsar"} into a {@link Properties}.
+     *
+     * <p>{@code tableOptions} is cast to {@link Configuration} to obtain the raw key/value map, so
+     * it must be a {@code Configuration} instance.
+     */
+    public static Properties getPulsarProperties(ReadableConfig tableOptions) {
+        final Map<String, String> configs = ((Configuration) tableOptions).toMap();
+        return getPulsarProperties(configs);
+    }
+
+    /** Collects every entry whose key starts with {@code "pulsar"} into a {@link Properties}. */
+    public static Properties getPulsarProperties(Map<String, String> configs) {
+        return getPulsarPropertiesWithPrefix(configs, "pulsar");
+    }
+
+    /**
+     * Collects every option whose key starts with {@code prefix} into a {@link Properties}.
+     *
+     * <p>{@code tableOptions} is cast to {@link Configuration} to obtain the raw key/value map, so
+     * it must be a {@code Configuration} instance.
+     */
+    public static Properties getPulsarPropertiesWithPrefix(
+            ReadableConfig tableOptions, String prefix) {
+        final Map<String, String> configs = ((Configuration) tableOptions).toMap();
+        return getPulsarPropertiesWithPrefix(configs, prefix);
+    }
+
+    /**
+     * Copies every entry whose key starts with {@code prefix} into a new {@link Properties}. Keys
+     * are kept unchanged. The match is a plain string prefix, so {@code "pulsar"} also matches
+     * keys such as {@code "pulsarX"}.
+     */
+    public static Properties getPulsarPropertiesWithPrefix(
+            Map<String, String> configs, String prefix) {
+        final Properties pulsarProperties = new Properties();
+        // Iterate entries directly to avoid a second map lookup per matching key.
+        configs.entrySet().stream()
+                .filter(entry -> entry.getKey().startsWith(prefix))
+                .forEach(entry -> pulsarProperties.put(entry.getKey(), entry.getValue()));
+        return pulsarProperties;
+    }
+
+    /**
+     * Builds the source start cursor. {@code SOURCE_START_FROM_MESSAGE_ID} is checked first, then
+     * {@code SOURCE_START_FROM_PUBLISH_TIME}. If neither is set, the source starts from the
+     * earliest message.
+     */
+    public static StartCursor getStartCursor(ReadableConfig tableOptions) {
+        if (tableOptions.getOptional(SOURCE_START_FROM_MESSAGE_ID).isPresent()) {
+            return parseMessageIdStartCursor(tableOptions.get(SOURCE_START_FROM_MESSAGE_ID));
+        } else if (tableOptions.getOptional(SOURCE_START_FROM_PUBLISH_TIME).isPresent()) {
+            return parsePublishTimeStartCursor(tableOptions.get(SOURCE_START_FROM_PUBLISH_TIME));
+        } else {
+            return StartCursor.earliest();
+        }
+    }
+
+    /**
+     * Builds the source stop cursor. The options are checked in this order: at-message-id,
+     * after-message-id, then at-publish-time. If none is set, the source never stops.
+     */
+    public static StopCursor getStopCursor(ReadableConfig tableOptions) {
+        if (tableOptions.getOptional(SOURCE_STOP_AT_MESSAGE_ID).isPresent()) {
+            return parseAtMessageIdStopCursor(tableOptions.get(SOURCE_STOP_AT_MESSAGE_ID));
+        } else if (tableOptions.getOptional(SOURCE_STOP_AFTER_MESSAGE_ID).isPresent()) {
+            return parseAfterMessageIdStopCursor(tableOptions.get(SOURCE_STOP_AFTER_MESSAGE_ID));
+        } else if (tableOptions.getOptional(SOURCE_STOP_AT_PUBLISH_TIME).isPresent()) {
+            return parseAtPublishTimeStopCursor(tableOptions.get(SOURCE_STOP_AT_PUBLISH_TIME));
+        } else {
+            return StopCursor.never();
+        }
+    }
+
+    /** Returns the configured source subscription type. */
+    public static SubscriptionType getSubscriptionType(ReadableConfig tableOptions) {
+        return tableOptions.get(SOURCE_SUBSCRIPTION_TYPE);
+    }
+
+    /** Parses {@code "earliest"}, {@code "latest"} or a {@code ledgerId:entryId:partitionId} id. */
+    protected static StartCursor parseMessageIdStartCursor(String config) {
+        if (Objects.equals(config, "earliest")) {
+            return StartCursor.earliest();
+        } else if (Objects.equals(config, "latest")) {
+            return StartCursor.latest();
+        } else {
+            return StartCursor.fromMessageId(parseMessageIdString(config));
+        }
+    }
+
+    /** Creates a start cursor positioned at the given publish timestamp. */
+    protected static StartCursor parsePublishTimeStartCursor(Long config) {
+        return StartCursor.fromPublishTime(config);
+    }
+
+    /** Parses {@code "never"}, {@code "latest"} or a {@code ledgerId:entryId:partitionId} id. */
+    protected static StopCursor parseAtMessageIdStopCursor(String config) {
+        if (Objects.equals(config, "never")) {
+            return StopCursor.never();
+        } else if (Objects.equals(config, "latest")) {
+            return StopCursor.latest();
+        } else {
+            return StopCursor.atMessageId(parseMessageIdString(config));
+        }
+    }
+
+    /** Creates a stop cursor that stops after (and includes) the given message id. */
+    protected static StopCursor parseAfterMessageIdStopCursor(String config) {
+        return StopCursor.afterMessageId(parseMessageIdString(config));
+    }
+
+    /** Creates a stop cursor positioned at the given publish timestamp. */
+    protected static StopCursor parseAtPublishTimeStopCursor(Long config) {
+        return StopCursor.atPublishTime(config);
+    }
+
+    /**
+     * Parses a message id of the form {@code ledgerId:entryId:partitionId}, e.g. {@code 12:2:-1}.
+     *
+     * @throws IllegalArgumentException if the string does not have three parts, or a part cannot
+     *     be parsed (ledgerId and entryId as long, partitionId as int)
+     */
+    protected static MessageIdImpl parseMessageIdString(String config) {
+        String[] tokens = config.split(":", 3);
+        checkArgument(tokens.length == 3, "MessageId format must be ledgerId:entryId:partitionId.");
+
+        try {
+            long ledgerId = Long.parseLong(tokens[0]);
+            long entryId = Long.parseLong(tokens[1]);
+            int partitionId = Integer.parseInt(tokens[2]);
+            return new MessageIdImpl(ledgerId, entryId, partitionId);
+        } catch (NumberFormatException e) {
+            // Keep the original parse failure as the cause to aid debugging.
+            throw new IllegalArgumentException(
+                    "MessageId format must be ledgerId:entryId:partitionId. "
+                            + "ledgerId and entryId must be parsable as long and partitionId as int.",
+                    e);
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Table Sink Option Utils
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Instantiates the custom topic router whose class name is given by
+     * {@code SINK_CUSTOM_TOPIC_ROUTER}.
+     *
+     * @param readableConfig the table options
+     * @param classLoader the class loader used to load and instantiate the router class
+     * @return the router instance, or {@code null} if no custom router is configured
+     * @throws ValidationException if the class cannot be found or instantiated, or is not a
+     *     {@link TopicRouter}
+     */
+    @Nullable
+    public static TopicRouter<RowData> getTopicRouter(
+            ReadableConfig readableConfig, ClassLoader classLoader) {
+        if (!readableConfig.getOptional(SINK_CUSTOM_TOPIC_ROUTER).isPresent()) {
+            return null;
+        }
+
+        String className = readableConfig.get(SINK_CUSTOM_TOPIC_ROUTER);
+        try {
+            Class<?> clazz = Class.forName(className, true, classLoader);
+            if (!TopicRouter.class.isAssignableFrom(clazz)) {
+                throw new ValidationException(
+                        String.format(
+                                "Sink TopicRouter class '%s' should extend from the required class %s",
+                                className, TopicRouter.class.getName()));
+            }
+            @SuppressWarnings("unchecked")
+            final TopicRouter<RowData> topicRouter =
+                    InstantiationUtil.instantiate(className, TopicRouter.class, classLoader);
+
+            return topicRouter;
+        } catch (ClassNotFoundException | FlinkException e) {
+            throw new ValidationException(
+                    String.format(
+                            "Could not find and instantiate TopicRouter class '%s'", className),
+                    e);
+        }
+    }
+
+    /** Returns the configured sink topic routing mode. */
+    public static TopicRoutingMode getTopicRoutingMode(ReadableConfig readableConfig) {
+        return readableConfig.get(SINK_TOPIC_ROUTING_MODE);
+    }
+
+    /** Returns the configured sink message delivery delay in milliseconds. */
+    public static long getMessageDelayMillis(ReadableConfig readableConfig) {
+        return readableConfig.get(SINK_MESSAGE_DELAY_INTERVAL).toMillis();
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
new file mode 100644
index 0000000000..ae186dc739
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT_SUFFIX;
+
+/**
+ * Config options that is used to configure a Pulsar SQL Connector. These
config options are
+ * specific to SQL Connectors only. Other runtime configurations can be found
in {@link
+ * org.apache.flink.connector.pulsar.common.config.PulsarOptions}, {@link
+ * org.apache.flink.connector.pulsar.source.PulsarSourceOptions}, and {@link
+ * org.apache.flink.connector.pulsar.sink.PulsarSinkOptions}.
+ */
+@PublicEvolving
+public final class PulsarTableOptions {
+
+ private PulsarTableOptions() {
+ }
+
+ public static final ConfigOption<List<String>> TOPIC =
+ ConfigOptions.key("topic")
+ .stringType()
+ .asList()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Topic name(s) the table reads
data from. It can be a single topic name or a list of topic names separated by
a semicolon symbol (%s) like %s. "
+ + "When a list of topics
configured, please ensure that all the topics are in the same schema as Flink
Table need a fixed schema.",
+ code(";"), code("topic-1;topic-2"))
+ .build());
+
+ //
--------------------------------------------------------------------------------------------
+ // Table Source Options
+ //
--------------------------------------------------------------------------------------------
+
+ public static final ConfigOption<SubscriptionType>
SOURCE_SUBSCRIPTION_TYPE =
+ ConfigOptions.key("source.subscription-type")
+ .enumType(SubscriptionType.class)
+ .defaultValue(SubscriptionType.Exclusive)
+ .withDescription(
+ Description.builder()
+ .text(
+ "The [subscription
type](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-subscriptions)
that is supported by the [Pulsar DataStream source
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source).
Currently, only %s and %s subscription types are supported.",
+ code("Exclusive"), code("Shared"))
+ .build());
+
+ /**
+ * Exactly same as {@link
+ *
org.apache.flink.connector.pulsar.source.PulsarSourceOptions#PULSAR_SUBSCRIPTION_NAME}.
+ * Copied because we want to have a default value for it.
+ */
+ public static final ConfigOption<String> SOURCE_SUBSCRIPTION_NAME =
+ ConfigOptions.key("scan.startup.sub-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "The subscription name of the
consumer that is used by the runtime [Pulsar DataStream source
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source).
This argument is required for constructing the consumer.")
+ .build());
+
+ public static final ConfigOption<String> SOURCE_START_FROM_MESSAGE_ID =
+ ConfigOptions.key("scan.startup.sub-start-offset")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "(Optional) Message id that is
used to specify a consuming starting "
+ + "point for source. Use
%s, %s or pass in a message id "
+ + "representation in %s, "
+ + "such as %s. This option
takes precedence over "
+ +
"source.start.publish-time.",
+ code("earliest"),
+ code("latest"),
+
code("ledgerId:entryId:partitionId"),
+ code("12:2:-1"))
+ .build());
+
+ public static final ConfigOption<Long> SOURCE_START_FROM_PUBLISH_TIME =
+ ConfigOptions.key("source.start.publish-time")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "(Optional) Publish timestamp that
is used to specify a starting point for the [Pulsar DataStream source
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source)
to consume data. "
+ + "Option
source.start.message-id takes precedence over this one.")
+ .build());
+
+ public static final ConfigOption<String> SOURCE_STOP_AT_MESSAGE_ID =
+ ConfigOptions.key("source.stop.at-message-id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional message id used to specify a stop cursor
for the unbounded sql "
+ + "source. Use \"never\", \"latest\" or
pass in a message id "
+ + "representation in
\"ledgerId:entryId:partitionId\", "
+ + "such as \"12:2:-1\"");
+
+ public static final ConfigOption<String> SOURCE_STOP_AFTER_MESSAGE_ID =
+ ConfigOptions.key("source.stop.after-message-id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional message id used to specify a stop
position but include the "
+ + "given message in the consuming result
for the unbounded sql "
+ + "source. Pass in a message id "
+ + "representation in
\"ledgerId:entryId:partitionId\", "
+ + "such as \"12:2:-1\". ");
+
+ public static final ConfigOption<Long> SOURCE_STOP_AT_PUBLISH_TIME =
+ ConfigOptions.key("source.stop.at-publish-time")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional publish timestamp used to specify a stop
cursor"
+ + " for the unbounded sql source.");
+
+ //
--------------------------------------------------------------------------------------------
+ // Table Sink Options
+ //
--------------------------------------------------------------------------------------------
+
+ public static final ConfigOption<String> SINK_CUSTOM_TOPIC_ROUTER =
+ ConfigOptions.key("sink.custom-topic-router")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "(Optional) the custom topic
router class URL that is used in the [Pulsar DataStream sink
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-sink).
If this option is provided, the %s option will be ignored.",
+ code("sink.topic-routing-mode"))
+ .build());
+
+ public static final ConfigOption<TopicRoutingMode> SINK_TOPIC_ROUTING_MODE
=
+ ConfigOptions.key("sink.topic-routing-mode")
+ .enumType(TopicRoutingMode.class)
+ .defaultValue(TopicRoutingMode.ROUND_ROBIN)
+ .withDescription(
+ Description.builder()
+ .text(
+ "(Optional) the topic routing
mode. Available options are %s and %s. By default, it is set to %s. If you want
to use a custom topic router, use the %s option to determine the partition for
a particular message.",
+ code("round-robin"),
+ code("message-key-hash"),
+ code("round-robin"),
+ code("sink.custom-topic-router"))
+ .build());
+
+ public static final ConfigOption<Duration> SINK_MESSAGE_DELAY_INTERVAL =
+ ConfigOptions.key("sink.message-delay-interval")
+ .durationType()
+ .defaultValue(Duration.ZERO)
+ .withDescription(
+ "(Optional) the message delay delivery interval
that is used in the [Pulsar DataStream sink
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-sink).");
+
+ //
--------------------------------------------------------------------------------------------
+ // Format Options
+ //
--------------------------------------------------------------------------------------------
+
+ public static final ConfigOption<String> KEY_FORMAT =
+ ConfigOptions.key("key" + FORMAT_SUFFIX)
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Defines the format identifier for
decoding/encoding key bytes in "
+ + "Pulsar message. The identifier is used
to discover a suitable format factory.");
+
+ public static final ConfigOption<List<String>> KEY_FIELDS =
+ ConfigOptions.key("key.fields")
+ .stringType()
+ .asList()
+ .defaultValues()
+ .withDescription(
+ "An explicit list of physical columns from the
table schema that are decoded/encoded from the key bytes of a Pulsar message.
By default, this list is empty and thus a key is undefined.");
+
+ public static final ConfigOption<String> VALUE_FORMAT =
+ ConfigOptions.key("value" + FORMAT_SUFFIX)
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Defines the format identifier for
decoding/encoding value data. "
+ + "The identifier is used to discover a
suitable format factory.");
+
+ //
--------------------------------------------------------------------------------------------
+ // Pulsar Options
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * Exactly same as {@link
+ *
org.apache.flink.connector.pulsar.common.config.PulsarOptions#PULSAR_SERVICE_URL}.
Copied
+ * here because it is a required config option and should not be included
in the {@link
+ *
org.apache.flink.table.factories.FactoryUtil.FactoryHelper#validateExcept(String...)}
method.
+ *
+ * <p>By default all {@link
org.apache.flink.connector.pulsar.common.config.PulsarOptions} are
+ * included in the validateExcept() method./p>
+ */
+ public static final ConfigOption<String> SERVICE_URL =
+ ConfigOptions.key("service-url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text("Service URL provider for Pulsar
service.")
+ .linebreak()
+ .text(
+ "To connect to Pulsar using client
libraries, you need to specify a Pulsar protocol URL.")
+ .linebreak()
+ .text(
+ "You can assign Pulsar protocol
URLs to specific clusters and use the Pulsar scheme.")
+ .linebreak()
+ .list(
+ text(
+ "This is an example of %s:
%s.",
+ code("localhost"),
+
code("pulsar://localhost:6650")),
+ text(
+ "If you have multiple
brokers, the URL is as: %s",
+ code(
+
"pulsar://localhost:6550,localhost:6651,localhost:6652")),
+ text(
+ "A URL for a production
Pulsar cluster is as: %s",
+ code(
+
"pulsar://pulsar.us-west.example.com:6650")),
+ text(
+ "If you use TLS
authentication, the URL is as %s",
+ code(
+
"pulsar+ssl://pulsar.us-west.example.com:6651")))
+ .build());
+
+ public static final ConfigOption<Boolean> EXPLICIT =
+ ConfigOptions.key("explicit")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Indicate if the table is an explicit
Flink table.");
+
+ public static final ConfigOption<String> ADMIN_URL =
+ ConfigOptions.key("admin-url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "The Pulsar service HTTP URL for
the admin endpoint. For example, %s, or %s for TLS.",
+
code("http://my-broker.example.com:8080"),
+
code("https://my-broker.example.com:8443"))
+ .build());
+
+ public static final ConfigOption<String> STARTUP_MODE =
+ ConfigOptions.key("scan.startup.mode")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "(Optional) Message id that is
used to specify a consuming starting "
+ + "point for source. Use
%s, %s or pass in a message id "
+ + "representation in %s, "
+ + "such as %s. This option
takes precedence over "
+ +
"source.start.publish-time.",
+ code("earliest"),
+ code("latest"),
+
code("ledgerId:entryId:partitionId"),
+ code("12:2:-1"))
+ .build());
+
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java
new file mode 100644
index 0000000000..8dac83871a
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.types.RowKind;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getValueDecodingFormat;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.KEY_FIELDS;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.KEY_FORMAT;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_CUSTOM_TOPIC_ROUTER;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_TOPIC_ROUTING_MODE;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
+import static
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
+import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.TOPIC;
+import static org.apache.pulsar.common.naming.TopicName.isValid;
+
+/** Util class for source and sink validation rules. */
+public class PulsarTableValidationUtils {
+
+ private PulsarTableValidationUtils() {
+ }
+
+ public static void validatePrimaryKeyConstraints(
+ ObjectIdentifier tableName,
+ int[] primaryKeyIndexes,
+ FactoryUtil.TableFactoryHelper helper) {
+ final DecodingFormat<DeserializationSchema<RowData>> format =
+ getValueDecodingFormat(helper);
+ if (primaryKeyIndexes.length > 0
+ && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+ throw new ValidationException(
+ String.format(
+ "The Pulsar table '%s' with '%s' format doesn't
support defining PRIMARY KEY constraint"
+ + " on the table, because it can't
guarantee the semantic of primary key.",
+ tableName.asSummaryString(), format));
+ }
+ }
+
+ public static void validateTableSourceOptions(ReadableConfig tableOptions)
{
+ validateTopicsConfigs(tableOptions);
+ validateStartCursorConfigs(tableOptions);
+ validateStopCursorConfigs(tableOptions);
+ validateSubscriptionTypeConfigs(tableOptions);
+ validateKeyFormatConfigs(tableOptions);
+ }
+
+ public static void validateTableSinkOptions(ReadableConfig tableOptions) {
+ validateTopicsConfigs(tableOptions);
+ validateKeyFormatConfigs(tableOptions);
+ validateSinkRoutingConfigs(tableOptions);
+ }
+
+ protected static void validateTopicsConfigs(ReadableConfig tableOptions) {
+ if (tableOptions.get(TOPIC).isEmpty()) {
+ throw new ValidationException("The topics list should not be
empty.");
+ }
+
+ for (String topic : tableOptions.get(TOPIC)) {
+ if (!isValid(topic)) {
+ throw new ValidationException(
+ String.format("The topics name %s is not a valid topic
name.", topic));
+ }
+ }
+ }
+
+ protected static void validateStartCursorConfigs(ReadableConfig
tableOptions) {
+ if (tableOptions.getOptional(SOURCE_START_FROM_MESSAGE_ID).isPresent()
+ &&
tableOptions.getOptional(SOURCE_START_FROM_PUBLISH_TIME).isPresent()) {
+ throw new ValidationException(
+ String.format(
+ "Only one of %s and %s can be specified. Detected
both of them",
+ SOURCE_START_FROM_MESSAGE_ID,
SOURCE_START_FROM_PUBLISH_TIME));
+ }
+ }
+
+ protected static void validateStopCursorConfigs(ReadableConfig
tableOptions) {
+ Set<ConfigOption<?>> conflictConfigOptions =
+ Sets.newHashSet(
+ SOURCE_STOP_AT_MESSAGE_ID,
+ SOURCE_STOP_AFTER_MESSAGE_ID,
+ SOURCE_STOP_AT_PUBLISH_TIME);
+
+ long configsNums =
+ conflictConfigOptions.stream()
+ .map(tableOptions::getOptional)
+ .filter(Optional::isPresent)
+ .count();
+
+ if (configsNums > 1) {
+ throw new ValidationException(
+ String.format(
+ "Only one of %s, %s and %s can be specified.
Detected more than 1 of them",
+ SOURCE_STOP_AT_MESSAGE_ID,
+ SOURCE_STOP_AFTER_MESSAGE_ID,
+ SOURCE_STOP_AT_PUBLISH_TIME));
+ }
+ }
+
+ protected static void validateSubscriptionTypeConfigs(ReadableConfig
tableOptions) {
+ SubscriptionType subscriptionType =
tableOptions.get(SOURCE_SUBSCRIPTION_TYPE);
+
+ if (subscriptionType == SubscriptionType.Failover) {
+ throw new ValidationException(
+ String.format(
+ "%s SubscriptionType is not supported. ",
SubscriptionType.Failover));
+ }
+ }
+
+ protected static void validateKeyFormatConfigs(ReadableConfig
tableOptions) {
+ final Optional<String> optionalKeyFormat =
tableOptions.getOptional(KEY_FORMAT);
+ final Optional<List<String>> optionalKeyFields =
tableOptions.getOptional(KEY_FIELDS);
+ if (!optionalKeyFormat.isPresent() && optionalKeyFields.isPresent()) {
+ throw new ValidationException(
+ String.format(
+ "The option '%s' can only be declared if a key
format is defined using '%s'.",
+ KEY_FIELDS.key(), KEY_FORMAT.key()));
+ } else if (optionalKeyFormat.isPresent()
+ && (!optionalKeyFields.isPresent() ||
optionalKeyFields.get().size() == 0)) {
+ throw new ValidationException(
+ String.format(
+ "A key format '%s' requires the declaration of one
or more of key fields using '%s'.",
+ KEY_FORMAT.key(), KEY_FIELDS.key()));
+ }
+ }
+
+ protected static void validateSinkRoutingConfigs(ReadableConfig
tableOptions) {
+ if (tableOptions.getOptional(SINK_TOPIC_ROUTING_MODE)
+ .orElse(TopicRoutingMode.ROUND_ROBIN) ==
TopicRoutingMode.CUSTOM) {
+ throw new ValidationException(
+ String.format(
+ "Only %s and %s can be used. For %s, please use
sink.custom-topic-router for"
+ + "custom topic router and do not set this
config.",
+ TopicRoutingMode.ROUND_ROBIN,
+ TopicRoutingMode.MESSAGE_KEY_HASH,
+ TopicRoutingMode.CUSTOM));
+ }
+ if (tableOptions.getOptional(SINK_CUSTOM_TOPIC_ROUTER).isPresent()
+ &&
tableOptions.getOptional(SINK_TOPIC_ROUTING_MODE).isPresent()) {
+ throw new ValidationException(
+ String.format(
+ "Only one of %s and %s can be specified. Detected
both of them",
+ SINK_CUSTOM_TOPIC_ROUTER,
SINK_TOPIC_ROUTING_MODE));
+ }
+ }
+
+ protected static void validateUpsertModeKeyConstraints(
+ ReadableConfig tableOptions, int[] primaryKeyIndexes) {
+ if (!tableOptions.getOptional(KEY_FIELDS).isPresent()) {
+ throw new ValidationException(
+ "Upsert mode requires key.fields set to the primary key
fields, should be set");
+ }
+
+ if (tableOptions.getOptional(KEY_FIELDS).get().size() == 0
+ || primaryKeyIndexes.length == 0) {
+ throw new ValidationException(
+ "'upsert-pulsar' require to define a PRIMARY KEY
constraint. "
+ + "The PRIMARY KEY specifies which columns should
be read from or write to the Pulsar message key. "
+ + "The PRIMARY KEY also defines records in the
'upsert-pulsar' table should update or delete on which keys.");
+ }
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000..ba5da26e94
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.pulsar.table.PulsarTableFactory
diff --git a/licenses/inlong-sort-connectors/LICENSE
b/licenses/inlong-sort-connectors/LICENSE
index 44dce64c4d..bd944b384c 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -831,6 +831,13 @@
Source : org.apache.flink:flink-connector-kafka:1.15.4 (Please note that
the software have been modified.)
License : https://github.com/jeff-zou/flink-connector-redis/blob/main/LICENSE
+1.3.22
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
+
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java
+
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
+
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java
+ Source : org.apache.flink:flink-connector-pulsar:4.1.0-1.18 (Please note
that the software have been modified.)
+ License : https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE
+
=======================================================================
Apache InLong Subcomponents: