leonardBang commented on code in PR #56:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/56#discussion_r1303009152


##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableFactory.java:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkOptions;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import 
org.apache.flink.connector.pulsar.table.sink.PulsarTableSerializationSchemaFactory;
+import org.apache.flink.connector.pulsar.table.sink.PulsarTableSink;
+import 
org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchemaFactory;
+import org.apache.flink.connector.pulsar.table.source.PulsarTableSource;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.createKeyFormatProjection;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.createValueFormatProjection;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getKeyDecodingFormat;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getKeyEncodingFormat;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getMessageDelayMillis;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getPulsarProperties;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getStartCursor;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getStopCursor;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getSubscriptionType;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getTopicListFromOptions;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getTopicRouter;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getTopicRoutingMode;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getValueDecodingFormat;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getValueEncodingFormat;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.ADMIN_URL;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.EXPLICIT;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.KEY_FIELDS;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.KEY_FORMAT;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.SERVICE_URL;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.SINK_CUSTOM_TOPIC_ROUTER;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.SINK_MESSAGE_DELAY_INTERVAL;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.SINK_TOPIC_ROUTING_MODE;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.SOURCE_SUBSCRIPTION_NAME;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.TOPICS;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.VALUE_FORMAT;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableValidationUtils.validatePrimaryKeyConstraints;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableValidationUtils.validateTableSinkOptions;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableValidationUtils.validateTableSourceOptions;
+import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static 
org.apache.pulsar.shade.org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+
+/**
+ * Factory for creating {@link DynamicTableSource} and {@link 
DynamicTableSink}.
+ *
+ * <p>The main role of this class is to retrieve config options and validate 
options from config and
+ * the table schema. It also sets default values if a config option is not 
present.
+ */
+public class PulsarTableFactory implements DynamicTableSourceFactory, 
DynamicTableSinkFactory {
+
+    public static final String IDENTIFIER = "pulsar";
+
+    public static final String DEFAULT_SUBSCRIPTION_NAME_PREFIX = 
"flink-sql-connector-pulsar-";
+
+    public static final boolean UPSERT_DISABLED = false;
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+        // Format options should be retrieved before validation.
+        final DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat 
=
+                getKeyDecodingFormat(helper);
+        final DecodingFormat<DeserializationSchema<RowData>> 
valueDecodingFormat =
+                getValueDecodingFormat(helper);
+        ReadableConfig tableOptions = helper.getOptions();
+
+        // Validate configs are not conflict; each options is consumed; no 
unwanted configs
+        // PulsarOptions, PulsarSourceOptions and PulsarSinkOptions is not 
part of the validation.
+        helper.validateExcept(
+                PulsarOptions.CLIENT_CONFIG_PREFIX,
+                PulsarOptions.ADMIN_CONFIG_PREFIX,
+                PulsarSourceOptions.SOURCE_CONFIG_PREFIX,
+                PulsarSourceOptions.CONSUMER_CONFIG_PREFIX,
+                PulsarSinkOptions.PRODUCER_CONFIG_PREFIX,
+                PulsarSinkOptions.SINK_CONFIG_PREFIX);
+
+        validatePrimaryKeyConstraints(
+                context.getObjectIdentifier(), context.getPrimaryKeyIndexes(), 
helper);
+
+        validateTableSourceOptions(tableOptions);
+
+        // Retrieve configs
+        final List<String> topics = getTopicListFromOptions(tableOptions);
+        final StartCursor startCursor = getStartCursor(tableOptions);
+        final StopCursor stopCursor = getStopCursor(tableOptions);
+        final SubscriptionType subscriptionType = 
getSubscriptionType(tableOptions);
+
+        // Forward source configs
+        final Properties properties = getPulsarProperties(tableOptions);
+        properties.setProperty(PULSAR_ADMIN_URL.key(), 
tableOptions.get(ADMIN_URL));
+        properties.setProperty(PULSAR_SERVICE_URL.key(), 
tableOptions.get(SERVICE_URL));
+        // Set random subscriptionName if not provided
+        properties.setProperty(
+                PULSAR_SUBSCRIPTION_NAME.key(),
+                tableOptions
+                        .getOptional(SOURCE_SUBSCRIPTION_NAME)
+                        .orElse(DEFAULT_SUBSCRIPTION_NAME_PREFIX + 
randomAlphabetic(5)));
+        // Retrieve physical fields (not including computed or metadata 
fields),
+        // and projections and create a schema factory based on such 
information.
+        final DataType physicalDataType = context.getPhysicalRowDataType();
+
+        final int[] valueProjection = 
createValueFormatProjection(tableOptions, physicalDataType);
+        final int[] keyProjection = createKeyFormatProjection(tableOptions, 
physicalDataType);
+
+        final PulsarTableDeserializationSchemaFactory 
deserializationSchemaFactory =
+                new PulsarTableDeserializationSchemaFactory(
+                        physicalDataType,
+                        keyDecodingFormat,
+                        keyProjection,
+                        valueDecodingFormat,
+                        valueProjection,
+                        UPSERT_DISABLED);
+
+        // Set default values for configuration not exposed to user.
+        final DecodingFormat<DeserializationSchema<RowData>> 
decodingFormatForMetadataPushdown =
+                valueDecodingFormat;
+        final ChangelogMode changelogMode = 
decodingFormatForMetadataPushdown.getChangelogMode();
+
+        return new PulsarTableSource(
+                deserializationSchemaFactory,
+                decodingFormatForMetadataPushdown,
+                changelogMode,
+                topics,
+                properties,
+                startCursor,
+                stopCursor,
+                subscriptionType);
+    }
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+        // Format options should be retrieved before validation.
+        final EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat =
+                getKeyEncodingFormat(helper);
+        final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat 
=
+                getValueEncodingFormat(helper);
+        ReadableConfig tableOptions = helper.getOptions();
+
+        // Validate configs are not conflict; each options is consumed; no 
unwanted configs
+        // PulsarOptions, PulsarSourceOptions and PulsarSinkOptions is not 
part of the validation.
+        helper.validateExcept(
+                PulsarOptions.CLIENT_CONFIG_PREFIX,
+                PulsarOptions.ADMIN_CONFIG_PREFIX,
+                PulsarSourceOptions.SOURCE_CONFIG_PREFIX,
+                PulsarSourceOptions.CONSUMER_CONFIG_PREFIX,
+                PulsarSinkOptions.PRODUCER_CONFIG_PREFIX,
+                PulsarSinkOptions.SINK_CONFIG_PREFIX);
+
+        validatePrimaryKeyConstraints(
+                context.getObjectIdentifier(), context.getPrimaryKeyIndexes(), 
helper);
+
+        validateTableSinkOptions(tableOptions);
+
+        // Retrieve configs
+        final TopicRouter<RowData> topicRouter =
+                getTopicRouter(tableOptions, context.getClassLoader());
+        final TopicRoutingMode topicRoutingMode = 
getTopicRoutingMode(tableOptions);
+        final long messageDelayMillis = getMessageDelayMillis(tableOptions);
+
+        final List<String> topics = getTopicListFromOptions(tableOptions);
+
+        // Forward sink configs
+        final Properties properties = getPulsarProperties(tableOptions);
+        properties.setProperty(PULSAR_ADMIN_URL.key(), 
tableOptions.get(ADMIN_URL));
+        properties.setProperty(PULSAR_SERVICE_URL.key(), 
tableOptions.get(SERVICE_URL));
+
+        // Retrieve physical DataType (not including computed or metadata 
fields)
+        final DataType physicalDataType = context.getPhysicalRowDataType();
+        final int[] keyProjection = createKeyFormatProjection(tableOptions, 
physicalDataType);
+        final int[] valueProjection = 
createValueFormatProjection(tableOptions, physicalDataType);
+
+        final PulsarTableSerializationSchemaFactory serializationSchemaFactory 
=
+                new PulsarTableSerializationSchemaFactory(
+                        physicalDataType,
+                        keyEncodingFormat,
+                        keyProjection,
+                        valueEncodingFormat,
+                        valueProjection,
+                        UPSERT_DISABLED);
+
+        // Set default values for configuration not exposed to user.
+        final DeliveryGuarantee deliveryGuarantee = 
DeliveryGuarantee.AT_LEAST_ONCE;
+        final ChangelogMode changelogMode = 
valueEncodingFormat.getChangelogMode();
+
+        return new PulsarTableSink(
+                serializationSchemaFactory,
+                changelogMode,
+                topics,
+                properties,
+                deliveryGuarantee,
+                topicRouter,
+                topicRoutingMode,
+                messageDelayMillis);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Stream.of(TOPICS, ADMIN_URL, 
SERVICE_URL).collect(Collectors.toSet());
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return Stream.of(
+                        FactoryUtil.FORMAT,
+                        VALUE_FORMAT,
+                        SOURCE_SUBSCRIPTION_NAME,
+                        SOURCE_SUBSCRIPTION_TYPE,
+                        SOURCE_START_FROM_MESSAGE_ID,
+                        SOURCE_START_FROM_PUBLISH_TIME,
+                        SOURCE_STOP_AT_MESSAGE_ID,
+                        SOURCE_STOP_AFTER_MESSAGE_ID,
+                        SOURCE_STOP_AT_PUBLISH_TIME,
+                        SINK_CUSTOM_TOPIC_ROUTER,
+                        SINK_TOPIC_ROUTING_MODE,
+                        SINK_MESSAGE_DELAY_INTERVAL,
+                        SINK_PARALLELISM,
+                        KEY_FORMAT,
+                        KEY_FIELDS,
+                        EXPLICIT)
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Format and Delivery guarantee related options are not forward options.
+     *
+     * @return

Review Comment:
   minor: remove or add some words



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableOptions.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT_SUFFIX;
+
+/**
+ * Config options that is used to configure a Pulsar SQL Connector. These 
config options are
+ * specific to SQL Connectors only. Other runtime configurations can be found 
in {@link
+ * org.apache.flink.connector.pulsar.common.config.PulsarOptions}, {@link
+ * org.apache.flink.connector.pulsar.source.PulsarSourceOptions}, and {@link
+ * org.apache.flink.connector.pulsar.sink.PulsarSinkOptions}.
+ */
+@PublicEvolving
+public final class PulsarTableOptions {
+
+    private PulsarTableOptions() {}
+
+    public static final ConfigOption<List<String>> TOPICS =
+            ConfigOptions.key("topics")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Topic name(s) the table reads 
data from. It can be a single topic name or a list of topic names separated by 
a semicolon symbol (%s) like %s.",
+                                            code(";"), code("topic-1;topic-2"))

Review Comment:
   These topics should have same schema for data as Flink Table need a fixed 
schema, could you add the limitation?



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableOptions.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT_SUFFIX;
+
+/**
+ * Config options that is used to configure a Pulsar SQL Connector. These 
config options are
+ * specific to SQL Connectors only. Other runtime configurations can be found 
in {@link
+ * org.apache.flink.connector.pulsar.common.config.PulsarOptions}, {@link
+ * org.apache.flink.connector.pulsar.source.PulsarSourceOptions}, and {@link
+ * org.apache.flink.connector.pulsar.sink.PulsarSinkOptions}.
+ */
+@PublicEvolving
+public final class PulsarTableOptions {
+
+    private PulsarTableOptions() {}
+
+    public static final ConfigOption<List<String>> TOPICS =
+            ConfigOptions.key("topics")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Topic name(s) the table reads 
data from. It can be a single topic name or a list of topic names separated by 
a semicolon symbol (%s) like %s.",
+                                            code(";"), code("topic-1;topic-2"))
+                                    .build());
+
+    // 
--------------------------------------------------------------------------------------------
+    // Table Source Options
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final ConfigOption<SubscriptionType> 
SOURCE_SUBSCRIPTION_TYPE =
+            ConfigOptions.key("source.subscription-type")
+                    .enumType(SubscriptionType.class)
+                    .defaultValue(SubscriptionType.Exclusive)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The [subscription 
type](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-subscriptions)
 that is supported by the [Pulsar DataStream source 
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source).
 Currently, only %s and %s subscription types are supported.",
+                                            code("Exclusive"), code("Shared"))
+                                    .build());
+
+    /**
+     * Exactly same as {@link
+     * 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions#PULSAR_SUBSCRIPTION_NAME}.
+     * Copied because we want to have a default value for it.
+     */
+    public static final ConfigOption<String> SOURCE_SUBSCRIPTION_NAME =
+            ConfigOptions.key("source.subscription-name")

Review Comment:
   I'm a little confused about the two configurations from users' perspective, 
why `subscription-type` has a default value and the later one does not?



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/sink/PulsarTableSink.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.table.sink;
+
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.PulsarSink;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkBuilder;
+import 
org.apache.flink.connector.pulsar.sink.writer.delayer.FixedMessageDelayer;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+import 
org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import javax.annotation.Nullable;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Pulsar SQL Connector sink. It supports {@link SupportsWritingMetadata}. */
+public class PulsarTableSink implements DynamicTableSink, 
SupportsWritingMetadata {
+
+    private final PulsarTableSerializationSchemaFactory 
serializationSchemaFactory;
+
+    private final ChangelogMode changelogMode;
+
+    private final List<String> topics;
+
+    private final Properties properties;
+
+    private final DeliveryGuarantee deliveryGuarantee;
+
+    @Nullable private final TopicRouter<RowData> topicRouter;
+
+    private final TopicRoutingMode topicRoutingMode;
+
+    private final long messageDelayMillis;
+
+    public PulsarTableSink(
+            PulsarTableSerializationSchemaFactory serializationSchemaFactory,
+            ChangelogMode changelogMode,
+            List<String> topics,
+            Properties properties,
+            DeliveryGuarantee deliveryGuarantee,
+            @Nullable TopicRouter<RowData> topicRouter,
+            TopicRoutingMode topicRoutingMode,
+            long messageDelayMillis) {
+        this.serializationSchemaFactory = 
checkNotNull(serializationSchemaFactory);
+        this.changelogMode = checkNotNull(changelogMode);
+        this.topics = checkNotNull(topics);
+        // Mutable attributes
+        this.properties = checkNotNull(properties);
+        this.deliveryGuarantee = checkNotNull(deliveryGuarantee);
+        this.topicRouter = topicRouter;
+        this.topicRoutingMode = checkNotNull(topicRoutingMode);
+        this.messageDelayMillis = messageDelayMillis;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        return this.changelogMode;
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+
+        final PulsarSerializationSchema<RowData> pulsarSerializationSchema =
+                
serializationSchemaFactory.createPulsarSerializationSchema(context);
+
+        final PulsarSinkBuilder<RowData> pulsarSinkBuilder =
+                PulsarSink.builder()
+                        .setSerializationSchema(pulsarSerializationSchema)
+                        .setProperties(properties)
+                        .setDeliveryGuarantee(deliveryGuarantee)
+                        .setTopics(topics)
+                        .setTopicRoutingMode(topicRoutingMode)
+                        .delaySendingMessage(new 
FixedMessageDelayer<>(messageDelayMillis));

Review Comment:
   Here looks like we set a different default value.
   In table: default 0 will map to  FixedMessageDelayer 
   In DS:default -1 map to never() FixedMessageDelayer
   Duration of 0 is meaningless for most cases as publish/schedule with a delay 
0 mills is meaningless, I'd like to use -1 as default value.
   



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableValidationUtils.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.types.RowKind;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptionUtils.getValueDecodingFormat;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.KEY_FIELDS;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.KEY_FORMAT;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.SINK_CUSTOM_TOPIC_ROUTER;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.SINK_TOPIC_ROUTING_MODE;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_MESSAGE_ID;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.SOURCE_START_FROM_PUBLISH_TIME;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.SOURCE_STOP_AFTER_MESSAGE_ID;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.SOURCE_STOP_AT_MESSAGE_ID;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.SOURCE_STOP_AT_PUBLISH_TIME;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.SOURCE_SUBSCRIPTION_TYPE;
+import static 
org.apache.flink.connector.pulsar.table.PulsarTableOptions.TOPICS;
+import static org.apache.pulsar.common.naming.TopicName.isValid;
+
+/** Util class for source and sink validation rules. */
+public class PulsarTableValidationUtils {
+
+    private PulsarTableValidationUtils() {}
+
+    public static void validatePrimaryKeyConstraints(
+            ObjectIdentifier tableName,
+            int[] primaryKeyIndexes,
+            FactoryUtil.TableFactoryHelper helper) {
+        final DecodingFormat<DeserializationSchema<RowData>> format =
+                getValueDecodingFormat(helper);
+        if (primaryKeyIndexes.length > 0
+                && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+            throw new ValidationException(
+                    String.format(
+                            "The Pulsar table '%s' with '%s' format doesn't 
support defining PRIMARY KEY constraint"
+                                    + " on the table, because it can't 
guarantee the semantic of primary key.",
+                            tableName.asSummaryString(), format));
+        }
+    }
+
+    public static void validateTableSourceOptions(ReadableConfig tableOptions) 
{
+        validateTopicsConfigs(tableOptions);
+        validateStartCursorConfigs(tableOptions);
+        validateStopCursorConfigs(tableOptions);
+        validateSubscriptionTypeConfigs(tableOptions);
+        validateKeyFormatConfigs(tableOptions);
+    }
+
+    public static void validateTableSinkOptions(ReadableConfig tableOptions) {
+        validateTopicsConfigs(tableOptions);
+        validateKeyFormatConfigs(tableOptions);
+        validateSinkRoutingConfigs(tableOptions);
+    }
+
+    protected static void validateTopicsConfigs(ReadableConfig tableOptions) {
+        if (tableOptions.get(TOPICS).isEmpty()) {
+            throw new ValidationException("The topics list should not be 
empty.");
+        }
+
+        for (String topic : tableOptions.get(TOPICS)) {
+            if (!isValid(topic)) {
+                throw new ValidationException(
+                        String.format("The topics name %s is not a valid topic 
name.", topic));
+            }
+        }
+    }
+
+    protected static void validateStartCursorConfigs(ReadableConfig 
tableOptions) {
+        if (tableOptions.getOptional(SOURCE_START_FROM_MESSAGE_ID).isPresent()
+                && 
tableOptions.getOptional(SOURCE_START_FROM_PUBLISH_TIME).isPresent()) {
+            throw new ValidationException(
+                    String.format(
+                            "Only one of %s and %s can be specified. Detected 
both of them",
+                            SOURCE_START_FROM_MESSAGE_ID, 
SOURCE_START_FROM_PUBLISH_TIME));
+        }
+    }
+
+    protected static void validateStopCursorConfigs(ReadableConfig 
tableOptions) {
+        Set<ConfigOption<?>> conflictConfigOptions =
+                Sets.newHashSet(
+                        SOURCE_STOP_AT_MESSAGE_ID,
+                        SOURCE_STOP_AFTER_MESSAGE_ID,
+                        SOURCE_STOP_AT_PUBLISH_TIME);
+
+        long configsNums =
+                conflictConfigOptions.stream()
+                        .map(tableOptions::getOptional)
+                        .filter(Optional::isPresent)
+                        .count();
+
+        if (configsNums > 1) {
+            throw new ValidationException(
+                    String.format(
+                            "Only one of %s, %s and %s can be specified. 
Detected more than 1 of them",
+                            SOURCE_STOP_AT_MESSAGE_ID,
+                            SOURCE_STOP_AFTER_MESSAGE_ID,
+                            SOURCE_STOP_AT_PUBLISH_TIME));
+        }
+    }
+
+    protected static void validateSubscriptionTypeConfigs(ReadableConfig 
tableOptions) {
+        SubscriptionType subscriptionType = 
tableOptions.get(SOURCE_SUBSCRIPTION_TYPE);
+
+        if (subscriptionType == SubscriptionType.Failover) {
+            throw new ValidationException(
+                    String.format(
+                            "%s SubscriptionType is not supported. ", 
SubscriptionType.Failover));
+        }
+    }
+
+    protected static void validateKeyFormatConfigs(ReadableConfig 
tableOptions) {
+        final Optional<String> optionalKeyFormat = 
tableOptions.getOptional(KEY_FORMAT);
+        final Optional<List<String>> optionalKeyFields = 
tableOptions.getOptional(KEY_FIELDS);
+        if (!optionalKeyFormat.isPresent() && optionalKeyFields.isPresent()) {
+            throw new ValidationException(
+                    String.format(
+                            "The option '%s' can only be declared if a key 
format is defined using '%s'.",
+                            KEY_FIELDS.key(), KEY_FORMAT.key()));
+        } else if (optionalKeyFormat.isPresent()
+                && (!optionalKeyFields.isPresent() || 
optionalKeyFields.get().size() == 0)) {
+            throw new ValidationException(
+                    String.format(
+                            "A key format '%s' requires the declaration of one 
or more of key fields using '%s'.",
+                            KEY_FORMAT.key(), KEY_FIELDS.key()));
+        }
+    }
+
+    protected static void validateSinkRoutingConfigs(ReadableConfig 
tableOptions) {
+        if 
(tableOptions.getOptional(SINK_TOPIC_ROUTING_MODE).orElse(TopicRoutingMode.ROUND_ROBIN)
+                == TopicRoutingMode.CUSTOM) {
+            throw new ValidationException(
+                    String.format(
+                            "Only  %s and %s can be used. For %s, please use 
sink.custom-topic-router for"
+                                    + "custom topic router and do not set this 
config.",
+                            TopicRoutingMode.ROUND_ROBIN,
+                            TopicRoutingMode.MESSAGE_KEY_HASH,
+                            TopicRoutingMode.CUSTOM));
+        }
+        if (tableOptions.getOptional(SINK_CUSTOM_TOPIC_ROUTER).isPresent()
+                && 
tableOptions.getOptional(SINK_TOPIC_ROUTING_MODE).isPresent()) {
+            throw new ValidationException(
+                    String.format(
+                            "Only one of %s and %s can be specified. Detected 
both of them",
+                            SINK_CUSTOM_TOPIC_ROUTER, 
SINK_TOPIC_ROUTING_MODE));
+        }
+    }
+
+    protected static void validateUpsertModeKeyConstraints(
+            ReadableConfig tableOptions, int[] primaryKeyIndexes) {
+        if (!tableOptions.getOptional(KEY_FIELDS).isPresent()) {
+            throw new ValidationException(
+                    "Upsert mode requires key.fields set to the primary key 
fields, should be set");
+        }
+
+        if (tableOptions.getOptional(KEY_FIELDS).get().size() == 0
+                || primaryKeyIndexes.length == 0) {
+            throw new ValidationException(
+                    "'upsert-pulsar' require to define a PRIMARY KEY 
constraint. "

Review Comment:
   `upsert-pulsar` did expose to users yet, right?  I tend to add a 
upsert-pulsar table connector, it should be different with current pulsar table 
connector



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/source/PulsarRowDataConverter.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.table.source;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.DeserializationException;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+
+import org.apache.pulsar.client.api.Message;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Contains the projection information needed to map a Pulsar message to 
proper key fields, value
+ * fields and metadata fields.
+ */
+public class PulsarRowDataConverter implements Serializable {
+    private static final long serialVersionUID = -3399264407634977459L;

Review Comment:
   1L



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableOptions.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT_SUFFIX;
+
+/**
+ * Config options that is used to configure a Pulsar SQL Connector. These 
config options are
+ * specific to SQL Connectors only. Other runtime configurations can be found 
in {@link
+ * org.apache.flink.connector.pulsar.common.config.PulsarOptions}, {@link
+ * org.apache.flink.connector.pulsar.source.PulsarSourceOptions}, and {@link
+ * org.apache.flink.connector.pulsar.sink.PulsarSinkOptions}.
+ */
+@PublicEvolving
+public final class PulsarTableOptions {
+
+    private PulsarTableOptions() {}
+
+    public static final ConfigOption<List<String>> TOPICS =
+            ConfigOptions.key("topics")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Topic name(s) the table reads 
data from. It can be a single topic name or a list of topic names separated by 
a semicolon symbol (%s) like %s.",
+                                            code(";"), code("topic-1;topic-2"))
+                                    .build());
+
+    // 
--------------------------------------------------------------------------------------------
+    // Table Source Options
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final ConfigOption<SubscriptionType> 
SOURCE_SUBSCRIPTION_TYPE =
+            ConfigOptions.key("source.subscription-type")
+                    .enumType(SubscriptionType.class)
+                    .defaultValue(SubscriptionType.Exclusive)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The [subscription 
type](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-subscriptions)
 that is supported by the [Pulsar DataStream source 
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source).
 Currently, only %s and %s subscription types are supported.",
+                                            code("Exclusive"), code("Shared"))
+                                    .build());
+
+    /**
+     * Exactly same as {@link
+     * 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions#PULSAR_SUBSCRIPTION_NAME}.
+     * Copied because we want to have a default value for it.
+     */
+    public static final ConfigOption<String> SOURCE_SUBSCRIPTION_NAME =
+            ConfigOptions.key("source.subscription-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The subscription name of the 
consumer that is used by the runtime [Pulsar DataStream source 
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source).
 This argument is required for constructing the consumer.")
+                                    .build());
+
+    public static final ConfigOption<String> SOURCE_START_FROM_MESSAGE_ID =
+            ConfigOptions.key("source.start.message-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Optional message id used to 
specify a consuming starting point for "
+                                                    + "source. Use %s, %s or 
pass in a message id "
+                                                    + "representation in %s, "
+                                                    + "such as %s",
+                                            code("earliest"),
+                                            code("latest"),
+                                            
code("ledgerId:entryId:partitionId"),
+                                            code("12:2:-1"))
+                                    .build());
+
+    public static final ConfigOption<Long> SOURCE_START_FROM_PUBLISH_TIME =
+            ConfigOptions.key("source.start.publish-time")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "(Optional) the publish timestamp 
that is used to specify a starting point for the [Pulsar DataStream source 
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source)
 to consume data.")
+                                    .build());
+
+    public static final ConfigOption<String> SOURCE_STOP_AT_MESSAGE_ID =
+            ConfigOptions.key("source.stop.at-message-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional message id used to specify a stop cursor 
for the unbounded sql "
+                                    + "source. Use \"never\", \"latest\" or 
pass in a message id "
+                                    + "representation in 
\"ledgerId:entryId:partitionId\", "
+                                    + "such as \"12:2:-1\"");
+
+    public static final ConfigOption<String> SOURCE_STOP_AFTER_MESSAGE_ID =
+            ConfigOptions.key("source.stop.after-message-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional message id used to specify a stop 
position but include the "
+                                    + "given message in the consuming result 
for the unbounded sql "
+                                    + "source. Pass in a message id "

Review Comment:
   bounded ?



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableOptions.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT_SUFFIX;
+
+/**
+ * Config options that is used to configure a Pulsar SQL Connector. These 
config options are
+ * specific to SQL Connectors only. Other runtime configurations can be found 
in {@link
+ * org.apache.flink.connector.pulsar.common.config.PulsarOptions}, {@link
+ * org.apache.flink.connector.pulsar.source.PulsarSourceOptions}, and {@link
+ * org.apache.flink.connector.pulsar.sink.PulsarSinkOptions}.
+ */
+@PublicEvolving
+public final class PulsarTableOptions {
+
+    private PulsarTableOptions() {}
+
+    public static final ConfigOption<List<String>> TOPICS =
+            ConfigOptions.key("topics")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Topic name(s) the table reads 
data from. It can be a single topic name or a list of topic names separated by 
a semicolon symbol (%s) like %s.",
+                                            code(";"), code("topic-1;topic-2"))
+                                    .build());
+
+    // 
--------------------------------------------------------------------------------------------
+    // Table Source Options
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final ConfigOption<SubscriptionType> 
SOURCE_SUBSCRIPTION_TYPE =
+            ConfigOptions.key("source.subscription-type")
+                    .enumType(SubscriptionType.class)
+                    .defaultValue(SubscriptionType.Exclusive)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The [subscription 
type](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-subscriptions)
 that is supported by the [Pulsar DataStream source 
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source).
 Currently, only %s and %s subscription types are supported.",
+                                            code("Exclusive"), code("Shared"))
+                                    .build());
+
+    /**
+     * Exactly same as {@link
+     * 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions#PULSAR_SUBSCRIPTION_NAME}.
+     * Copied because we want to have a default value for it.
+     */
+    public static final ConfigOption<String> SOURCE_SUBSCRIPTION_NAME =
+            ConfigOptions.key("source.subscription-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The subscription name of the 
consumer that is used by the runtime [Pulsar DataStream source 
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source).
 This argument is required for constructing the consumer.")
+                                    .build());
+
+    public static final ConfigOption<String> SOURCE_START_FROM_MESSAGE_ID =
+            ConfigOptions.key("source.start.message-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Optional message id used to 
specify a consuming starting point for "
+                                                    + "source. Use %s, %s or 
pass in a message id "
+                                                    + "representation in %s, "
+                                                    + "such as %s",
+                                            code("earliest"),
+                                            code("latest"),
+                                            
code("ledgerId:entryId:partitionId"),
+                                            code("12:2:-1"))
+                                    .build());
+
+    public static final ConfigOption<Long> SOURCE_START_FROM_PUBLISH_TIME =
+            ConfigOptions.key("source.start.publish-time")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "(Optional) the publish timestamp 
that is used to specify a starting point for the [Pulsar DataStream source 
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source)
 to consume data.")
+                                    .build());
+
+    public static final ConfigOption<String> SOURCE_STOP_AT_MESSAGE_ID =
+            ConfigOptions.key("source.stop.at-message-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional message id used to specify a stop cursor 
for the unbounded sql "

Review Comment:
   unbounded?



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/source/PulsarTableDeserializationSchema.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.table.source;
+
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+
+import org.apache.pulsar.client.api.Message;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A specific {@link PulsarDeserializationSchema} for {@link 
PulsarTableSource}.
+ *
+ * <p>Both Flink's key decoding format and value decoding format are wrapped 
in this class. It is
+ * responsible for getting metadata fields from a physical pulsar message 
body, and the final
+ * projection mapping from Pulsar message fields to Flink row.
+ *
+ * <p>After retrieving key and value bytes and convert them into a list of 
{@link RowData}, it then
+ * delegates metadata appending, key and value {@link RowData} combining to a 
{@link
+ * PulsarRowDataConverter} instance.
+ */
+public class PulsarTableDeserializationSchema implements 
PulsarDeserializationSchema<RowData> {
+    private static final long serialVersionUID = -3298784447432136216L;

Review Comment:
   1L



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/PulsarTableOptions.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT_SUFFIX;
+
+/**
+ * Config options that is used to configure a Pulsar SQL Connector. These 
config options are
+ * specific to SQL Connectors only. Other runtime configurations can be found 
in {@link
+ * org.apache.flink.connector.pulsar.common.config.PulsarOptions}, {@link
+ * org.apache.flink.connector.pulsar.source.PulsarSourceOptions}, and {@link
+ * org.apache.flink.connector.pulsar.sink.PulsarSinkOptions}.
+ */
+@PublicEvolving
+public final class PulsarTableOptions {
+
+    private PulsarTableOptions() {}
+
+    public static final ConfigOption<List<String>> TOPICS =
+            ConfigOptions.key("topics")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Topic name(s) the table reads 
data from. It can be a single topic name or a list of topic names separated by 
a semicolon symbol (%s) like %s.",
+                                            code(";"), code("topic-1;topic-2"))
+                                    .build());
+
+    // 
--------------------------------------------------------------------------------------------
+    // Table Source Options
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final ConfigOption<SubscriptionType> 
SOURCE_SUBSCRIPTION_TYPE =
+            ConfigOptions.key("source.subscription-type")
+                    .enumType(SubscriptionType.class)
+                    .defaultValue(SubscriptionType.Exclusive)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The [subscription 
type](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-subscriptions)
 that is supported by the [Pulsar DataStream source 
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source).
 Currently, only %s and %s subscription types are supported.",
+                                            code("Exclusive"), code("Shared"))
+                                    .build());
+
+    /**
+     * Exactly same as {@link
+     * 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions#PULSAR_SUBSCRIPTION_NAME}.
+     * Copied because we want to have a default value for it.
+     */
+    public static final ConfigOption<String> SOURCE_SUBSCRIPTION_NAME =
+            ConfigOptions.key("source.subscription-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The subscription name of the 
consumer that is used by the runtime [Pulsar DataStream source 
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source).
 This argument is required for constructing the consumer.")
+                                    .build());
+
+    public static final ConfigOption<String> SOURCE_START_FROM_MESSAGE_ID =
+            ConfigOptions.key("source.start.message-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Optional message id used to 
specify a consuming starting point for "
+                                                    + "source. Use %s, %s or 
pass in a message id "
+                                                    + "representation in %s, "
+                                                    + "such as %s",
+                                            code("earliest"),
+                                            code("latest"),
+                                            
code("ledgerId:entryId:partitionId"),
+                                            code("12:2:-1"))
+                                    .build());
+
+    public static final ConfigOption<Long> SOURCE_START_FROM_PUBLISH_TIME =
+            ConfigOptions.key("source.start.publish-time")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "(Optional) the publish timestamp 
that is used to specify a starting point for the [Pulsar DataStream source 
connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/pulsar/#pulsar-source)
 to consume data.")
+                                    .build());
+
+    public static final ConfigOption<String> SOURCE_STOP_AT_MESSAGE_ID =
+            ConfigOptions.key("source.stop.at-message-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional message id used to specify a stop cursor 
for the unbounded sql "
+                                    + "source. Use \"never\", \"latest\" or 
pass in a message id "
+                                    + "representation in 
\"ledgerId:entryId:partitionId\", "
+                                    + "such as \"12:2:-1\"");
+
+    public static final ConfigOption<String> SOURCE_STOP_AFTER_MESSAGE_ID =
+            ConfigOptions.key("source.stop.after-message-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional message id used to specify a stop 
position but include the "
+                                    + "given message in the consuming result 
for the unbounded sql "
+                                    + "source. Pass in a message id "

Review Comment:
   bounded ?



##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/table/source/PulsarTableDeserializationSchema.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.table.source;
+
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+
+import org.apache.pulsar.client.api.Message;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A specific {@link PulsarDeserializationSchema} for {@link 
PulsarTableSource}.
+ *
+ * <p>Both Flink's key decoding format and value decoding format are wrapped 
in this class. It is
+ * responsible for getting metadata fields from a physical pulsar message 
body, and the final
+ * projection mapping from Pulsar message fields to Flink row.
+ *
+ * <p>After retrieving key and value bytes and convert them into a list of 
{@link RowData}, it then
+ * delegates metadata appending, key and value {@link RowData} combining to a 
{@link
+ * PulsarRowDataConverter} instance.
+ */
+public class PulsarTableDeserializationSchema implements 
PulsarDeserializationSchema<RowData> {
+    private static final long serialVersionUID = -3298784447432136216L;
+
+    private final TypeInformation<RowData> producedTypeInfo;
+
+    @Nullable private final DeserializationSchema<RowData> keyDeserialization;
+
+    private final DeserializationSchema<RowData> valueDeserialization;
+
+    private final PulsarRowDataConverter rowDataConverter;
+
+    private final boolean upsertMode;
+
+    public PulsarTableDeserializationSchema(
+            @Nullable DeserializationSchema<RowData> keyDeserialization,
+            DeserializationSchema<RowData> valueDeserialization,
+            TypeInformation<RowData> producedTypeInfo,
+            PulsarRowDataConverter rowDataConverter,
+            boolean upsertMode) {
+        this.keyDeserialization = keyDeserialization;
+        this.valueDeserialization = checkNotNull(valueDeserialization);
+        this.rowDataConverter = checkNotNull(rowDataConverter);
+        this.producedTypeInfo = checkNotNull(producedTypeInfo);
+        this.upsertMode = upsertMode;
+    }
+
+    @Override
+    public void open(PulsarInitializationContext context, SourceConfiguration 
configuration)
+            throws Exception {
+        if (keyDeserialization != null) {
+            keyDeserialization.open(context);
+        }
+        valueDeserialization.open(context);
+    }
+
+    @Override
+    public void deserialize(Message<byte[]> message, Collector<RowData> 
collector)
+            throws IOException {
+
+        // Get the key row data
+        List<RowData> keyRowData = new ArrayList<>();
+        if (keyDeserialization != null) {
+            keyDeserialization.deserialize(message.getKeyBytes(), new 
ListCollector<>(keyRowData));
+        }
+
+        // Get the value row data
+        List<RowData> valueRowData = new ArrayList<>();
+
+        if (upsertMode && message.getData().length == 0) {
+            checkNotNull(keyDeserialization, "upsert mode must specify a key 
format");

Review Comment:
    We can move the check to compile phase to check  once instead of checking 
for each tombstone message



##########
pom.xml:
##########
@@ -106,6 +110,7 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+

Review Comment:
   useless line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to