This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4db62df903 [INLONG-10053][Sort] Support flink-connector-pulsar based 
on flink 1.18 (#10063)
4db62df903 is described below

commit 4db62df9032d26ab9fca2c9bab0f7b7b0dc2a4a9
Author: AloysZhang <[email protected]>
AuthorDate: Thu Apr 25 09:49:19 2024 +0800

    [INLONG-10053][Sort] Support flink-connector-pulsar based on flink 1.18 
(#10063)
---
 .github/workflows/ci_build.yml                     |   7 +
 inlong-distribution/pom.xml                        |   1 +
 .../src/main/assemblies/sort-connectors-v1.18.xml  |  40 +++
 inlong-sort/sort-flink/sort-flink-v1.18/pom.xml    |   1 +
 .../sort-flink-v1.18/{ => sort-connectors}/pom.xml |  20 +-
 .../sort-connectors/pulsar/pom.xml                 | 121 ++++++++
 .../sort/pulsar/table/PulsarTableFactory.java      | 303 +++++++++++++++++++
 .../sort/pulsar/table/PulsarTableOptionUtils.java  | 321 +++++++++++++++++++++
 .../sort/pulsar/table/PulsarTableOptions.java      | 294 +++++++++++++++++++
 .../pulsar/table/PulsarTableValidationUtils.java   | 195 +++++++++++++
 .../org.apache.flink.table.factories.Factory       |  16 +
 licenses/inlong-sort-connectors/LICENSE            |   7 +
 12 files changed, 1319 insertions(+), 7 deletions(-)

diff --git a/.github/workflows/ci_build.yml b/.github/workflows/ci_build.yml
index 0a579894f5..25a05e6bd3 100644
--- a/.github/workflows/ci_build.yml
+++ b/.github/workflows/ci_build.yml
@@ -121,5 +121,12 @@ jobs:
           name: apache-inlong-${{ env.VERSION 
}}-sort-connectors-flink-v1.15.tar.gz
           path: ./inlong-distribution/target/apache-inlong-${{ env.VERSION 
}}-sort-connectors-flink-v1.15.tar.gz
 
+      - name: Upload sort connectors package for flink v1.18
+        if: ${{ success() }}
+        uses: actions/upload-artifact@v3
+        with:
+          name: apache-inlong-${{ env.VERSION 
}}-sort-connectors-flink-v1.18.tar.gz
+          path: ./inlong-distribution/target/apache-inlong-${{ env.VERSION 
}}-sort-connectors-flink-v1.18.tar.gz
+
       - name: Clean up build packages
         run: mvn clean
diff --git a/inlong-distribution/pom.xml b/inlong-distribution/pom.xml
index 47751b9841..8a85518a0a 100644
--- a/inlong-distribution/pom.xml
+++ b/inlong-distribution/pom.xml
@@ -65,6 +65,7 @@
                             <descriptors>
                                 
<descriptor>src/main/assemblies/sort-connectors-v1.13.xml</descriptor>
                                 
<descriptor>src/main/assemblies/sort-connectors-v1.15.xml</descriptor>
+                                
<descriptor>src/main/assemblies/sort-connectors-v1.18.xml</descriptor>
                             </descriptors>
                         </configuration>
                     </execution>
diff --git a/inlong-distribution/src/main/assemblies/sort-connectors-v1.18.xml 
b/inlong-distribution/src/main/assemblies/sort-connectors-v1.18.xml
new file mode 100644
index 0000000000..61465915fa
--- /dev/null
+++ b/inlong-distribution/src/main/assemblies/sort-connectors-v1.18.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~  Licensed to the Apache Software Foundation (ASF) under one
+  ~  or more contributor license agreements.  See the NOTICE file
+  ~  distributed with this work for additional information
+  ~  regarding copyright ownership.  The ASF licenses this file
+  ~  to you under the Apache License, Version 2.0 (the
+  ~  "License"); you may not use this file except in compliance
+  ~  with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<assembly>
+    <id>sort-connectors-flink-v1.18</id>
+
+    <formats>
+        <format>dir</format>
+        <format>tar.gz</format>
+    </formats>
+    <includeBaseDirectory>true</includeBaseDirectory>
+    <fileSets>
+        <!--connector plugin-->
+        <fileSet>
+            
<directory>../inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/target</directory>
+            <outputDirectory>inlong-sort/connectors</outputDirectory>
+            <includes>
+                
<include>sort-connector-pulsar-v1.18-${project.version}.jar</include>
+            </includes>
+            <fileMode>0644</fileMode>
+        </fileSet>
+
+    </fileSets>
+</assembly>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.18/pom.xml
index 48a76d5fe4..323c240ad7 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/pom.xml
@@ -32,6 +32,7 @@
 
     <modules>
         <module>sort-flink-dependencies</module>
+        <module>sort-connectors</module>
     </modules>
 
     <properties>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml
similarity index 71%
copy from inlong-sort/sort-flink/sort-flink-v1.18/pom.xml
copy to inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml
index 48a76d5fe4..a3fe402733 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pom.xml
@@ -22,22 +22,28 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.inlong</groupId>
-        <artifactId>sort-flink</artifactId>
+        <artifactId>sort-flink-v1.18</artifactId>
         <version>1.13.0-SNAPSHOT</version>
     </parent>
 
-    <artifactId>sort-flink-v1.18</artifactId>
+    <artifactId>sort-connectors-v1.18</artifactId>
     <packaging>pom</packaging>
-    <name>Apache InLong - Sort Flink v1.18</name>
+    <name>Apache InLong - Sort Connectors v1.18</name>
 
     <modules>
-        <module>sort-flink-dependencies</module>
+        <module>pulsar</module>
     </modules>
 
     <properties>
-        
<inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir>
-        <flink.version>1.18.1</flink.version>
-        <flink.scala.binary.version>2.12</flink.scala.binary.version>
+        
<inlong.root.dir>${project.parent.parent.parent.parent.basedir}</inlong.root.dir>
     </properties>
 
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-flink-dependencies-v1.18</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
 </project>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/pom.xml
new file mode 100644
index 0000000000..6721922c29
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/pom.xml
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>sort-connectors-v1.18</artifactId>
+        <version>1.13.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>sort-connector-pulsar-v1.18</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache InLong - Sort-connector-pulsar</name>
+
+    <properties>
+        
<inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir>
+        <flink.connector.version>4.1.0-1.18</flink.connector.version>
+        <pulsar.version>3.0.2</pulsar.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-all</artifactId>
+            <version>${pulsar.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-pulsar</artifactId>
+            <version>${flink.connector.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <!-- Shade all the dependencies to avoid conflicts -->
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.inlong:*</include>
+                                    
<include>io.streamnative.connectors:pulsar-flink-connector-origin*</include>
+                                    
<include>io.streamnative.connectors:flink-protobuf</include>
+                                    <include>org.apache.pulsar:*</include>
+                                    
<include>org.apache.flink:flink-connector-pulsar</include>
+                                    <include>com.google.protobuf:*</include>
+                                    <include>org.bouncycastle*:*</include>
+                                    <include>org.bouncycastle*:*</include>
+                                    <include>javax.*:*</include>
+                                    <include>org.lz4*:*</include>
+                                    <include>org.slf4j:jul-to-slf4j</include>
+                                    <include>io.airlift:*</include>
+                                </includes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    
<artifact>org.apache.inlong:sort-connector-*</artifact>
+                                    <includes>
+                                        <include>org/apache/inlong/**</include>
+                                        
<include>META-INF/services/org.apache.flink.table.factories.Factory</include>
+                                    </includes>
+                                </filter>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>log4j.properties</exclude>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer 
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"
 />
+                                <transformer 
implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer"
 />
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
new file mode 100644
index 0000000000..a6e7caa00e
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkOptions;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import 
org.apache.flink.connector.pulsar.table.sink.PulsarTableSerializationSchemaFactory;
+import org.apache.flink.connector.pulsar.table.sink.PulsarTableSink;
+import 
org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchemaFactory;
+import org.apache.flink.connector.pulsar.table.source.PulsarTableSource;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
+import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.createKeyFormatProjection;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.createValueFormatProjection;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getKeyDecodingFormat;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getKeyEncodingFormat;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getMessageDelayMillis;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getPulsarProperties;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getStartCursor;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getStopCursor;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getSubscriptionType;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getTopicListFromOptions;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getTopicRouter;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getTopicRoutingMode;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getValueDecodingFormat;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getValueEncodingFormat;
+import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.ADMIN_URL;
+import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.EXPLICIT;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.KEY_FIELDS;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.KEY_FORMAT;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SERVICE_URL;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_CUSTOM_TOPIC_ROUTER;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_MESSAGE_DELAY_INTERVAL;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_TOPIC_ROUTING_MODE;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_SUBSCRIPTION_NAME;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.STARTUP_MODE;
+import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.TOPIC;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.VALUE_FORMAT;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableValidationUtils.validatePrimaryKeyConstraints;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableValidationUtils.validateTableSinkOptions;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableValidationUtils.validateTableSourceOptions;
+import static 
org.apache.pulsar.shade.org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+
+/**
+ * Factory for creating {@link DynamicTableSource} and {@link 
DynamicTableSink}.
+ *
+ * <p>The main role of this class is to retrieve config options and validate 
options from config and
+ * the table schema. It also sets default values if a config option is not 
present.
+ */
+public class PulsarTableFactory implements DynamicTableSourceFactory, 
DynamicTableSinkFactory {
+
+    public static final String IDENTIFIER = "pulsar-inlong";
+
+    public static final String DEFAULT_SUBSCRIPTION_NAME_PREFIX = 
"flink-sql-connector-pulsar-";
+
+    public static final boolean UPSERT_DISABLED = false;
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+        // Format options should be retrieved before validation.
+        final DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat 
=
+                getKeyDecodingFormat(helper);
+        final DecodingFormat<DeserializationSchema<RowData>> 
valueDecodingFormat =
+                getValueDecodingFormat(helper);
+        ReadableConfig tableOptions = helper.getOptions();
+
+        // Validate configs are not conflict; each options is consumed; no 
unwanted configs
+        // PulsarOptions, PulsarSourceOptions and PulsarSinkOptions is not 
part of the validation.
+        helper.validateExcept(
+                PulsarOptions.CLIENT_CONFIG_PREFIX,
+                PulsarSourceOptions.SOURCE_CONFIG_PREFIX,
+                PulsarSourceOptions.CONSUMER_CONFIG_PREFIX,
+                PulsarSinkOptions.PRODUCER_CONFIG_PREFIX,
+                PulsarSinkOptions.SINK_CONFIG_PREFIX);
+
+        validatePrimaryKeyConstraints(
+                context.getObjectIdentifier(), context.getPrimaryKeyIndexes(), 
helper);
+
+        validateTableSourceOptions(tableOptions);
+
+        // Retrieve configs
+        final List<String> topics = getTopicListFromOptions(tableOptions);
+        final StartCursor startCursor = getStartCursor(tableOptions);
+        final StopCursor stopCursor = getStopCursor(tableOptions);
+        final SubscriptionType subscriptionType = 
getSubscriptionType(tableOptions);
+
+        // Forward source configs
+        final Properties properties = getPulsarProperties(tableOptions);
+        properties.setProperty(PULSAR_SERVICE_URL.key(), 
tableOptions.get(SERVICE_URL));
+        // Set random subscriptionName if not provided
+        properties.setProperty(
+                PULSAR_SUBSCRIPTION_NAME.key(),
+                tableOptions
+                        .getOptional(SOURCE_SUBSCRIPTION_NAME)
+                        .orElse(DEFAULT_SUBSCRIPTION_NAME_PREFIX + 
randomAlphabetic(5)));
+        // Retrieve physical fields (not including computed or metadata 
fields),
+        // and projections and create a schema factory based on such 
information.
+        final DataType physicalDataType = context.getPhysicalRowDataType();
+
+        final int[] valueProjection = 
createValueFormatProjection(tableOptions, physicalDataType);
+        final int[] keyProjection = createKeyFormatProjection(tableOptions, 
physicalDataType);
+
+        final PulsarTableDeserializationSchemaFactory 
deserializationSchemaFactory =
+                new PulsarTableDeserializationSchemaFactory(
+                        physicalDataType,
+                        keyDecodingFormat,
+                        keyProjection,
+                        valueDecodingFormat,
+                        valueProjection,
+                        UPSERT_DISABLED);
+
+        // Set default values for configuration not exposed to user.
+        final DecodingFormat<DeserializationSchema<RowData>> 
decodingFormatForMetadataPushdown =
+                valueDecodingFormat;
+        final ChangelogMode changelogMode = 
decodingFormatForMetadataPushdown.getChangelogMode();
+
+        return new PulsarTableSource(
+                deserializationSchemaFactory,
+                decodingFormatForMetadataPushdown,
+                changelogMode,
+                topics,
+                properties,
+                startCursor,
+                stopCursor,
+                subscriptionType);
+    }
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+        // Format options should be retrieved before validation.
+        final EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat =
+                getKeyEncodingFormat(helper);
+        final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat 
=
+                getValueEncodingFormat(helper);
+        ReadableConfig tableOptions = helper.getOptions();
+
+        // Validate configs are not conflict; each options is consumed; no 
unwanted configs
+        // PulsarOptions, PulsarSourceOptions and PulsarSinkOptions is not 
part of the validation.
+        helper.validateExcept(
+                PulsarOptions.CLIENT_CONFIG_PREFIX,
+                PulsarSourceOptions.SOURCE_CONFIG_PREFIX,
+                PulsarSourceOptions.CONSUMER_CONFIG_PREFIX,
+                PulsarSinkOptions.PRODUCER_CONFIG_PREFIX,
+                PulsarSinkOptions.SINK_CONFIG_PREFIX);
+
+        validatePrimaryKeyConstraints(
+                context.getObjectIdentifier(), context.getPrimaryKeyIndexes(), 
helper);
+
+        validateTableSinkOptions(tableOptions);
+
+        // Retrieve configs
+        final TopicRouter<RowData> topicRouter =
+                getTopicRouter(tableOptions, context.getClassLoader());
+        final TopicRoutingMode topicRoutingMode = 
getTopicRoutingMode(tableOptions);
+        final long messageDelayMillis = getMessageDelayMillis(tableOptions);
+
+        final List<String> topics = getTopicListFromOptions(tableOptions);
+
+        // Forward sink configs
+        final Properties properties = getPulsarProperties(tableOptions);
+        properties.setProperty(PULSAR_SERVICE_URL.key(), 
tableOptions.get(SERVICE_URL));
+
+        // Retrieve physical DataType (not including computed or metadata 
fields)
+        final DataType physicalDataType = context.getPhysicalRowDataType();
+        final int[] keyProjection = createKeyFormatProjection(tableOptions, 
physicalDataType);
+        final int[] valueProjection = 
createValueFormatProjection(tableOptions, physicalDataType);
+
+        final PulsarTableSerializationSchemaFactory serializationSchemaFactory 
=
+                new PulsarTableSerializationSchemaFactory(
+                        physicalDataType,
+                        keyEncodingFormat,
+                        keyProjection,
+                        valueEncodingFormat,
+                        valueProjection,
+                        UPSERT_DISABLED);
+
+        // Set default values for configuration not exposed to user.
+        final DeliveryGuarantee deliveryGuarantee = 
DeliveryGuarantee.AT_LEAST_ONCE;
+        final ChangelogMode changelogMode = 
valueEncodingFormat.getChangelogMode();
+
+        return new PulsarTableSink(
+                serializationSchemaFactory,
+                changelogMode,
+                topics,
+                properties,
+                deliveryGuarantee,
+                topicRouter,
+                topicRoutingMode,
+                messageDelayMillis);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Stream.of(TOPIC, SERVICE_URL).collect(Collectors.toSet());
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return Stream.of(
+                FactoryUtil.FORMAT,
+                VALUE_FORMAT,
+                ADMIN_URL,
+                STARTUP_MODE,
+                SOURCE_SUBSCRIPTION_NAME,
+                SOURCE_SUBSCRIPTION_TYPE,
+                SOURCE_START_FROM_MESSAGE_ID,
+                SOURCE_START_FROM_PUBLISH_TIME,
+                SOURCE_STOP_AT_MESSAGE_ID,
+                SOURCE_STOP_AFTER_MESSAGE_ID,
+                SOURCE_STOP_AT_PUBLISH_TIME,
+                SINK_CUSTOM_TOPIC_ROUTER,
+                SINK_TOPIC_ROUTING_MODE,
+                SINK_MESSAGE_DELAY_INTERVAL,
+                SINK_PARALLELISM,
+                KEY_FORMAT,
+                KEY_FIELDS,
+                EXPLICIT,
+                AUDIT_KEYS,
+                INLONG_METRIC,
+                INLONG_AUDIT)
+                .collect(Collectors.toSet());
+    }
+
+    /** Format and Delivery guarantee related options are not forward options. 
*/
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Stream.of(
+                TOPIC,
+                SERVICE_URL,
+                STARTUP_MODE,
+                SOURCE_SUBSCRIPTION_TYPE,
+                SOURCE_SUBSCRIPTION_NAME,
+                SOURCE_START_FROM_MESSAGE_ID,
+                SOURCE_START_FROM_PUBLISH_TIME,
+                SOURCE_STOP_AT_MESSAGE_ID,
+                SOURCE_STOP_AFTER_MESSAGE_ID,
+                SOURCE_STOP_AT_PUBLISH_TIME,
+                SINK_CUSTOM_TOPIC_ROUTER,
+                SINK_TOPIC_ROUTING_MODE,
+                SINK_MESSAGE_DELAY_INTERVAL)
+                .collect(Collectors.toSet());
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java
new file mode 100644
index 0000000000..12ef459280
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.KEY_FIELDS;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.KEY_FORMAT;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_CUSTOM_TOPIC_ROUTER;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_MESSAGE_DELAY_INTERVAL;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_TOPIC_ROUTING_MODE;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
+import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.TOPIC;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.VALUE_FORMAT;
+
+/**
+ * A util class for getting fields from config options, getting formats and 
other useful
+ * information.
+ *
+ * <p>It contains the following functionalities.
+ *
+ * <ul>
+ *   <li>Get Topics from configurations.
+ *   <li>Get source StartCursor from configurations.
+ *   <li>Get source SubscriptionType from configurations.
+ *   <li>Get sink messageDelayMillis from configurations.
+ *   <li>Get sink TopicRouter/TopicRoutingMode from configurations.
+ *   <li>Create key and value encoding/decoding format.
+ *   <li>Create key and value projection.
+ * </ul>
+ */
+public class PulsarTableOptionUtils {
+
+    private PulsarTableOptionUtils() {
+    }
+
+    public static final String TOPIC_LIST_DELIMITER = ";";
+
+    // 
--------------------------------------------------------------------------------------------
+    // Decoding / Encoding and Projection
+    // 
--------------------------------------------------------------------------------------------
+
+    @Nullable
+    public static DecodingFormat<DeserializationSchema<RowData>> 
getKeyDecodingFormat(
+            FactoryUtil.TableFactoryHelper helper) {
+        return 
helper.discoverOptionalDecodingFormat(DeserializationFormatFactory.class, 
KEY_FORMAT)
+                .orElse(null);
+    }
+
+    @Nullable
+    public static EncodingFormat<SerializationSchema<RowData>> 
getKeyEncodingFormat(
+            FactoryUtil.TableFactoryHelper helper) {
+        return 
helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, 
KEY_FORMAT)
+                .orElse(null);
+    }
+
+    public static DecodingFormat<DeserializationSchema<RowData>> 
getValueDecodingFormat(
+            FactoryUtil.TableFactoryHelper helper) {
+        return helper.discoverOptionalDecodingFormat(
+                DeserializationFormatFactory.class, FactoryUtil.FORMAT)
+                .orElseGet(
+                        () -> helper.discoverDecodingFormat(
+                                DeserializationFormatFactory.class, 
VALUE_FORMAT));
+    }
+
+    public static EncodingFormat<SerializationSchema<RowData>> 
getValueEncodingFormat(
+            FactoryUtil.TableFactoryHelper helper) {
+        return helper.discoverOptionalEncodingFormat(
+                SerializationFormatFactory.class, FactoryUtil.FORMAT)
+                .orElseGet(
+                        () -> helper.discoverEncodingFormat(
+                                SerializationFormatFactory.class, 
VALUE_FORMAT));
+    }
+
+    /**
+     * Creates an array of indices that determine which physical fields of the 
table schema to
+     * include in the key format and the order that those fields have in the 
key format.
+     */
+    public static int[] createKeyFormatProjection(
+            ReadableConfig options, DataType physicalDataType) {
+        final LogicalType physicalType = physicalDataType.getLogicalType();
+        checkArgument(physicalType.is(LogicalTypeRoot.ROW), "Row data type 
expected.");
+        final Optional<String> optionalKeyFormat = 
options.getOptional(KEY_FORMAT);
+        final Optional<List<String>> optionalKeyFields = 
options.getOptional(KEY_FIELDS);
+
+        if (!optionalKeyFormat.isPresent()) {
+            return new int[0];
+        }
+
+        final List<String> keyFields = optionalKeyFields.get();
+        final List<String> physicalFields = 
LogicalTypeChecks.getFieldNames(physicalType);
+        return keyFields.stream()
+                .mapToInt(
+                        keyField -> {
+                            final int pos = physicalFields.indexOf(keyField);
+                            // check that field name exists
+                            if (pos < 0) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "Could not find the field '%s' 
in the table schema for usage in the key format. "
+                                                        + "A key field must be 
a regular, physical column. "
+                                                        + "The following 
columns can be selected in the '%s' option: %s",
+                                                keyField, KEY_FIELDS.key(), 
physicalFields));
+                            }
+                            // check that field name is prefixed correctly
+                            return pos;
+                        })
+                .toArray();
+    }
+
+    public static int[] createValueFormatProjection(
+            ReadableConfig options, DataType physicalDataType) {
+        final LogicalType physicalType = physicalDataType.getLogicalType();
+        checkArgument(physicalType.is(LogicalTypeRoot.ROW), "Row data type 
expected.");
+
+        final int physicalFieldCount = 
LogicalTypeChecks.getFieldCount(physicalType);
+        final IntStream physicalFields = IntStream.range(0, 
physicalFieldCount);
+        final int[] keyProjection = createKeyFormatProjection(options, 
physicalDataType);
+        return physicalFields
+                .filter(pos -> IntStream.of(keyProjection).noneMatch(k -> k == 
pos))
+                .toArray();
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Table Source Option Utils
+    // 
--------------------------------------------------------------------------------------------
+
+    public static List<String> getTopicListFromOptions(ReadableConfig 
tableOptions) {
+        return tableOptions.get(TOPIC);
+    }
+
+    public static Properties getPulsarProperties(ReadableConfig tableOptions) {
+        final Map<String, String> configs = ((Configuration) 
tableOptions).toMap();
+        return getPulsarProperties(configs);
+    }
+
+    public static Properties getPulsarProperties(Map<String, String> configs) {
+        return getPulsarPropertiesWithPrefix(configs, "pulsar");
+    }
+
+    public static Properties getPulsarPropertiesWithPrefix(
+            ReadableConfig tableOptions, String prefix) {
+        final Map<String, String> configs = ((Configuration) 
tableOptions).toMap();
+        return getPulsarPropertiesWithPrefix(configs, prefix);
+    }
+
+    public static Properties getPulsarPropertiesWithPrefix(
+            Map<String, String> configs, String prefix) {
+        final Properties pulsarProperties = new Properties();
+        configs.keySet().stream()
+                .filter(key -> key.startsWith(prefix))
+                .forEach(key -> pulsarProperties.put(key, configs.get(key)));
+        return pulsarProperties;
+    }
+
+    public static StartCursor getStartCursor(ReadableConfig tableOptions) {
+        if 
(tableOptions.getOptional(SOURCE_START_FROM_MESSAGE_ID).isPresent()) {
+            return 
parseMessageIdStartCursor(tableOptions.get(SOURCE_START_FROM_MESSAGE_ID));
+        } else if 
(tableOptions.getOptional(SOURCE_START_FROM_PUBLISH_TIME).isPresent()) {
+            return 
parsePublishTimeStartCursor(tableOptions.get(SOURCE_START_FROM_PUBLISH_TIME));
+        } else {
+            return StartCursor.earliest();
+        }
+    }
+
+    public static StopCursor getStopCursor(ReadableConfig tableOptions) {
+        if (tableOptions.getOptional(SOURCE_STOP_AT_MESSAGE_ID).isPresent()) {
+            return 
parseAtMessageIdStopCursor(tableOptions.get(SOURCE_STOP_AT_MESSAGE_ID));
+        } else if 
(tableOptions.getOptional(SOURCE_STOP_AFTER_MESSAGE_ID).isPresent()) {
+            return 
parseAfterMessageIdStopCursor(tableOptions.get(SOURCE_STOP_AFTER_MESSAGE_ID));
+        } else if 
(tableOptions.getOptional(SOURCE_STOP_AT_PUBLISH_TIME).isPresent()) {
+            return 
parseAtPublishTimeStopCursor(tableOptions.get(SOURCE_STOP_AT_PUBLISH_TIME));
+        } else {
+            return StopCursor.never();
+        }
+    }
+
+    public static SubscriptionType getSubscriptionType(ReadableConfig 
tableOptions) {
+        return tableOptions.get(SOURCE_SUBSCRIPTION_TYPE);
+    }
+
+    protected static StartCursor parseMessageIdStartCursor(String config) {
+        if (Objects.equals(config, "earliest")) {
+            return StartCursor.earliest();
+        } else if (Objects.equals(config, "latest")) {
+            return StartCursor.latest();
+        } else {
+            return StartCursor.fromMessageId(parseMessageIdString(config));
+        }
+    }
+
+    protected static StartCursor parsePublishTimeStartCursor(Long config) {
+        return StartCursor.fromPublishTime(config);
+    }
+
+    protected static StopCursor parseAtMessageIdStopCursor(String config) {
+        if (Objects.equals(config, "never")) {
+            return StopCursor.never();
+        } else if (Objects.equals(config, "latest")) {
+            return StopCursor.latest();
+        } else {
+            return StopCursor.atMessageId(parseMessageIdString(config));
+        }
+    }
+
+    protected static StopCursor parseAfterMessageIdStopCursor(String config) {
+        return StopCursor.afterMessageId(parseMessageIdString(config));
+    }
+
+    protected static StopCursor parseAtPublishTimeStopCursor(Long config) {
+        return StopCursor.atPublishTime(config);
+    }
+
+    protected static MessageIdImpl parseMessageIdString(String config) {
+        String[] tokens = config.split(":", 3);
+        checkArgument(tokens.length == 3, "MessageId format must be 
ledgerId:entryId:partitionId.");
+
+        try {
+            long ledgerId = Long.parseLong(tokens[0]);
+            long entryId = Long.parseLong(tokens[1]);
+            int partitionId = Integer.parseInt(tokens[2]);
+            return new MessageIdImpl(ledgerId, entryId, partitionId);
+        } catch (NumberFormatException e) {
+            throw new IllegalArgumentException(
+                    "MessageId format must be ledgerId:entryId:partitionId. "
+                            + "Each id should be able to parsed to long 
type.");
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Table Sink Option Utils
+    // 
--------------------------------------------------------------------------------------------
+
+    public static TopicRouter<RowData> getTopicRouter(
+            ReadableConfig readableConfig, ClassLoader classLoader) {
+        if (!readableConfig.getOptional(SINK_CUSTOM_TOPIC_ROUTER).isPresent()) 
{
+            return null;
+        }
+
+        String className = readableConfig.get(SINK_CUSTOM_TOPIC_ROUTER);
+        try {
+            Class<?> clazz = Class.forName(className, true, classLoader);
+            if (!TopicRouter.class.isAssignableFrom(clazz)) {
+                throw new ValidationException(
+                        String.format(
+                                "Sink TopicRouter class '%s' should extend 
from the required class %s",
+                                className, TopicRouter.class.getName()));
+            }
+            @SuppressWarnings("unchecked")
+            final TopicRouter<RowData> topicRouter =
+                    InstantiationUtil.instantiate(className, 
TopicRouter.class, classLoader);
+
+            return topicRouter;
+        } catch (ClassNotFoundException | FlinkException e) {
+            throw new ValidationException(
+                    String.format(
+                            "Could not find and instantiate TopicRouter class 
'%s'", className),
+                    e);
+        }
+    }
+
+    public static TopicRoutingMode getTopicRoutingMode(ReadableConfig 
readableConfig) {
+        return readableConfig.get(SINK_TOPIC_ROUTING_MODE);
+    }
+
+    public static long getMessageDelayMillis(ReadableConfig readableConfig) {
+        return readableConfig.get(SINK_MESSAGE_DELAY_INTERVAL).toMillis();
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
new file mode 100644
index 0000000000..ae186dc739
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT_SUFFIX;
+
+/**
+ * Config options that is used to configure a Pulsar SQL Connector. These 
config options are
+ * specific to SQL Connectors only. Other runtime configurations can be found 
in {@link
+ * org.apache.flink.connector.pulsar.common.config.PulsarOptions}, {@link
+ * org.apache.flink.connector.pulsar.source.PulsarSourceOptions}, and {@link
+ * org.apache.flink.connector.pulsar.sink.PulsarSinkOptions}.
+ */
+@PublicEvolving
+public final class PulsarTableOptions {
+
+    private PulsarTableOptions() {
+    }
+
+    public static final ConfigOption<List<String>> TOPIC =
+            ConfigOptions.key("topic")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Topic name(s) the table reads 
data from. It can be a single topic name or a list of topic names separated by 
a semicolon symbol (%s) like %s. "
+                                                    + "When a list of topics 
configured, please ensure that all the topics are in the same schema as Flink 
Table need a fixed schema.",
+                                            code(";"), code("topic-1;topic-2"))
+                                    .build());
+
+    // 
--------------------------------------------------------------------------------------------
+    // Table Source Options
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final ConfigOption<SubscriptionType> 
SOURCE_SUBSCRIPTION_TYPE =
+            ConfigOptions.key("source.subscription-type")
+                    .enumType(SubscriptionType.class)
+                    .defaultValue(SubscriptionType.Exclusive)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The [subscription 
type](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-subscriptions)
 that is supported by the [Pulsar DataStream source 
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source).
 Currently, only %s and %s subscription types are supported.",
+                                            code("Exclusive"), code("Shared"))
+                                    .build());
+
+    /**
+     * Exactly same as {@link
+     * 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions#PULSAR_SUBSCRIPTION_NAME}.
+     * Copied because we want to have a default value for it.
+     */
+    public static final ConfigOption<String> SOURCE_SUBSCRIPTION_NAME =
+            ConfigOptions.key("scan.startup.sub-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The subscription name of the 
consumer that is used by the runtime [Pulsar DataStream source 
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source).
 This argument is required for constructing the consumer.")
+                                    .build());
+
+    public static final ConfigOption<String> SOURCE_START_FROM_MESSAGE_ID =
+            ConfigOptions.key("scan.startup.sub-start-offset")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "(Optional) Message id that is 
used to specify a consuming starting "
+                                                    + "point for source. Use 
%s, %s or pass in a message id "
+                                                    + "representation in %s, "
+                                                    + "such as %s. This option 
takes precedence over "
+                                                    + 
"source.start.publish-time.",
+                                            code("earliest"),
+                                            code("latest"),
+                                            
code("ledgerId:entryId:partitionId"),
+                                            code("12:2:-1"))
+                                    .build());
+
+    public static final ConfigOption<Long> SOURCE_START_FROM_PUBLISH_TIME =
+            ConfigOptions.key("source.start.publish-time")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "(Optional) Publish timestamp that 
is used to specify a starting point for the [Pulsar DataStream source 
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source)
 to consume data. "
+                                                    + "Option 
source.start.message-id takes precedence over this one.")
+                                    .build());
+
+    public static final ConfigOption<String> SOURCE_STOP_AT_MESSAGE_ID =
+            ConfigOptions.key("source.stop.at-message-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional message id used to specify a stop cursor 
for the unbounded sql "
+                                    + "source. Use \"never\", \"latest\" or 
pass in a message id "
+                                    + "representation in 
\"ledgerId:entryId:partitionId\", "
+                                    + "such as \"12:2:-1\"");
+
+    public static final ConfigOption<String> SOURCE_STOP_AFTER_MESSAGE_ID =
+            ConfigOptions.key("source.stop.after-message-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional message id used to specify a stop 
position but include the "
+                                    + "given message in the consuming result 
for the unbounded sql "
+                                    + "source. Pass in a message id "
+                                    + "representation in 
\"ledgerId:entryId:partitionId\", "
+                                    + "such as \"12:2:-1\". ");
+
+    public static final ConfigOption<Long> SOURCE_STOP_AT_PUBLISH_TIME =
+            ConfigOptions.key("source.stop.at-publish-time")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional publish timestamp used to specify a stop 
cursor"
+                                    + " for the unbounded sql source.");
+
+    // 
--------------------------------------------------------------------------------------------
+    // Table Sink Options
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final ConfigOption<String> SINK_CUSTOM_TOPIC_ROUTER =
+            ConfigOptions.key("sink.custom-topic-router")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "(Optional) the custom topic 
router class URL that is used in the [Pulsar DataStream sink 
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-sink).
 If this option is provided, the %s option will be ignored.",
+                                            code("sink.topic-routing-mode"))
+                                    .build());
+
+    public static final ConfigOption<TopicRoutingMode> SINK_TOPIC_ROUTING_MODE 
=
+            ConfigOptions.key("sink.topic-routing-mode")
+                    .enumType(TopicRoutingMode.class)
+                    .defaultValue(TopicRoutingMode.ROUND_ROBIN)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "(Optional) the topic routing 
mode. Available options are %s and %s. By default, it is set to %s. If you want 
to use a custom topic router, use the %s option to determine the partition for 
a particular message.",
+                                            code("round-robin"),
+                                            code("message-key-hash"),
+                                            code("round-robin"),
+                                            code("sink.custom-topic-router"))
+                                    .build());
+
+    public static final ConfigOption<Duration> SINK_MESSAGE_DELAY_INTERVAL =
+            ConfigOptions.key("sink.message-delay-interval")
+                    .durationType()
+                    .defaultValue(Duration.ZERO)
+                    .withDescription(
+                            "(Optional) the message delay delivery interval 
that is used in the [Pulsar DataStream sink 
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-sink).");
+
+    // 
--------------------------------------------------------------------------------------------
+    // Format Options
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final ConfigOption<String> KEY_FORMAT =
+            ConfigOptions.key("key" + FORMAT_SUFFIX)
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines the format identifier for 
decoding/encoding key bytes in "
+                                    + "Pulsar message. The identifier is used 
to discover a suitable format factory.");
+
+    public static final ConfigOption<List<String>> KEY_FIELDS =
+            ConfigOptions.key("key.fields")
+                    .stringType()
+                    .asList()
+                    .defaultValues()
+                    .withDescription(
+                            "An explicit list of physical columns from the 
table schema that are decoded/encoded from the key bytes of a Pulsar message. 
By default, this list is empty and thus a key is undefined.");
+
+    public static final ConfigOption<String> VALUE_FORMAT =
+            ConfigOptions.key("value" + FORMAT_SUFFIX)
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines the format identifier for 
decoding/encoding value data. "
+                                    + "The identifier is used to discover a 
suitable format factory.");
+
+    // 
--------------------------------------------------------------------------------------------
+    // Pulsar Options
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Exactly same as {@link
+     * 
org.apache.flink.connector.pulsar.common.config.PulsarOptions#PULSAR_SERVICE_URL}.
 Copied
+     * here because it is a required config option and should not be included 
in the {@link
+     * 
org.apache.flink.table.factories.FactoryUtil.FactoryHelper#validateExcept(String...)}
 method.
+     *
+     * <p>By default all {@link 
org.apache.flink.connector.pulsar.common.config.PulsarOptions} are
+     * included in the validateExcept() method./p>
+     */
+    public static final ConfigOption<String> SERVICE_URL =
+            ConfigOptions.key("service-url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text("Service URL provider for Pulsar 
service.")
+                                    .linebreak()
+                                    .text(
+                                            "To connect to Pulsar using client 
libraries, you need to specify a Pulsar protocol URL.")
+                                    .linebreak()
+                                    .text(
+                                            "You can assign Pulsar protocol 
URLs to specific clusters and use the Pulsar scheme.")
+                                    .linebreak()
+                                    .list(
+                                            text(
+                                                    "This is an example of %s: 
%s.",
+                                                    code("localhost"),
+                                                    
code("pulsar://localhost:6650")),
+                                            text(
+                                                    "If you have multiple 
brokers, the URL is as: %s",
+                                                    code(
+                                                            
"pulsar://localhost:6550,localhost:6651,localhost:6652")),
+                                            text(
+                                                    "A URL for a production 
Pulsar cluster is as: %s",
+                                                    code(
+                                                            
"pulsar://pulsar.us-west.example.com:6650")),
+                                            text(
+                                                    "If you use TLS 
authentication, the URL is as %s",
+                                                    code(
+                                                            
"pulsar+ssl://pulsar.us-west.example.com:6651")))
+                                    .build());
+
+    public static final ConfigOption<Boolean> EXPLICIT =
+            ConfigOptions.key("explicit")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Indicate if the table is an explicit 
Flink table.");
+
+    public static final ConfigOption<String> ADMIN_URL =
+            ConfigOptions.key("admin-url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The Pulsar service HTTP URL for 
the admin endpoint. For example, %s, or %s for TLS.",
+                                            
code("http://my-broker.example.com:8080";),
+                                            
code("https://my-broker.example.com:8443";))
+                                    .build());
+
+    public static final ConfigOption<String> STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "(Optional) Message id that is 
used to specify a consuming starting "
+                                                    + "point for source. Use 
%s, %s or pass in a message id "
+                                                    + "representation in %s, "
+                                                    + "such as %s. This option 
takes precedence over "
+                                                    + 
"source.start.publish-time.",
+                                            code("earliest"),
+                                            code("latest"),
+                                            
code("ledgerId:entryId:partitionId"),
+                                            code("12:2:-1"))
+                                    .build());
+
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java
new file mode 100644
index 0000000000..8dac83871a
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.types.RowKind;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptionUtils.getValueDecodingFormat;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.KEY_FIELDS;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.KEY_FORMAT;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_CUSTOM_TOPIC_ROUTER;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SINK_TOPIC_ROUTING_MODE;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
+import static 
org.apache.inlong.sort.pulsar.table.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
+import static org.apache.inlong.sort.pulsar.table.PulsarTableOptions.TOPIC;
+import static org.apache.pulsar.common.naming.TopicName.isValid;
+
+/** Util class for source and sink validation rules. */
+public class PulsarTableValidationUtils {
+
+    private PulsarTableValidationUtils() {
+    }
+
+    public static void validatePrimaryKeyConstraints(
+            ObjectIdentifier tableName,
+            int[] primaryKeyIndexes,
+            FactoryUtil.TableFactoryHelper helper) {
+        final DecodingFormat<DeserializationSchema<RowData>> format =
+                getValueDecodingFormat(helper);
+        if (primaryKeyIndexes.length > 0
+                && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+            throw new ValidationException(
+                    String.format(
+                            "The Pulsar table '%s' with '%s' format doesn't 
support defining PRIMARY KEY constraint"
+                                    + " on the table, because it can't 
guarantee the semantic of primary key.",
+                            tableName.asSummaryString(), format));
+        }
+    }
+
+    public static void validateTableSourceOptions(ReadableConfig tableOptions) 
{
+        validateTopicsConfigs(tableOptions);
+        validateStartCursorConfigs(tableOptions);
+        validateStopCursorConfigs(tableOptions);
+        validateSubscriptionTypeConfigs(tableOptions);
+        validateKeyFormatConfigs(tableOptions);
+    }
+
+    public static void validateTableSinkOptions(ReadableConfig tableOptions) {
+        validateTopicsConfigs(tableOptions);
+        validateKeyFormatConfigs(tableOptions);
+        validateSinkRoutingConfigs(tableOptions);
+    }
+
+    protected static void validateTopicsConfigs(ReadableConfig tableOptions) {
+        if (tableOptions.get(TOPIC).isEmpty()) {
+            throw new ValidationException("The topics list should not be 
empty.");
+        }
+
+        for (String topic : tableOptions.get(TOPIC)) {
+            if (!isValid(topic)) {
+                throw new ValidationException(
+                        String.format("The topics name %s is not a valid topic 
name.", topic));
+            }
+        }
+    }
+
+    protected static void validateStartCursorConfigs(ReadableConfig 
tableOptions) {
+        if (tableOptions.getOptional(SOURCE_START_FROM_MESSAGE_ID).isPresent()
+                && 
tableOptions.getOptional(SOURCE_START_FROM_PUBLISH_TIME).isPresent()) {
+            throw new ValidationException(
+                    String.format(
+                            "Only one of %s and %s can be specified. Detected 
both of them",
+                            SOURCE_START_FROM_MESSAGE_ID, 
SOURCE_START_FROM_PUBLISH_TIME));
+        }
+    }
+
+    protected static void validateStopCursorConfigs(ReadableConfig 
tableOptions) {
+        Set<ConfigOption<?>> conflictConfigOptions =
+                Sets.newHashSet(
+                        SOURCE_STOP_AT_MESSAGE_ID,
+                        SOURCE_STOP_AFTER_MESSAGE_ID,
+                        SOURCE_STOP_AT_PUBLISH_TIME);
+
+        long configsNums =
+                conflictConfigOptions.stream()
+                        .map(tableOptions::getOptional)
+                        .filter(Optional::isPresent)
+                        .count();
+
+        if (configsNums > 1) {
+            throw new ValidationException(
+                    String.format(
+                            "Only one of %s, %s and %s can be specified. 
Detected more than 1 of them",
+                            SOURCE_STOP_AT_MESSAGE_ID,
+                            SOURCE_STOP_AFTER_MESSAGE_ID,
+                            SOURCE_STOP_AT_PUBLISH_TIME));
+        }
+    }
+
+    protected static void validateSubscriptionTypeConfigs(ReadableConfig 
tableOptions) {
+        SubscriptionType subscriptionType = 
tableOptions.get(SOURCE_SUBSCRIPTION_TYPE);
+
+        if (subscriptionType == SubscriptionType.Failover) {
+            throw new ValidationException(
+                    String.format(
+                            "%s SubscriptionType is not supported. ", 
SubscriptionType.Failover));
+        }
+    }
+
+    protected static void validateKeyFormatConfigs(ReadableConfig 
tableOptions) {
+        final Optional<String> optionalKeyFormat = 
tableOptions.getOptional(KEY_FORMAT);
+        final Optional<List<String>> optionalKeyFields = 
tableOptions.getOptional(KEY_FIELDS);
+        if (!optionalKeyFormat.isPresent() && optionalKeyFields.isPresent()) {
+            throw new ValidationException(
+                    String.format(
+                            "The option '%s' can only be declared if a key 
format is defined using '%s'.",
+                            KEY_FIELDS.key(), KEY_FORMAT.key()));
+        } else if (optionalKeyFormat.isPresent()
+                && (!optionalKeyFields.isPresent() || 
optionalKeyFields.get().size() == 0)) {
+            throw new ValidationException(
+                    String.format(
+                            "A key format '%s' requires the declaration of one 
or more of key fields using '%s'.",
+                            KEY_FORMAT.key(), KEY_FIELDS.key()));
+        }
+    }
+
+    protected static void validateSinkRoutingConfigs(ReadableConfig 
tableOptions) {
+        if (tableOptions.getOptional(SINK_TOPIC_ROUTING_MODE)
+                .orElse(TopicRoutingMode.ROUND_ROBIN) == 
TopicRoutingMode.CUSTOM) {
+            throw new ValidationException(
+                    String.format(
+                            "Only  %s and %s can be used. For %s, please use 
sink.custom-topic-router for"
+                                    + "custom topic router and do not set this 
config.",
+                            TopicRoutingMode.ROUND_ROBIN,
+                            TopicRoutingMode.MESSAGE_KEY_HASH,
+                            TopicRoutingMode.CUSTOM));
+        }
+        if (tableOptions.getOptional(SINK_CUSTOM_TOPIC_ROUTER).isPresent()
+                && 
tableOptions.getOptional(SINK_TOPIC_ROUTING_MODE).isPresent()) {
+            throw new ValidationException(
+                    String.format(
+                            "Only one of %s and %s can be specified. Detected 
both of them",
+                            SINK_CUSTOM_TOPIC_ROUTER, 
SINK_TOPIC_ROUTING_MODE));
+        }
+    }
+
+    protected static void validateUpsertModeKeyConstraints(
+            ReadableConfig tableOptions, int[] primaryKeyIndexes) {
+        if (!tableOptions.getOptional(KEY_FIELDS).isPresent()) {
+            throw new ValidationException(
+                    "Upsert mode requires key.fields set to the primary key 
fields, should be set");
+        }
+
+        if (tableOptions.getOptional(KEY_FIELDS).get().size() == 0
+                || primaryKeyIndexes.length == 0) {
+            throw new ValidationException(
+                    "'upsert-pulsar' require to define a PRIMARY KEY 
constraint. "
+                            + "The PRIMARY KEY specifies which columns should 
be read from or write to the Pulsar message key. "
+                            + "The PRIMARY KEY also defines records in the 
'upsert-pulsar' table should update or delete on which keys.");
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000..ba5da26e94
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.pulsar.table.PulsarTableFactory
diff --git a/licenses/inlong-sort-connectors/LICENSE 
b/licenses/inlong-sort-connectors/LICENSE
index 44dce64c4d..bd944b384c 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -831,6 +831,13 @@
   Source  : org.apache.flink:flink-connector-kafka:1.15.4 (Please note that 
the software have been modified.)
   License : https://github.com/jeff-zou/flink-connector-redis/blob/main/LICENSE
 
+1.3.22 
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableFactory.java
+       
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptionUtils.java
+       
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableOptions.java
+       
inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableValidationUtils.java
+  Source  : org.apache.flink:flink-connector-pulsar:4.1.0-1.18 (Please note 
that the software have been modified.)
+  License : https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE
+
 =======================================================================
 Apache InLong Subcomponents:
 

Reply via email to