This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit cc508f5c77516e3c3420475ee6c451602d9fe052
Author: thexia <[email protected]>
AuthorDate: Wed May 18 10:38:34 2022 +0800

    [INLONG-4141][Sort] Sort lightweight support load data from Pulsar (#4231)
---
 .../org/apache/inlong/sort/protocol/node/Node.java |   2 +
 .../protocol/node/extract/PulsarExtractNode.java   | 123 +++++++
 .../node/extract/PulsarExtractNodeTest.java        |  55 +++
 inlong-sort/sort-connectors/pom.xml                |  12 +
 .../table/DynamicPulsarSerializationSchema.java    | 253 ++++++++++++++
 .../pulsar/table/PulsarDynamicTableFactory.java    | 370 ++++++++++++++++++++
 .../flink/pulsar/table/PulsarDynamicTableSink.java | 382 +++++++++++++++++++++
 .../table/UpsertPulsarDynamicTableFactory.java     | 364 ++++++++++++++++++++
 .../org.apache.flink.table.factories.Factory       |   3 +-
 .../flink/parser/PulsarSqlParserTest.java          | 108 ++++++
 .../org.apache.flink.table.factories.Factory       |   4 +-
 licenses/inlong-sort/LICENSE                       |   7 +
 pom.xml                                            |   7 +
 13 files changed, 1688 insertions(+), 2 deletions(-)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index 56f53bce6..7e3eafb2e 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -26,6 +26,7 @@ import 
org.apache.inlong.sort.protocol.node.extract.FileSystemExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
 import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
@@ -50,6 +51,7 @@ import java.util.TreeMap;
         @JsonSubTypes.Type(value = KafkaExtractNode.class, name = 
"kafkaExtract"),
         @JsonSubTypes.Type(value = PostgresExtractNode.class, name = 
"postgresExtract"),
         @JsonSubTypes.Type(value = FileSystemExtractNode.class, name = 
"fileSystemExtract"),
+        @JsonSubTypes.Type(value = PulsarExtractNode.class, name = 
"pulsarExtract"),
         @JsonSubTypes.Type(value = TransformNode.class, name = 
"baseTransform"),
         @JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad"),
         @JsonSubTypes.Type(value = DistinctNode.class, name = "distinct"),
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
new file mode 100644
index 000000000..270cd0f66
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
@@ -0,0 +1,123 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.extract;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.commons.lang3.StringUtils;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.transformation.WatermarkField;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Map;
+
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("pulsarExtract")
+@Data
+public class PulsarExtractNode extends ExtractNode {
+    private static final long serialVersionUID = 1L;
+
+    @Nonnull
+    @JsonProperty("topic")
+    private String topic;
+    @Nonnull
+    @JsonProperty("adminUrl")
+    private String adminUrl;
+    @Nonnull
+    @JsonProperty("serviceUrl")
+    private String serviceUrl;
+    @Nonnull
+    @JsonProperty("format")
+    private Format format;
+
+    @JsonProperty("scanStartupMode")
+    private String scanStartupMode;
+
+    @JsonProperty("primaryKey")
+    private String primaryKey;
+
+    @JsonCreator
+    public PulsarExtractNode(@JsonProperty("id") String id,
+            @JsonProperty("name") String name,
+            @JsonProperty("fields") List<FieldInfo> fields,
+            @Nullable @JsonProperty("watermarkField") WatermarkField 
watermarkField,
+            @JsonProperty("properties") Map<String, String> properties,
+            @Nonnull @JsonProperty("topic") String topic,
+            @Nonnull @JsonProperty("adminUrl") String adminUrl,
+            @Nonnull @JsonProperty("serviceUrl") String serviceUrl,
+            @Nonnull @JsonProperty("format") Format format,
+            @Nonnull @JsonProperty("scanStartupMode") String scanStartupMode,
+            @JsonProperty("primaryKey") String primaryKey) {
+        super(id, name, fields, watermarkField, properties);
+        this.topic = Preconditions.checkNotNull(topic, "pulsar topic is 
null.");
+        this.adminUrl = Preconditions.checkNotNull(adminUrl, "pulsar adminUrl 
is null.");
+        this.serviceUrl = Preconditions.checkNotNull(serviceUrl, "pulsar 
serviceUrl is null.");
+        this.format = Preconditions.checkNotNull(format, "pulsar format is 
null.");
+        this.scanStartupMode = Preconditions.checkNotNull(scanStartupMode,
+                "pulsar scanStartupMode is null.");
+        this.primaryKey = primaryKey;
+    }
+
+    /**
+     * generate table options
+     *
+     * @return options
+     */
+    @Override
+    public Map<String, String> tableOptions() {
+        Map<String, String> options = super.tableOptions();
+        if (StringUtils.isEmpty(this.primaryKey)) {
+            options.put("connector", "pulsar-inlong");
+            options.putAll(format.generateOptions(false));
+        } else {
+            options.put("connector", "upsert-pulsar-inlong");
+            options.putAll(format.generateOptions(true));
+        }
+        options.put("generic", "true");
+        options.put("service-url", serviceUrl);
+        options.put("admin-url", adminUrl);
+        options.put("topic", topic);
+        options.put("scan.startup.mode", scanStartupMode);
+
+        return options;
+    }
+
+    @Override
+    public String genTableName() {
+        return String.format("table_%s", super.getId());
+    }
+
+    @Override
+    public String getPrimaryKey() {
+        return primaryKey;
+    }
+
+    @Override
+    public List<FieldInfo> getPartitionFields() {
+        return super.getPartitionFields();
+    }
+}
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java
new file mode 100644
index 000000000..0ab2fc81b
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNodeTest.java
@@ -0,0 +1,55 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.extract;
+
+import org.apache.inlong.sort.SerializeBaseTest;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.format.CsvFormat;
+import org.apache.inlong.sort.protocol.node.format.Format;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Test for {@link PulsarExtractNode}
+ */
+public class PulsarExtractNodeTest extends SerializeBaseTest<Node> {
+
+    @Override
+    public Node getTestObject() {
+        List<FieldInfo> fields = Arrays.asList(
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()));
+        Format format = new CsvFormat();
+        return new PulsarExtractNode("2",
+                "pulsar_input",
+                fields,
+                null,
+                null,
+                "persistent://public/default/test_stream",
+                "http://localhost:8080",
+                "pulsar://localhost:6650",
+                format,
+                "earliest",
+                null);
+    }
+}
diff --git a/inlong-sort/sort-connectors/pom.xml 
b/inlong-sort/sort-connectors/pom.xml
index eeb4a41d3..a94e0e310 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -69,6 +69,11 @@
             <artifactId>flink-table-common</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-runtime-blink_${flink.scala.binary.version}</artifactId>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             
<artifactId>flink-parquet_${flink.scala.binary.version}</artifactId>
@@ -139,6 +144,12 @@
             <artifactId>hadoop-minicluster</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>io.streamnative.connectors</groupId>
+            
<artifactId>pulsar-flink-connector_${scala.binary.version}</artifactId>
+        </dependency>
+
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             
<artifactId>flink-test-utils_${flink.scala.binary.version}</artifactId>
@@ -156,6 +167,7 @@
             </exclusions>
         </dependency>
 
+
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/DynamicPulsarSerializationSchema.java
 
b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/DynamicPulsarSerializationSchema.java
new file mode 100644
index 000000000..4c2037701
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/DynamicPulsarSerializationSchema.java
@@ -0,0 +1,253 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.protobuf.PbFormatOptions;
+import 
org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema;
+import org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils;
+import org.apache.flink.streaming.util.serialization.FlinkSchema;
+import org.apache.flink.streaming.util.serialization.PulsarContextAware;
+import org.apache.flink.streaming.util.serialization.PulsarSerializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * A specific Serializer for {@link PulsarDynamicTableSink}.
+ */
+class DynamicPulsarSerializationSchema
+        implements PulsarSerializationSchema<RowData>, 
PulsarContextAware<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final @Nullable
+    SerializationSchema<RowData> keySerialization;
+
+    private final SerializationSchema<RowData> valueSerialization;
+
+    private final RowData.FieldGetter[] keyFieldGetters;
+
+    private final RowData.FieldGetter[] valueFieldGetters;
+
+    private final boolean hasMetadata;
+
+    private final boolean upsertMode;
+
+    /**
+     * Contains the position for each value of {@link 
PulsarDynamicTableSink.WritableMetadata} in the consumed row or
+     * -1 if this metadata key is not used.
+     */
+    private final int[] metadataPositions;
+
+    private int[] partitions;
+
+    private int parallelInstanceId;
+
+    private int numParallelInstances;
+
+    private DataType valueDataType;
+
+    private String valueFormatType;
+
+    private volatile Schema<RowData> schema;
+
+    /**
+     * delay milliseconds message.
+     */
+    private long delayMilliseconds;
+
+    DynamicPulsarSerializationSchema(
+            @Nullable SerializationSchema<RowData> keySerialization,
+            SerializationSchema<RowData> valueSerialization,
+            RowData.FieldGetter[] keyFieldGetters,
+            RowData.FieldGetter[] valueFieldGetters,
+            boolean hasMetadata,
+            int[] metadataPositions,
+            boolean upsertMode,
+            DataType valueDataType,
+            String valueFormatType,
+            long delayMilliseconds) {
+        if (upsertMode) {
+            checkArgument(keySerialization != null && keyFieldGetters.length > 
0,
+                    "Key must be set in upsert mode for serialization 
schema.");
+        }
+        this.keySerialization = keySerialization;
+        this.valueSerialization = valueSerialization;
+        this.keyFieldGetters = keyFieldGetters;
+        this.valueFieldGetters = valueFieldGetters;
+        this.hasMetadata = hasMetadata;
+        this.metadataPositions = metadataPositions;
+        this.upsertMode = upsertMode;
+        this.valueDataType = valueDataType;
+        this.valueFormatType = valueFormatType;
+        this.delayMilliseconds = delayMilliseconds;
+    }
+
+    @Override
+    public void open(SerializationSchema.InitializationContext context) throws 
Exception {
+        if (keySerialization != null) {
+            keySerialization.open(context);
+        }
+        valueSerialization.open(context);
+    }
+
+    @Override
+    public void serialize(RowData consumedRow, TypedMessageBuilder<RowData> 
messageBuilder) {
+
+        // shortcut in case no input projection is required
+        if (keySerialization == null && !hasMetadata) {
+            messageBuilder.value(consumedRow);
+            return;
+        }
+
+        // set delay message.
+        if (delayMilliseconds > 0) {
+            messageBuilder.deliverAfter(delayMilliseconds, 
TimeUnit.MILLISECONDS);
+        }
+
+        if (keySerialization != null) {
+            final RowData keyRow = createProjectedRow(consumedRow, 
RowKind.INSERT, keyFieldGetters);
+            messageBuilder.keyBytes(keySerialization.serialize(keyRow));
+        }
+
+        final RowKind kind = consumedRow.getRowKind();
+        final RowData valueRow = createProjectedRow(consumedRow, kind, 
valueFieldGetters);
+        if (upsertMode) {
+            if (kind == RowKind.DELETE || kind == RowKind.UPDATE_BEFORE) {
+                // transform the message as the tombstone message
+            } else {
+                // make the message to be INSERT to be compliant with the 
INSERT-ONLY format
+                valueRow.setRowKind(RowKind.INSERT);
+                messageBuilder.value(valueRow);
+            }
+        } else {
+            messageBuilder.value(valueRow);
+        }
+
+        Map<String, String> properties = readMetadata(consumedRow, 
PulsarDynamicTableSink.WritableMetadata.PROPERTIES);
+        if (properties != null) {
+            messageBuilder.properties(properties);
+        }
+        final Long eventTime = readMetadata(consumedRow, 
PulsarDynamicTableSink.WritableMetadata.EVENT_TIME);
+        if (eventTime != null && eventTime >= 0) {
+            messageBuilder.eventTime(eventTime);
+        }
+    }
+
+    public Optional<String> getTargetTopic(RowData element) {
+        //TODO need get topic from row.
+        return Optional.empty();
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T> T readMetadata(RowData consumedRow, 
PulsarDynamicTableSink.WritableMetadata metadata) {
+        final int pos = metadataPositions[metadata.ordinal()];
+        if (pos < 0) {
+            return null;
+        }
+        return (T) metadata.converter.read(consumedRow, pos);
+    }
+
+    private static RowData createProjectedRow(RowData consumedRow, RowKind 
kind, RowData.FieldGetter[] fieldGetters) {
+        final int arity = fieldGetters.length;
+        final GenericRowData genericRowData = new GenericRowData(kind, arity);
+        for (int fieldPos = 0; fieldPos < arity; fieldPos++) {
+            genericRowData.setField(fieldPos, 
fieldGetters[fieldPos].getFieldOrNull(consumedRow));
+        }
+        return genericRowData;
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        final RowType rowType = (RowType) valueDataType.getLogicalType();
+        return InternalTypeInfo.of(rowType);
+    }
+
+    @Override
+    public void setParallelInstanceId(int parallelInstanceId) {
+        this.parallelInstanceId = parallelInstanceId;
+    }
+
+    @Override
+    public void setNumParallelInstances(int numParallelInstances) {
+        this.numParallelInstances = numParallelInstances;
+    }
+
+    @Override
+    public Schema<RowData> getSchema() {
+        if (schema == null) {
+            synchronized (this) {
+                if (schema == null) {
+                    schema = buildSchema();
+                }
+            }
+        }
+        return schema;
+    }
+
+    private FlinkSchema<RowData> buildSchema() {
+        if (StringUtils.isBlank(valueFormatType)) {
+            return new FlinkSchema<>(Schema.BYTES.getSchemaInfo(), 
valueSerialization, null);
+        }
+        Configuration configuration = new Configuration();
+        hackPbSerializationSchema(configuration);
+        SchemaInfo schemaInfo = 
SchemaUtils.tableSchemaToSchemaInfo(valueFormatType, valueDataType, 
configuration);
+        return new FlinkSchema<>(schemaInfo, valueSerialization, null);
+    }
+
+    private void hackPbSerializationSchema(Configuration configuration) {
+        // reflect read PbRowSerializationSchema#messageClassName
+        if (valueSerialization instanceof PbRowDataSerializationSchema) {
+            try {
+                final String messageClassName =
+                        (String) 
FieldUtils.readDeclaredField(valueSerialization, "messageClassName", true);
+                configuration.set(PbFormatOptions.MESSAGE_CLASS_NAME, 
messageClassName);
+            } catch (IllegalAccessException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    interface MetadataConverter extends Serializable {
+        Object read(RowData consumedRow, int pos);
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/PulsarDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/PulsarDynamicTableFactory.java
new file mode 100644
index 000000000..c6be8c5e1
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/PulsarDynamicTableFactory.java
@@ -0,0 +1,370 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import 
org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource;
+import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import org.apache.pulsar.common.naming.TopicName;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.ADMIN_URL;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.GENERIC;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FIELDS;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FIELDS_PREFIX;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FORMAT;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.PARTITION_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.PROPERTIES;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.PROPERTIES_PREFIX;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SCAN_STARTUP_MODE;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SCAN_STARTUP_SUB_NAME;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SCAN_STARTUP_SUB_START_OFFSET;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SERVICE_URL;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SINK_MESSAGE_ROUTER;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SINK_SEMANTIC;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.TOPIC;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.TOPIC_PATTERN;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.VALUE_FIELDS_INCLUDE;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.VALUE_FORMAT;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.createKeyFormatProjection;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.createValueFormatProjection;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.getMessageRouter;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.getPulsarProperties;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.validateSinkMessageRouter;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.validateTableSourceOptions;
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+
+/**
+ * Copy from 
io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9
+ *
+ * Factory for creating configured instances of
+ * {@link 
org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableFactory}.
+ * We modify PulsarDynamicTableFactory validate logic to support nested format.
+ */
+public class PulsarDynamicTableFactory implements
+        DynamicTableSourceFactory,
+        DynamicTableSinkFactory {
+
+    public static final String IDENTIFIER = "pulsar-inlong";
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+        ReadableConfig tableOptions = helper.getOptions();
+
+        List<String> topics = generateTopic(context.getObjectIdentifier(), 
tableOptions);
+        if (topics != null && !topics.isEmpty()) {
+            ((Configuration) tableOptions).set(TOPIC, 
Collections.singletonList(topics.get(0)));
+        }
+
+        final Optional<EncodingFormat<SerializationSchema<RowData>>> 
keyEncodingFormat =
+                getKeyEncodingFormat(helper);
+
+        final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat 
=
+                getValueEncodingFormat(helper);
+
+        // Validate the option data type.
+        helper.validateExcept(PROPERTIES_PREFIX, "type", 
"table-default-partitions", "default-database");
+        // Validate the option values.
+        PulsarTableOptions.validateTableSinkOptions(tableOptions);
+
+        Properties properties = 
getPulsarProperties(context.getCatalogTable().toProperties());
+        validatePKConstraints(context.getObjectIdentifier(), 
context.getCatalogTable(), valueEncodingFormat);
+
+        final DataType physicalDataType = 
context.getCatalogTable().getSchema().toPhysicalRowDataType();
+
+        final int[] keyProjection = createKeyFormatProjection(tableOptions, 
physicalDataType);
+
+        final int[] valueProjection = 
createValueFormatProjection(tableOptions, physicalDataType);
+
+        final String keyPrefix = 
tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
+
+        String adminUrl = tableOptions.get(ADMIN_URL);
+        String serverUrl = tableOptions.get(SERVICE_URL);
+        return createPulsarTableSink(tableOptions, topics, adminUrl, 
serverUrl, keyEncodingFormat, valueEncodingFormat,
+                properties, physicalDataType, keyProjection, valueProjection, 
keyPrefix, context);
+    }
+
+    private PulsarDynamicTableSink createPulsarTableSink(ReadableConfig 
tableOptions, List<String> topics,
+            String adminUrl, String serverUrl,
+            Optional<EncodingFormat<SerializationSchema<RowData>>> 
keyEncodingFormat,
+            EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
+            Properties properties, DataType physicalDataType,
+            int[] keyProjection, int[] valueProjection,
+            String keyPrefix, Context context) {
+
+        final String formatType = 
tableOptions.getOptional(FORMAT).orElseGet(() -> 
tableOptions.get(VALUE_FORMAT));
+        final Integer parallelism = 
tableOptions.getOptional(SINK_PARALLELISM).orElse(null);
+        return new PulsarDynamicTableSink(
+                serverUrl,
+                adminUrl,
+                topics.get(0),
+                physicalDataType,
+                properties,
+                keyEncodingFormat.orElse(null),
+                valueEncodingFormat,
+                keyProjection,
+                valueProjection,
+                keyPrefix,
+                PulsarTableOptions.getSinkSemantic(tableOptions),
+                formatType,
+                false,
+                parallelism,
+                getMessageRouter(tableOptions, 
context.getClassLoader()).orElse(null));
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+        ReadableConfig tableOptions = helper.getOptions();
+
+        List<String> topics = generateTopic(context.getObjectIdentifier(), 
tableOptions);
+        if (topics != null && !topics.isEmpty()) {
+            ((Configuration) tableOptions).set(TOPIC, 
Collections.singletonList(topics.get(0)));
+        }
+
+        // Generic Flink Table can reference multiple topics with topicPattern
+        String topicPattern = 
tableOptions.getOptional(TOPIC_PATTERN).orElse(null);
+        if (topicPattern != null && !topicPattern.isEmpty()) {
+            ((Configuration) tableOptions).set(TOPIC_PATTERN, topicPattern);
+        }
+
+        final Optional<DecodingFormat<DeserializationSchema<RowData>>> 
keyDecodingFormat =
+                getKeyDecodingFormat(helper);
+
+        final DecodingFormat<DeserializationSchema<RowData>> 
valueDecodingFormat =
+                getValueDecodingFormat(helper);
+        final String valueFormatPrefix = tableOptions.getOptional(FORMAT)
+                .orElse(tableOptions.get(VALUE_FORMAT));
+        // Validate the option data type.
+        helper.validateExcept(PROPERTIES_PREFIX,
+                "type", "table-default-partitions", "default-database", 
valueFormatPrefix);
+        // Validate the option values.
+        validateTableSourceOptions(tableOptions);
+        validateSinkMessageRouter(tableOptions);
+        validatePKConstraints(context.getObjectIdentifier(), 
context.getCatalogTable(), valueDecodingFormat);
+
+        Properties properties = 
getPulsarProperties(context.getCatalogTable().toProperties());
+
+        final PulsarTableOptions.StartupOptions startupOptions = 
PulsarTableOptions
+                .getStartupOptions(tableOptions);
+
+        final DataType physicalDataType = 
context.getCatalogTable().getSchema().toPhysicalRowDataType();
+
+        final int[] keyProjection = createKeyFormatProjection(tableOptions, 
physicalDataType);
+
+        final int[] valueProjection = 
createValueFormatProjection(tableOptions, physicalDataType);
+
+        final String keyPrefix = 
tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
+
+        String adminUrl = tableOptions.get(ADMIN_URL);
+        String serviceUrl = tableOptions.get(SERVICE_URL);
+        return createPulsarTableSource(
+                physicalDataType,
+                keyDecodingFormat.orElse(null),
+                valueDecodingFormat,
+                keyProjection,
+                valueProjection,
+                keyPrefix,
+                topics,
+                topicPattern,
+                serviceUrl,
+                adminUrl,
+                properties,
+                startupOptions);
+    }
+
    /** Returns the identifier this factory is registered under for the {@code connector} option. */
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(SERVICE_URL);
+        options.add(ADMIN_URL);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(FactoryUtil.FORMAT);
+        options.add(KEY_FORMAT);
+        options.add(KEY_FIELDS);
+        options.add(KEY_FIELDS_PREFIX);
+        options.add(VALUE_FORMAT);
+        options.add(VALUE_FIELDS_INCLUDE);
+        options.add(TOPIC);
+        options.add(TOPIC_PATTERN);
+        options.add(SCAN_STARTUP_MODE);
+        options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
+        options.add(SCAN_STARTUP_SUB_NAME);
+        options.add(SCAN_STARTUP_SUB_START_OFFSET);
+        options.add(GENERIC);
+
+        options.add(PARTITION_DISCOVERY_INTERVAL_MILLIS);
+        options.add(SINK_SEMANTIC);
+        options.add(SINK_MESSAGE_ROUTER);
+        options.add(SINK_PARALLELISM);
+        options.add(PROPERTIES);
+        return options;
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    private List<String> generateTopic(ObjectIdentifier table, ReadableConfig 
tableOptions) {
+        List<String> topics = null;
+        if (tableOptions.get(GENERIC)) {
+            topics = tableOptions.getOptional(TOPIC).orElse(null);
+        } else {
+            String rawTopic = table.getDatabaseName() + "/" + 
table.getObjectName();
+            final String topic = TopicName.get(rawTopic).toString();
+            topics = Collections.singletonList(topic);
+        }
+
+        return topics;
+    }
+
+    private static Optional<DecodingFormat<DeserializationSchema<RowData>>> 
getKeyDecodingFormat(
+            FactoryUtil.TableFactoryHelper helper) {
+        final Optional<DecodingFormat<DeserializationSchema<RowData>>> 
keyDecodingFormat =
+                helper.discoverOptionalDecodingFormat(
+                        DeserializationFormatFactory.class,
+                        KEY_FORMAT);
+        keyDecodingFormat.ifPresent(format -> {
+            if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+                throw new ValidationException(
+                        String.format(
+                                "A key format should only deal with 
INSERT-only records. "
+                                        + "But %s has a changelog mode of %s.",
+                                helper.getOptions().get(KEY_FORMAT),
+                                format.getChangelogMode()));
+            }
+        });
+        return keyDecodingFormat;
+    }
+
+    private static Optional<EncodingFormat<SerializationSchema<RowData>>> 
getKeyEncodingFormat(
+            FactoryUtil.TableFactoryHelper helper) {
+        final Optional<EncodingFormat<SerializationSchema<RowData>>> 
keyEncodingFormat =
+                helper.discoverOptionalEncodingFormat(
+                        SerializationFormatFactory.class,
+                        KEY_FORMAT);
+        keyEncodingFormat.ifPresent(format -> {
+            if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+                throw new ValidationException(
+                        String.format(
+                                "A key format should only deal with 
INSERT-only records. "
+                                        + "But %s has a changelog mode of %s.",
+                                helper.getOptions().get(KEY_FORMAT),
+                                format.getChangelogMode()));
+            }
+        });
+        return keyEncodingFormat;
+    }
+
+    private static DecodingFormat<DeserializationSchema<RowData>> 
getValueDecodingFormat(
+            FactoryUtil.TableFactoryHelper helper) {
+        return 
helper.discoverOptionalDecodingFormat(DeserializationFormatFactory.class, 
FactoryUtil.FORMAT)
+                .orElseGet(() -> 
helper.discoverDecodingFormat(DeserializationFormatFactory.class, 
VALUE_FORMAT));
+    }
+
+    private static EncodingFormat<SerializationSchema<RowData>> 
getValueEncodingFormat(
+            FactoryUtil.TableFactoryHelper helper) {
+        return 
helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, 
FactoryUtil.FORMAT)
+                .orElseGet(() -> 
helper.discoverEncodingFormat(SerializationFormatFactory.class, VALUE_FORMAT));
+    }
+
+    private static void validatePKConstraints(ObjectIdentifier tableName, 
CatalogTable catalogTable, Format format) {
+        if (catalogTable.getSchema().getPrimaryKey().isPresent()
+                && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+            Configuration options = 
Configuration.fromMap(catalogTable.getOptions());
+            String formatName = 
options.getOptional(FactoryUtil.FORMAT).orElse(options.get(VALUE_FORMAT));
+            throw new ValidationException(String.format(
+                    "The Pulsar table '%s' with '%s' format doesn't support 
defining PRIMARY KEY constraint"
+                            + " on the table, because it can't guarantee the 
semantic of primary key.",
+                    tableName.asSummaryString(),
+                    formatName
+            ));
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    protected PulsarDynamicTableSource createPulsarTableSource(
+            DataType physicalDataType,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> 
keyDecodingFormat,
+            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+            int[] keyProjection,
+            int[] valueProjection,
+            @Nullable String keyPrefix,
+            @Nullable List<String> topics,
+            @Nullable String topicPattern,
+            String serviceUrl,
+            String adminUrl,
+            Properties properties,
+            PulsarTableOptions.StartupOptions startupOptions) {
+        return new PulsarDynamicTableSource(
+                physicalDataType,
+                keyDecodingFormat,
+                valueDecodingFormat,
+                keyProjection,
+                valueProjection,
+                keyPrefix,
+                topics,
+                topicPattern,
+                serviceUrl,
+                adminUrl,
+                properties,
+                startupOptions,
+                false);
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/PulsarDynamicTableSink.java
 
b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/PulsarDynamicTableSink.java
new file mode 100644
index 000000000..4eb4ed47d
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/PulsarDynamicTableSink.java
@@ -0,0 +1,382 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink;
+import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
+import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
+import org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic;
+import org.apache.flink.streaming.util.serialization.PulsarSerializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+/**
+ * pulsar dynamic table sink.
+ */
+public class PulsarDynamicTableSink implements DynamicTableSink, 
SupportsWritingMetadata {
+
+    // 
--------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    /** Metadata that is appended at the end of a physical sink row. */
+    protected List<String> metadataKeys;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Format attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    /** Data type to configure the formats. */
+    protected final DataType physicalDataType;
+    /**
+     * The pulsar topic to write to.
+     */
+    protected final String topic;
+    protected final String serviceUrl;
+    protected final String adminUrl;
+
+    /**
+     * Properties for the pulsar producer.
+     */
+    protected final Properties properties;
+
+    /** Optional format for encoding keys to Pulsar. */
+    protected final @Nullable
+    EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat;
+    /**
+     * Sink format for encoding records to pulsar.
+     */
+    protected final EncodingFormat<SerializationSchema<RowData>> 
valueEncodingFormat;
+
+    /** Indices that determine the key fields and the source position in the 
consumed row. */
+    protected final int[] keyProjection;
+
+    /** Indices that determine the value fields and the source position in the 
consumed row. */
+    protected final int[] valueProjection;
+
+    /** Prefix that needs to be removed from fields when constructing the 
physical data type. */
+    protected final @Nullable
+    String keyPrefix;
+
+    /**
+     * Flag to determine sink mode. In upsert mode sink transforms the 
delete/update-before message to
+     * tombstone message.
+     */
+    protected final boolean upsertMode;
+
+    /** Parallelism of the physical Pulsar producer. **/
+    protected final @Nullable Integer parallelism;
+
+    /** Sink commit semantic. */
+    protected final PulsarSinkSemantic semantic;
+
+    private final String formatType;
+
+    private final MessageRouter messageRouter;
+
+    protected PulsarDynamicTableSink(
+            String serviceUrl,
+            String adminUrl,
+            String topic,
+            DataType physicalDataType,
+            Properties properties,
+            @Nullable EncodingFormat<SerializationSchema<RowData>> 
keyEncodingFormat,
+            EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
+            int[] keyProjection,
+            int[] valueProjection,
+            @Nullable String keyPrefix,
+            PulsarSinkSemantic semantic,
+            String formatType,
+            boolean upsertMode,
+            @Nullable  Integer parallelism,
+            @Nullable MessageRouter messageRouter) {
+        this.serviceUrl = Preconditions.checkNotNull(serviceUrl, "serviceUrl 
data type must not be null.");
+        this.adminUrl = Preconditions.checkNotNull(adminUrl, "adminUrl data 
type must not be null.");
+        this.topic = Preconditions.checkNotNull(topic, "Topic must not be 
null.");
+        this.physicalDataType = Preconditions.checkNotNull(physicalDataType, 
"Consumed data type must not be null.");
+        // Mutable attributes
+        this.metadataKeys = Collections.emptyList();
+        this.properties = Preconditions.checkNotNull(properties, "Properties 
must not be null.");
+        this.keyEncodingFormat = keyEncodingFormat;
+        this.valueEncodingFormat = 
Preconditions.checkNotNull(valueEncodingFormat, "Encoding format must not be 
null.");
+        this.keyProjection = Preconditions.checkNotNull(keyProjection, "Key 
projection must not be null.");
+        this.valueProjection = Preconditions.checkNotNull(valueProjection, 
"Value projection must not be null.");
+        this.keyPrefix = keyPrefix;
+        this.semantic = Preconditions.checkNotNull(semantic, "Semantic must 
not be null.");
+        this.formatType = Preconditions.checkNotNull(formatType, "FormatType 
must not be null.");
+        this.upsertMode = upsertMode;
+        this.parallelism = parallelism;
+        this.messageRouter = messageRouter;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        return this.valueEncodingFormat.getChangelogMode();
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        final SerializationSchema<RowData> keySerialization =
+                createSerialization(context, keyEncodingFormat, keyProjection, 
keyPrefix);
+
+        final SerializationSchema<RowData> valueSerialization =
+                createSerialization(context, valueEncodingFormat, 
valueProjection, null);
+
+        final PulsarSerializationSchema<RowData> pulsarSerializer =
+                createPulsarSerializer(keySerialization, valueSerialization);
+
+        final SinkFunction<RowData> pulsarSink = createPulsarSink(
+                this.topic,
+                this.properties,
+                pulsarSerializer);
+
+        return SinkFunctionProvider.of(pulsarSink, parallelism);
+    }
+
+    private PulsarSerializationSchema<RowData> 
createPulsarSerializer(SerializationSchema<RowData> keySerialization,
+            SerializationSchema<RowData> valueSerialization) {
+        final List<LogicalType> physicalChildren = 
physicalDataType.getLogicalType().getChildren();
+
+        final RowData.FieldGetter[] keyFieldGetters = 
Arrays.stream(keyProjection)
+                .mapToObj(targetField -> 
RowData.createFieldGetter(physicalChildren.get(targetField), targetField))
+                .toArray(RowData.FieldGetter[]::new);
+
+        final RowData.FieldGetter[] valueFieldGetters = 
Arrays.stream(valueProjection)
+                .mapToObj(targetField -> 
RowData.createFieldGetter(physicalChildren.get(targetField), targetField))
+                .toArray(RowData.FieldGetter[]::new);
+
+        // determine the positions of metadata in the consumed row
+        final int[] metadataPositions = Stream.of(WritableMetadata.values())
+                .mapToInt(m -> {
+                    final int pos = metadataKeys.indexOf(m.key);
+                    if (pos < 0) {
+                        return -1;
+                    }
+                    return physicalChildren.size() + pos;
+                })
+                .toArray();
+
+        // check if metadata is used at all
+        final boolean hasMetadata = metadataKeys.size() > 0;
+
+        final long delayMilliseconds = Optional.ofNullable(this.properties
+                        .getProperty(PulsarOptions.SEND_DELAY_MILLISECONDS, 
"0"))
+                .filter(StringUtils::isNumeric)
+                .map(Long::valueOf)
+                .orElse(0L);
+
+        return new DynamicPulsarSerializationSchema(
+                keySerialization,
+                valueSerialization,
+                keyFieldGetters,
+                valueFieldGetters,
+                hasMetadata,
+                metadataPositions,
+                upsertMode,
+                DataTypeUtils.projectRow(physicalDataType, valueProjection),
+                formatType,
+                delayMilliseconds);
+    }
+
+    private SinkFunction<RowData> createPulsarSink(String topic, Properties 
properties,
+            PulsarSerializationSchema<RowData> pulsarSerializer) {
+        final ClientConfigurationData configurationData = PulsarClientUtils
+                .newClientConf(serviceUrl, properties);
+        return new FlinkPulsarSink<RowData>(
+                adminUrl,
+                Optional.ofNullable(topic),
+                configurationData,
+                properties,
+                pulsarSerializer,
+                messageRouter,
+                PulsarSinkSemantic.valueOf(semantic.toString())
+        );
+    }
+
+    public MessageRouter getMessageRouter() {
+        return messageRouter;
+    }
+
+    private @Nullable
+    SerializationSchema<RowData> createSerialization(
+            DynamicTableSink.Context context,
+            @Nullable EncodingFormat<SerializationSchema<RowData>> format,
+            int[] projection,
+            @Nullable String prefix) {
+        if (format == null) {
+            return null;
+        }
+        DataType physicalFormatDataType = 
DataTypeUtils.projectRow(this.physicalDataType, projection);
+        if (prefix != null) {
+            physicalFormatDataType = 
DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
+        }
+        return format.createRuntimeEncoder(context, physicalFormatDataType);
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        final PulsarDynamicTableSink copy = new PulsarDynamicTableSink(
+                this.serviceUrl,
+                this.adminUrl,
+                this.topic,
+                this.physicalDataType,
+                this.properties,
+                this.keyEncodingFormat,
+                this.valueEncodingFormat,
+                this.keyProjection,
+                this.valueProjection,
+                this.keyPrefix,
+                this.semantic,
+                this.formatType,
+                this.upsertMode,
+                this.parallelism,
+                this.messageRouter);
+        copy.metadataKeys = metadataKeys;
+        return copy;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof PulsarDynamicTableSink)) {
+            return false;
+        }
+        PulsarDynamicTableSink that = (PulsarDynamicTableSink) o;
+        return upsertMode == that.upsertMode && Objects.equals(metadataKeys, 
that.metadataKeys)
+                && Objects.equals(physicalDataType, that.physicalDataType)
+                && Objects.equals(topic, that.topic) && 
Objects.equals(serviceUrl, that.serviceUrl)
+                && Objects.equals(adminUrl, that.adminUrl)
+                && Objects.equals(properties, that.properties)
+                && Objects.equals(keyEncodingFormat, that.keyEncodingFormat)
+                && Objects.equals(valueEncodingFormat, 
that.valueEncodingFormat)
+                && Arrays.equals(keyProjection, that.keyProjection)
+                && Arrays.equals(valueProjection, that.valueProjection)
+                && Objects.equals(keyPrefix, that.keyPrefix)
+                && Objects.equals(parallelism, that.parallelism) && semantic 
== that.semantic
+                && Objects.equals(formatType, that.formatType)
+                && Objects.equals(messageRouter, that.messageRouter);
+    }
+
+    @Override
+    public int hashCode() {
+        int result =
+                Objects.hash(metadataKeys, physicalDataType, topic, 
serviceUrl, adminUrl, properties, keyEncodingFormat,
+                        valueEncodingFormat, keyPrefix, upsertMode, 
parallelism, semantic, formatType, messageRouter);
+        result = 31 * result + Arrays.hashCode(keyProjection);
+        result = 31 * result + Arrays.hashCode(valueProjection);
+        return result;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "Pulsar dynamic table sink";
+    }
+
+    @Override
+    public Map<String, DataType> listWritableMetadata() {
+        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+        Stream.of(WritableMetadata.values()).forEachOrdered(m -> 
metadataMap.put(m.key, m.dataType));
+        return metadataMap;
+    }
+
+    @Override
+    public void applyWritableMetadata(List<String> metadataKeys, DataType 
consumedDataType) {
+        this.metadataKeys = metadataKeys;
+    }
+
+    enum WritableMetadata {
+
+        PROPERTIES(
+                "properties",
+                // key and value of the map are nullable to make handling 
easier in queries
+                DataTypes.MAP(DataTypes.STRING().nullable(), 
DataTypes.STRING().nullable()).nullable(),
+                (row, pos) -> {
+                    if (row.isNullAt(pos)) {
+                        return null;
+                    }
+                    final MapData map = row.getMap(pos);
+                    final ArrayData keyArray = map.keyArray();
+                    final ArrayData valueArray = map.valueArray();
+
+                    final Properties properties = new Properties();
+                    for (int i = 0; i < keyArray.size(); i++) {
+                        if (!keyArray.isNullAt(i) && !valueArray.isNullAt(i)) {
+                            final String key = 
keyArray.getString(i).toString();
+                            final String value = 
valueArray.getString(i).toString();
+                            properties.put(key, value);
+                        }
+                    }
+                    return properties;
+                }
+        ),
+
+        EVENT_TIME(
+                "eventTime",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+                (row, pos) -> {
+                    if (row.isNullAt(pos)) {
+                        return null;
+                    }
+                    return row.getTimestamp(pos, 3).getMillisecond();
+                });
+        final String key;
+
+        final DataType dataType;
+
+        final DynamicPulsarSerializationSchema.MetadataConverter converter;
+
+        WritableMetadata(String key, DataType dataType, 
DynamicPulsarSerializationSchema.MetadataConverter converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.converter = converter;
+        }
+    }
+}
+
diff --git 
a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/UpsertPulsarDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/UpsertPulsarDynamicTableFactory.java
new file mode 100644
index 000000000..9b853ffd6
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/UpsertPulsarDynamicTableFactory.java
@@ -0,0 +1,364 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.pulsar.config.StartupMode;
+import 
org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource;
+import org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic;
+import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions;
+import 
org.apache.flink.streaming.connectors.pulsar.util.KeyHashMessageRouterImpl;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.ADMIN_URL;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FIELDS;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FIELDS_PREFIX;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.KEY_FORMAT;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.PROPERTIES;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.SERVICE_URL;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.TOPIC;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.TOPIC_PATTERN;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.VALUE_FIELDS_INCLUDE;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.VALUE_FORMAT;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.createKeyFormatProjection;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.createValueFormatProjection;
+import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.getPulsarProperties;
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+
+/**
+ * Upsert-Pulsar factory.
+ */
+public class UpsertPulsarDynamicTableFactory implements 
DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+    public static final String IDENTIFIER = "upsert-pulsar-inlong";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(ADMIN_URL);
+        options.add(SERVICE_URL);
+        options.add(TOPIC);
+        options.add(KEY_FORMAT);
+        options.add(VALUE_FORMAT);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(KEY_FIELDS_PREFIX);
+        options.add(VALUE_FIELDS_INCLUDE);
+        options.add(FactoryUtil.SINK_PARALLELISM);
+        options.add(PROPERTIES);
+        return options;
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+
+        ReadableConfig tableOptions = helper.getOptions();
+        DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat = 
helper.discoverDecodingFormat(
+                DeserializationFormatFactory.class,
+                KEY_FORMAT);
+        DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = 
helper.discoverDecodingFormat(
+                DeserializationFormatFactory.class,
+                VALUE_FORMAT);
+
+        // Validate the option data type.
+        final String valueFormatPrefix = tableOptions.getOptional(FORMAT)
+                .orElse(tableOptions.get(VALUE_FORMAT));
+        helper.validateExcept(PulsarTableOptions.PROPERTIES_PREFIX, 
valueFormatPrefix);
+        TableSchema schema = context.getCatalogTable().getSchema();
+        validateTableOptions(tableOptions, keyDecodingFormat, 
valueDecodingFormat, schema);
+
+        Tuple2<int[], int[]> keyValueProjections = 
createKeyValueProjections(context.getCatalogTable());
+        String keyPrefix = 
tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
+        Properties properties = 
getPulsarProperties(context.getCatalogTable().toProperties());
+
+        // always use earliest to keep data integrity
+        PulsarTableOptions.StartupOptions startupOptions = new 
PulsarTableOptions.StartupOptions();
+        startupOptions.startupMode = StartupMode.EARLIEST;
+        startupOptions.specificOffsets = Collections.EMPTY_MAP;
+
+        String adminUrl = tableOptions.get(ADMIN_URL);
+        String serverUrl = tableOptions.get(SERVICE_URL);
+        List<String> topics = tableOptions.get(TOPIC);
+        String topicPattern = tableOptions.get(TOPIC_PATTERN);
+        return new PulsarDynamicTableSource(
+                schema.toPhysicalRowDataType(),
+                keyDecodingFormat,
+                new DecodingFormatWrapper(valueDecodingFormat),
+                keyValueProjections.f0,
+                keyValueProjections.f1,
+                keyPrefix,
+                topics,
+                topicPattern,
+                serverUrl,
+                adminUrl,
+                properties,
+                startupOptions,
+                true);
+    }
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+
+        ReadableConfig tableOptions = helper.getOptions();
+
+        EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat = 
helper.discoverEncodingFormat(
+                SerializationFormatFactory.class,
+                KEY_FORMAT);
+        EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = 
helper.discoverEncodingFormat(
+                SerializationFormatFactory.class,
+                VALUE_FORMAT);
+
+        // Validate the option data type.
+        helper.validateExcept(PulsarTableOptions.PROPERTIES_PREFIX);
+        TableSchema schema = context.getCatalogTable().getSchema();
+        validateTableOptions(tableOptions, keyEncodingFormat, 
valueEncodingFormat, schema);
+
+        Tuple2<int[], int[]> keyValueProjections = 
createKeyValueProjections(context.getCatalogTable());
+        final String keyPrefix = 
tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
+        Properties properties = 
getPulsarProperties(context.getCatalogTable().toProperties());
+        Integer parallelism = tableOptions.get(FactoryUtil.SINK_PARALLELISM);
+
+        String adminUrl = tableOptions.get(ADMIN_URL);
+        String serverUrl = tableOptions.get(SERVICE_URL);
+        String formatType = tableOptions.getOptional(FORMAT).orElseGet(() -> 
tableOptions.get(VALUE_FORMAT));
+
+        return new PulsarDynamicTableSink(
+                serverUrl,
+                adminUrl,
+                tableOptions.get(TOPIC).get(0),
+                schema.toPhysicalRowDataType(),
+                properties,
+                keyEncodingFormat,
+                new EncodingFormatWrapper(valueEncodingFormat),
+                keyValueProjections.f0,
+                keyValueProjections.f1,
+                keyPrefix,
+                PulsarSinkSemantic.AT_LEAST_ONCE,
+                formatType,
+                true,
+                parallelism,
+                KeyHashMessageRouterImpl.INSTANCE);
+    }
+
+    private Tuple2<int[], int[]> createKeyValueProjections(CatalogTable 
catalogTable) {
+        TableSchema schema = catalogTable.getSchema();
+        // primary key should validated earlier
+        List<String> keyFields = schema.getPrimaryKey().get().getColumns();
+        DataType physicalDataType = schema.toPhysicalRowDataType();
+
+        Configuration tableOptions = 
Configuration.fromMap(catalogTable.getOptions());
+        // upsert-pulsar will set key.fields to primary key fields by default
+        tableOptions.set(KEY_FIELDS, keyFields);
+
+        int[] keyProjection = createKeyFormatProjection(tableOptions, 
physicalDataType);
+        int[] valueProjection = createValueFormatProjection(tableOptions, 
physicalDataType);
+
+        return Tuple2.of(keyProjection, valueProjection);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Validation
+    // 
--------------------------------------------------------------------------------------------
+
+    private static void validateTableOptions(
+            ReadableConfig tableOptions,
+            Format keyFormat,
+            Format valueFormat,
+            TableSchema schema) {
+        validateTopic(tableOptions);
+        validateFormat(keyFormat, valueFormat, tableOptions);
+        validatePKConstraints(schema);
+    }
+
+    private static void validateTopic(ReadableConfig tableOptions) {
+        List<String> topic = tableOptions.get(TOPIC);
+        if (topic.size() > 1) {
+            throw new ValidationException("The 'upsert-pulsar' connector 
doesn't support topic list now. "
+                    + "Please use single topic as the value of the parameter 
'topic'.");
+        }
+    }
+
+    private static void validateFormat(Format keyFormat, Format valueFormat, 
ReadableConfig tableOptions) {
+        if (!keyFormat.getChangelogMode().containsOnly(RowKind.INSERT)) {
+            String identifier = tableOptions.get(KEY_FORMAT);
+            throw new ValidationException(String.format(
+                    "'upsert-pulsar' connector doesn't support '%s' as key 
format, "
+                            + "because '%s' is not in insert-only mode.",
+                    identifier,
+                    identifier));
+        }
+        if (!valueFormat.getChangelogMode().containsOnly(RowKind.INSERT)) {
+            String identifier = tableOptions.get(VALUE_FORMAT);
+            throw new ValidationException(String.format(
+                    "'upsert-Pulsar' connector doesn't support '%s' as value 
format, "
+                            + "because '%s' is not in insert-only mode.",
+                    identifier,
+                    identifier));
+        }
+    }
+
+    private static void validatePKConstraints(TableSchema schema) {
+        if (!schema.getPrimaryKey().isPresent()) {
+            throw new ValidationException("'upsert-pulsar' tables require to 
define a PRIMARY KEY constraint. "
+                    + "The PRIMARY KEY specifies which columns should be "
+                    + "read from or write to the Pulsar message key. "
+                    + "The PRIMARY KEY also defines records in the 
'upsert-pulsar' table "
+                    + "should update or delete on which keys.");
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Format wrapper
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * It is used to wrap the decoding format and expose the desired changelog 
mode. It's only works
+     * for insert-only format.
+     */
+    protected static class DecodingFormatWrapper implements 
DecodingFormat<DeserializationSchema<RowData>> {
+        private final DecodingFormat<DeserializationSchema<RowData>> 
innerDecodingFormat;
+
+        private static final ChangelogMode SOURCE_CHANGELOG_MODE = 
ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .addContainedKind(RowKind.DELETE)
+                .build();
+
+        public 
DecodingFormatWrapper(DecodingFormat<DeserializationSchema<RowData>> 
innerDecodingFormat) {
+            this.innerDecodingFormat = innerDecodingFormat;
+        }
+
+        @Override
+        public DeserializationSchema<RowData> createRuntimeDecoder(
+                DynamicTableSource.Context context, DataType producedDataType) 
{
+            return innerDecodingFormat.createRuntimeDecoder(context, 
producedDataType);
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode() {
+            return SOURCE_CHANGELOG_MODE;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == this) {
+                return true;
+            }
+
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+
+            DecodingFormatWrapper that = (DecodingFormatWrapper) obj;
+            return Objects.equals(innerDecodingFormat, 
that.innerDecodingFormat);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(innerDecodingFormat);
+        }
+    }
+
+    /**
+     * It is used to wrap the encoding format and expose the desired changelog 
mode. It's only works
+     * for insert-only format.
+     */
+    protected static class EncodingFormatWrapper implements 
EncodingFormat<SerializationSchema<RowData>> {
+        private final EncodingFormat<SerializationSchema<RowData>> 
innerEncodingFormat;
+
+        public static final ChangelogMode SINK_CHANGELOG_MODE = 
ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .addContainedKind(RowKind.DELETE)
+                .build();
+
+        public 
EncodingFormatWrapper(EncodingFormat<SerializationSchema<RowData>> 
innerEncodingFormat) {
+            this.innerEncodingFormat = innerEncodingFormat;
+        }
+
+        @Override
+        public SerializationSchema<RowData> createRuntimeEncoder(
+                DynamicTableSink.Context context, DataType consumedDataType) {
+            return innerEncodingFormat.createRuntimeEncoder(context, 
consumedDataType);
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode() {
+            return SINK_CHANGELOG_MODE;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == this) {
+                return true;
+            }
+
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+
+            EncodingFormatWrapper that = (EncodingFormatWrapper) obj;
+            return Objects.equals(innerEncodingFormat, 
that.innerEncodingFormat);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(innerEncodingFormat);
+        }
+    }
+}
+
diff --git 
a/inlong-sort/sort-connectors/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory
 
b/inlong-sort/sort-connectors/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory
index 9719b692b..83e2d7a4d 100644
--- 
a/inlong-sort/sort-connectors/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory
+++ 
b/inlong-sort/sort-connectors/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.inlong.sort.flink.kafka.KafkaDynamicTableFactory
\ No newline at end of file
+org.apache.inlong.sort.flink.kafka.KafkaDynamicTableFactory
+org.apache.inlong.sort.flink.pulsar.table.PulsarDynamicTableFactory
\ No newline at end of file
diff --git 
a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/PulsarSqlParserTest.java
 
b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/PulsarSqlParserTest.java
new file mode 100644
index 000000000..6ae3b554a
--- /dev/null
+++ 
b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/PulsarSqlParserTest.java
@@ -0,0 +1,108 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
+import org.apache.inlong.sort.protocol.node.format.CsvFormat;
+import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import 
org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.singletenant.flink.parser.impl.FlinkSqlParser;
+import 
org.apache.inlong.sort.singletenant.flink.parser.result.FlinkSqlParseResult;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class PulsarSqlParserTest {
+
+    private KafkaLoadNode buildKafkaLoadNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new 
LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()));
+        List<FieldRelationShip> relations = Arrays
+                .asList(new FieldRelationShip(new FieldInfo("id", new 
LongFormatInfo()),
+                                new FieldInfo("id", new LongFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("name", new 
StringFormatInfo()),
+                                new FieldInfo("name", new 
StringFormatInfo())));
+        return new KafkaLoadNode("1", "kafka_output", fields, relations, null, 
null,
+                "workerJson", "localhost:9092",
+                new JsonFormat(), null,
+                null, null);
+    }
+
+    public PulsarExtractNode buildPulsarExtractNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new 
LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()));
+        Format format = new CsvFormat();
+        return new PulsarExtractNode("2",
+                "pulsar_input",
+                fields,
+                null,
+                null,
+                "persistent://public/default/test_stream",
+                "http://localhost:8080";,
+                "pulsar://localhost:6650",
+                format,
+                "earliest",
+                null);
+    }
+
+    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> 
outputs) {
+        List<String> inputIds = 
inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = 
outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelationShip(inputIds, outputIds);
+    }
+
+    @Test
+    public void testPulsar() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        env.disableOperatorChaining();
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
+        Node inputNode = buildPulsarExtractNode();
+        Node outputNode = buildKafkaLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, 
outputNode),
+                
Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("1", 
Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, 
groupInfo);
+        FlinkSqlParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory
 
b/inlong-sort/sort-single-tenant/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
similarity index 78%
copy from 
inlong-sort/sort-connectors/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory
copy to 
inlong-sort/sort-single-tenant/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 9719b692b..53bd06a1f 100644
--- 
a/inlong-sort/sort-connectors/src/main/resources/META-INF.services/org.apache.flink.table.factories.Factory
+++ 
b/inlong-sort/sort-single-tenant/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.inlong.sort.flink.kafka.KafkaDynamicTableFactory
\ No newline at end of file
+org.apache.inlong.sort.formats.inlongmsg.InLongMsgFormatFactory
+org.apache.inlong.sort.flink.kafka.KafkaDynamicTableFactory
+org.apache.inlong.sort.flink.pulsar.table.PulsarDynamicTableFactory
diff --git a/licenses/inlong-sort/LICENSE b/licenses/inlong-sort/LICENSE
index 22c6de5de..1c3f36a03 100644
--- a/licenses/inlong-sort/LICENSE
+++ b/licenses/inlong-sort/LICENSE
@@ -473,6 +473,12 @@
  Source  : flink-connector-hive 1.13.5 (Please note that the software have 
been modified.)
  License : https://github.com/apache/flink/blob/master/NOTICE
 
+1.3.7 inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/DynamicPulsarSerializationSchema.java
+      inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/PulsarDynamicTableFactory.java
+      inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/PulsarDynamicTableSink.java
+      inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/pulsar/table/UpsertPulsarDynamicTableFactory.java
+ Source  : pulsar-flink-connector_2.11 1.13.6.1-rc9 (Please note that the 
software have been modified.)
+ License : https://github.com/streamnative/pulsar-flink/blob/master/LICENSE
 
 =======================================================================
 Apache InLong(Incubating) Subcomponents:
@@ -666,6 +672,7 @@ The text of each license is the standard Apache 2.0 license.
   xerces:xercesImpl:2.12.0 - Xerces2 Java Parser 
(http://xerces.apache.org/xerces2-j), (The Apache License, Version 2.0)
   org.apache.zookeeper:zookeeper:3.6.3 - Apache ZooKeeper - Server 
(https://github.com/apache/zookeeper/tree/release-3.6.3/zookeeper-server), 
(Apache License, Version 2.0)
   org.apache.zookeeper:zookeeper-jute:3.6.3 - Apache ZooKeeper - Jute 
(https://github.com/apache/zookeeper/tree/release-3.6.3/zookeeper-jute), 
(Apache License, Version 2.0)
+  io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9 - 
StreamNative :: Pulsar Flink Connector :: Scala 2.11 
(https://github.com/streamnative/pulsar-flink/blob/release-1.13/LICENSE), 
(Apache License, Version 2.0)
 
 
 ========================================================================
diff --git a/pom.xml b/pom.xml
index 91c393601..cd5514fe1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -198,6 +198,7 @@
         <flink.scala.binary.version>2.11</flink.scala.binary.version>
         <flink.jackson.version>2.12.1-13.0</flink.jackson.version>
         
<flink.connector.postgres.cdc.version>2.0.2</flink.connector.postgres.cdc.version>
+        <flink.pulsar.version>1.13.6.1-rc9</flink.pulsar.version>
 
         
<qcloud.flink.cos.fs.hadoop.version>1.10.0-0.1.10</qcloud.flink.cos.fs.hadoop.version>
         <qcloud.chdfs.version>2.5</qcloud.chdfs.version>
@@ -977,6 +978,12 @@
                 <version>${flink.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>io.streamnative.connectors</groupId>
+                
<artifactId>pulsar-flink-connector_${scala.binary.version}</artifactId>
+                <version>${flink.pulsar.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-test-utils-junit</artifactId>

Reply via email to