This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit bbd7c6d46c29e5f298f08a281775c79cc63a7149
Author: SteNicholas <[email protected]>
AuthorDate: Thu Aug 19 08:40:26 2021 +0800

    [#780] Support the RocketMQ TableSink based on the legacy Sink 
implementation (#781)
---
 .../flink/{source => }/common/RocketMQOptions.java |  39 +++-
 .../apache/rocketmq/flink/legacy/RocketMQSink.java |  21 +-
 .../flink/sink/table/RocketMQDynamicTableSink.java | 257 +++++++++++++++++++++
 .../table/RocketMQDynamicTableSinkFactory.java     | 174 ++++++++++++++
 .../flink/sink/table/RocketMQRowDataConverter.java | 217 +++++++++++++++++
 .../flink/sink/table/RocketMQRowDataSink.java      |  61 +++++
 .../table/RocketMQDynamicTableSourceFactory.java   |  32 +--
 .../flink/source/util/StringSerializer.java        |   2 +-
 .../org.apache.flink.table.factories.Factory       |   3 +-
 .../RocketMQDynamicTableSinkFactoryTest.java}      |  56 +++--
 .../RocketMQDynamicTableSourceFactoryTest.java     |   2 +-
 11 files changed, 797 insertions(+), 67 deletions(-)

diff --git 
a/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java 
b/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
similarity index 61%
rename from 
src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java
rename to src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
index 000e090..b34826d 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java
+++ b/src/main/java/org/apache/rocketmq/flink/common/RocketMQOptions.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.flink.source.common;
+package org.apache.rocketmq.flink.common;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
@@ -24,19 +24,23 @@ import org.apache.flink.configuration.ConfigOptions;
 /** Includes config options of RocketMQ connector type. */
 public class RocketMQOptions {
 
-    public static final ConfigOption<String> TOPIC = 
ConfigOptions.key("topic").noDefaultValue();
+    public static final ConfigOption<String> TOPIC =
+            ConfigOptions.key("topic").stringType().noDefaultValue();
 
     public static final ConfigOption<String> CONSUMER_GROUP =
-            ConfigOptions.key("consumerGroup").noDefaultValue();
+            ConfigOptions.key("consumerGroup").stringType().noDefaultValue();
+
+    public static final ConfigOption<String> PRODUCER_GROUP =
+            ConfigOptions.key("producerGroup").stringType().noDefaultValue();
 
     public static final ConfigOption<String> NAME_SERVER_ADDRESS =
-            ConfigOptions.key("nameServerAddress").noDefaultValue();
+            
ConfigOptions.key("nameServerAddress").stringType().noDefaultValue();
 
     public static final ConfigOption<String> OPTIONAL_TAG =
-            ConfigOptions.key("tag").noDefaultValue();
+            ConfigOptions.key("tag").stringType().noDefaultValue();
 
     public static final ConfigOption<Integer> OPTIONAL_START_MESSAGE_OFFSET =
-            ConfigOptions.key("startMessageOffset").defaultValue(-1);
+            ConfigOptions.key("startMessageOffset").intType().defaultValue(-1);
 
     public static final ConfigOption<Long> OPTIONAL_START_TIME_MILLS =
             
ConfigOptions.key("startTimeMs".toLowerCase()).longType().defaultValue(-1L);
@@ -45,7 +49,7 @@ public class RocketMQOptions {
             
ConfigOptions.key("startTime".toLowerCase()).stringType().noDefaultValue();
 
     public static final ConfigOption<String> OPTIONAL_END_TIME =
-            ConfigOptions.key("endTime").noDefaultValue();
+            ConfigOptions.key("endTime").stringType().noDefaultValue();
 
     public static final ConfigOption<String> OPTIONAL_TIME_ZONE =
             
ConfigOptions.key("timeZone".toLowerCase()).stringType().noDefaultValue();
@@ -70,4 +74,25 @@ public class RocketMQOptions {
 
     public static final ConfigOption<String> OPTIONAL_LENGTH_CHECK =
             ConfigOptions.key("lengthCheck").stringType().defaultValue("NONE");
+
+    public static final ConfigOption<Integer> OPTIONAL_WRITE_RETRY_TIMES =
+            ConfigOptions.key("retryTimes").intType().defaultValue(10);
+
+    public static final ConfigOption<Long> OPTIONAL_WRITE_SLEEP_TIME_MS =
+            ConfigOptions.key("sleepTimeMs").longType().defaultValue(5000L);
+
+    public static final ConfigOption<Boolean> OPTIONAL_WRITE_IS_DYNAMIC_TAG =
+            
ConfigOptions.key("isDynamicTag").booleanType().defaultValue(false);
+
+    public static final ConfigOption<String> OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN 
=
+            
ConfigOptions.key("dynamicTagColumn").stringType().noDefaultValue();
+
+    public static final ConfigOption<Boolean> 
OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN_WRITE_INCLUDED =
+            
ConfigOptions.key("dynamicTagColumnWriteIncluded").booleanType().defaultValue(true);
+
+    public static final ConfigOption<String> OPTIONAL_WRITE_KEY_COLUMNS =
+            ConfigOptions.key("keyColumns").stringType().noDefaultValue();
+
+    public static final ConfigOption<Boolean> OPTIONAL_WRITE_KEYS_TO_BODY =
+            
ConfigOptions.key("writeKeysToBody").booleanType().defaultValue(false);
 }
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java 
b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java
index b6e1793..328880b 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java
@@ -47,7 +47,7 @@ import java.util.UUID;
  * batchFlushOnCheckpoint(true) is set. Otherwise, the sink reliability 
guarantees depends on
  * rocketmq producer's retry policy.
  */
-public class RocketMQSink<IN> extends RichSinkFunction<IN> implements 
CheckpointedFunction {
+public class RocketMQSink extends RichSinkFunction<Message> implements 
CheckpointedFunction {
 
     private static final long serialVersionUID = 1L;
 
@@ -104,12 +104,11 @@ public class RocketMQSink<IN> extends 
RichSinkFunction<IN> implements Checkpoint
     }
 
     @Override
-    public void invoke(IN input, Context context) throws Exception {
+    public void invoke(Message input, Context context) throws Exception {
         sinkInTps.markEvent();
 
-        Message msg = (Message) input;
         if (batchFlushOnCheckpoint) {
-            batchList.add(msg);
+            batchList.add(input);
             if (batchList.size() >= batchSize) {
                 flushSync();
             }
@@ -120,7 +119,7 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> 
implements Checkpoint
         if (async) {
             try {
                 producer.send(
-                        msg,
+                        input,
                         new SendCallback() {
                             @Override
                             public void onSuccess(SendResult sendResult) {
@@ -128,7 +127,7 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> 
implements Checkpoint
                                 long end = System.currentTimeMillis();
                                 latencyGauge.report(end - timeStartWriting, 1);
                                 outTps.markEvent();
-                                outBps.markEvent(msg.getBody().length);
+                                outBps.markEvent(input.getBody().length);
                             }
 
                             @Override
@@ -143,7 +142,7 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> 
implements Checkpoint
             }
         } else {
             try {
-                SendResult result = producer.send(msg);
+                SendResult result = producer.send(input);
                 LOG.debug("Sync send message result: {}", result);
                 if (result.getSendStatus() != SendStatus.SEND_OK) {
                     throw new RemotingException(result.toString());
@@ -151,7 +150,7 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> 
implements Checkpoint
                 long end = System.currentTimeMillis();
                 latencyGauge.report(end - timeStartWriting, 1);
                 outTps.markEvent();
-                outBps.markEvent(msg.getBody().length);
+                outBps.markEvent(input.getBody().length);
             } catch (Exception e) {
                 LOG.error("Sync send message exception: ", e);
                 throw e;
@@ -159,17 +158,17 @@ public class RocketMQSink<IN> extends 
RichSinkFunction<IN> implements Checkpoint
         }
     }
 
-    public RocketMQSink<IN> withAsync(boolean async) {
+    public RocketMQSink withAsync(boolean async) {
         this.async = async;
         return this;
     }
 
-    public RocketMQSink<IN> withBatchFlushOnCheckpoint(boolean 
batchFlushOnCheckpoint) {
+    public RocketMQSink withBatchFlushOnCheckpoint(boolean 
batchFlushOnCheckpoint) {
         this.batchFlushOnCheckpoint = batchFlushOnCheckpoint;
         return this;
     }
 
-    public RocketMQSink<IN> withBatchSize(int batchSize) {
+    public RocketMQSink withBatchSize(int batchSize) {
         this.batchSize = batchSize;
         return this;
     }
diff --git 
a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
 
b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
new file mode 100644
index 0000000..9839888
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.sink.table;
+
+import org.apache.rocketmq.flink.legacy.RocketMQConfig;
+import org.apache.rocketmq.flink.legacy.RocketMQSink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
+import org.apache.flink.types.RowKind;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static 
org.apache.rocketmq.flink.sink.table.RocketMQRowDataConverter.MetadataConverter;
+
+/** Defines the dynamic table sink of RocketMQ. */
+public class RocketMQDynamicTableSink implements DynamicTableSink, 
SupportsWritingMetadata {
+
+    private final DescriptorProperties properties;
+    private final TableSchema schema;
+
+    private final String topic;
+    private final String producerGroup;
+    private final String nameServerAddress;
+    private final String tag;
+    private final String dynamicColumn;
+    private final String fieldDelimiter;
+    private final String encoding;
+
+    private final long retryTimes;
+    private final long sleepTime;
+
+    private final boolean isDynamicTag;
+    private final boolean isDynamicTagIncluded;
+    private final boolean writeKeysToBody;
+
+    private final String[] keyColumns;
+
+    private List<String> metadataKeys;
+
+    /**
+     * Creates the table sink from already-resolved connector settings.
+     *
+     * <p>Every argument is stored unchanged in the field of the same name. The list of writable
+     * metadata keys starts out empty.
+     */
+    public RocketMQDynamicTableSink(
+            DescriptorProperties properties,
+            TableSchema schema,
+            String topic,
+            String producerGroup,
+            String nameServerAddress,
+            String tag,
+            String dynamicColumn,
+            String fieldDelimiter,
+            String encoding,
+            long retryTimes,
+            long sleepTime,
+            boolean isDynamicTag,
+            boolean isDynamicTagIncluded,
+            boolean writeKeysToBody,
+            String[] keyColumns) {
+        this.properties = properties;
+        this.schema = schema;
+        this.topic = topic;
+        this.producerGroup = producerGroup;
+        this.nameServerAddress = nameServerAddress;
+        this.tag = tag;
+        this.dynamicColumn = dynamicColumn;
+        this.fieldDelimiter = fieldDelimiter;
+        this.encoding = encoding;
+        this.retryTimes = retryTimes;
+        this.sleepTime = sleepTime;
+        this.isDynamicTag = isDynamicTag;
+        this.isDynamicTagIncluded = isDynamicTagIncluded;
+        this.writeKeysToBody = writeKeysToBody;
+        this.keyColumns = keyColumns;
+        // No metadata columns are selected until the keys are applied later.
+        this.metadataKeys = Collections.emptyList();
+    }
+
+    /**
+     * Accepts every row kind of the requested changelog mode except {@link
+     * RowKind#UPDATE_BEFORE}.
+     *
+     * @param requestedMode the changelog mode requested from this sink
+     * @return a changelog mode containing the requested kinds without {@code UPDATE_BEFORE}
+     */
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        final ChangelogMode.Builder accepted = ChangelogMode.newBuilder();
+        requestedMode.getContainedKinds().stream()
+                .filter(rowKind -> rowKind != RowKind.UPDATE_BEFORE)
+                .forEach(accepted::addContainedKind);
+        return accepted.build();
+    }
+
+    /**
+     * Provides the runtime sink: a {@link RocketMQRowDataSink} built from a freshly created
+     * {@link RocketMQSink} and {@link RocketMQRowDataConverter}.
+     *
+     * @param context sink context (not read by this implementation)
+     * @return a {@link SinkFunctionProvider} wrapping the row data sink
+     */
+    @Override
+    public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(
+            DynamicTableSink.Context context) {
+        final RocketMQSink messageSink = createSink();
+        final RocketMQRowDataConverter rowConverter = createConverter();
+        return SinkFunctionProvider.of(new RocketMQRowDataSink(messageSink, rowConverter));
+    }
+
+    /**
+     * Lists the metadata columns this sink can write, in the declaration order of {@link
+     * WritableMetadata}.
+     *
+     * @return an insertion-ordered map from metadata key to its data type
+     */
+    @Override
+    public Map<String, DataType> listWritableMetadata() {
+        final Map<String, DataType> writable = new LinkedHashMap<>();
+        for (WritableMetadata metadata : WritableMetadata.values()) {
+            writable.put(metadata.key, metadata.dataType);
+        }
+        return writable;
+    }
+
+    /**
+     * Remembers which metadata keys were selected for writing.
+     *
+     * <p>Only the key list is stored; {@code consumedDataType} is not used by this
+     * implementation.
+     */
+    @Override
+    public void applyWritableMetadata(List<String> metadataKeys, DataType 
consumedDataType) {
+        this.metadataKeys = metadataKeys;
+    }
+
+    /**
+     * Creates a new sink from the same constructor arguments and carries over the currently
+     * applied metadata keys.
+     *
+     * @return the duplicated table sink
+     */
+    @Override
+    public DynamicTableSink copy() {
+        final RocketMQDynamicTableSink duplicate =
+                new RocketMQDynamicTableSink(
+                        properties,
+                        schema,
+                        topic,
+                        producerGroup,
+                        nameServerAddress,
+                        tag,
+                        dynamicColumn,
+                        fieldDelimiter,
+                        encoding,
+                        retryTimes,
+                        sleepTime,
+                        isDynamicTag,
+                        isDynamicTagIncluded,
+                        writeKeysToBody,
+                        keyColumns);
+        // The constructor resets the keys to an empty list, so restore them explicitly.
+        duplicate.metadataKeys = metadataKeys;
+        return duplicate;
+    }
+
+    /** Returns the class name of {@link RocketMQDynamicTableSink} as the sink's summary. */
+    @Override
+    public String asSummaryString() {
+        return RocketMQDynamicTableSink.class.getName();
+    }
+
+    /** Creates the legacy {@link RocketMQSink} from the properties of getProducerProps(). */
+    private RocketMQSink createSink() {
+        return new RocketMQSink(getProducerProps());
+    }
+
+    /**
+     * Builds the converter that turns table rows into RocketMQ messages.
+     *
+     * <p>For each {@link WritableMetadata} constant, in declaration order, the position array
+     * holds {@code -1} when its key was not applied. Otherwise it holds the schema's field
+     * count plus the key's index in the applied key list.
+     */
+    private RocketMQRowDataConverter createConverter() {
+        final WritableMetadata[] allMetadata = WritableMetadata.values();
+        final int fieldCount = schema.getFieldCount();
+        final int[] metadataPositions = new int[allMetadata.length];
+        for (int i = 0; i < allMetadata.length; i++) {
+            final int keyIndex = metadataKeys.indexOf(allMetadata[i].key);
+            metadataPositions[i] = keyIndex < 0 ? -1 : fieldCount + keyIndex;
+        }
+        final boolean hasMetadata = !metadataKeys.isEmpty();
+        return new RocketMQRowDataConverter(
+                topic,
+                tag,
+                dynamicColumn,
+                fieldDelimiter,
+                encoding,
+                isDynamicTag,
+                isDynamicTagIncluded,
+                writeKeysToBody,
+                keyColumns,
+                convertToRowTypeInfo(schema.toRowDataType()),
+                schema.getFieldDataTypes(),
+                hasMetadata,
+                metadataPositions);
+    }
+
+    /**
+     * Assembles the producer properties: the producer group and the name server address, stored
+     * under the matching {@link RocketMQConfig} keys.
+     */
+    private Properties getProducerProps() {
+        final Properties settings = new Properties();
+        settings.setProperty(RocketMQConfig.PRODUCER_GROUP, producerGroup);
+        settings.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameServerAddress);
+        return settings;
+    }
+
+    /**
+     * Converts the children of the given data type into a {@link RowTypeInfo}, mapping each
+     * child through {@link LegacyTypeInfoDataTypeConverter#toLegacyTypeInfo}.
+     *
+     * @param fieldsDataType data type whose children describe the row fields
+     * @return row type information with one entry per child, in child order
+     */
+    protected static RowTypeInfo convertToRowTypeInfo(DataType fieldsDataType) {
+        final List<DataType> children = fieldsDataType.getChildren();
+        final TypeInformation<?>[] fieldTypes = new TypeInformation<?>[children.size()];
+        for (int i = 0; i < fieldTypes.length; i++) {
+            fieldTypes[i] = LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(children.get(i));
+        }
+        return new RowTypeInfo(fieldTypes);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Metadata handling
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Metadata columns that can be written by this sink. Each constant carries its metadata
+     * key, its data type and a converter that reads the value from a row position.
+     */
+    enum WritableMetadata {
+        // "keys" metadata: nullable string read from the row.
+        KEYS(
+                "keys",
+                DataTypes.STRING().nullable(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(RowData row, int pos) {
+                        // A null field stays null; otherwise the string value is returned.
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
+                        return row.getString(pos).toString();
+                    }
+                }),
+
+        // "tags" metadata: nullable string read from the row, same as KEYS.
+        TAGS(
+                "tags",
+                DataTypes.STRING().nullable(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(RowData row, int pos) {
+                        // A null field stays null; otherwise the string value is returned.
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
+                        return row.getString(pos).toString();
+                    }
+                });
+
+        // Metadata key as listed by listWritableMetadata().
+        final String key;
+
+        // Data type reported for this metadata column.
+        final DataType dataType;
+
+        // Reads the metadata value out of a row at a given position.
+        final MetadataConverter converter;
+
+        WritableMetadata(String key, DataType dataType, MetadataConverter 
converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.converter = converter;
+        }
+    }
+}
diff --git 
a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSinkFactory.java
 
b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSinkFactory.java
new file mode 100644
index 0000000..72d29d5
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSinkFactory.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.sink.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.utils.TableSchemaUtils;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.NAME_SERVER_ADDRESS;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_ENCODING;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_FIELD_DELIMITER;
+import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_TAG;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN_WRITE_INCLUDED;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_WRITE_IS_DYNAMIC_TAG;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_WRITE_KEYS_TO_BODY;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_WRITE_KEY_COLUMNS;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_WRITE_RETRY_TIMES;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_WRITE_SLEEP_TIME_MS;
+import static org.apache.rocketmq.flink.common.RocketMQOptions.PRODUCER_GROUP;
+import static org.apache.rocketmq.flink.common.RocketMQOptions.TOPIC;
+
+/**
+ * Defines the {@link DynamicTableSinkFactory} implementation to create {@link
+ * RocketMQDynamicTableSink}.
+ */
+public class RocketMQDynamicTableSinkFactory implements 
DynamicTableSinkFactory {
+
+    /** Returns the identifier of this factory, {@code "rocketmq"}. */
+    @Override
+    public String factoryIdentifier() {
+        return "rocketmq";
+    }
+
+    /**
+     * Declares the mandatory options: topic, producer group and name server address.
+     *
+     * @return a new mutable set holding the required options
+     */
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> mandatory = new HashSet<>();
+        mandatory.add(TOPIC);
+        mandatory.add(PRODUCER_GROUP);
+        mandatory.add(NAME_SERVER_ADDRESS);
+        return mandatory;
+    }
+
+    /**
+     * Declares the options that may be set but are not mandatory: the tag, the write options
+     * (retry, sleep time, dynamic tag, key columns), the encoding and the field delimiter.
+     *
+     * @return a new mutable set holding the optional options
+     */
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> optional = new HashSet<>();
+        optional.add(OPTIONAL_TAG);
+        optional.add(OPTIONAL_WRITE_RETRY_TIMES);
+        optional.add(OPTIONAL_WRITE_SLEEP_TIME_MS);
+        optional.add(OPTIONAL_WRITE_IS_DYNAMIC_TAG);
+        optional.add(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN);
+        optional.add(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN_WRITE_INCLUDED);
+        optional.add(OPTIONAL_WRITE_KEYS_TO_BODY);
+        optional.add(OPTIONAL_WRITE_KEY_COLUMNS);
+        optional.add(OPTIONAL_ENCODING);
+        optional.add(OPTIONAL_FIELD_DELIMITER);
+        return optional;
+    }
+
+    /**
+     * Creates the RocketMQ table sink from the options of the catalog table.
+     *
+     * <p>Option keys are first rewritten to the casing declared by this factory, then validated
+     * through the table factory helper, and finally read into the sink's constructor arguments.
+     * The key-columns option is split on commas; when it is absent or empty, no key columns are
+     * passed.
+     *
+     * @param context factory context providing the catalog table (options and schema)
+     * @return a {@link RocketMQDynamicTableSink} configured from the table options
+     */
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        transformContext(this, context);
+        FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
+        helper.validate();
+        Map<String, String> rawProperties = context.getCatalogTable().getOptions();
+        Configuration properties = Configuration.fromMap(rawProperties);
+        String topicName = properties.getString(TOPIC);
+        String producerGroup = properties.getString(PRODUCER_GROUP);
+        String nameServerAddress = properties.getString(NAME_SERVER_ADDRESS);
+        String tag = properties.getString(OPTIONAL_TAG);
+        String dynamicColumn = properties.getString(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN);
+        String encoding = properties.getString(OPTIONAL_ENCODING);
+        String fieldDelimiter = properties.getString(OPTIONAL_FIELD_DELIMITER);
+        int retryTimes = properties.getInteger(OPTIONAL_WRITE_RETRY_TIMES);
+        long sleepTimeMs = properties.getLong(OPTIONAL_WRITE_SLEEP_TIME_MS);
+        boolean isDynamicTag = properties.getBoolean(OPTIONAL_WRITE_IS_DYNAMIC_TAG);
+        boolean isDynamicTagIncluded =
+                properties.getBoolean(OPTIONAL_WRITE_DYNAMIC_TAG_COLUMN_WRITE_INCLUDED);
+        boolean writeKeysToBody = properties.getBoolean(OPTIONAL_WRITE_KEYS_TO_BODY);
+        String keyColumnsConfig = properties.getString(OPTIONAL_WRITE_KEY_COLUMNS);
+        String[] keyColumns = new String[0];
+        if (keyColumnsConfig != null && !keyColumnsConfig.isEmpty()) {
+            keyColumns = keyColumnsConfig.split(",");
+        }
+        DescriptorProperties descriptorProperties = new DescriptorProperties();
+        descriptorProperties.putProperties(rawProperties);
+        TableSchema physicalSchema =
+                TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+        return new RocketMQDynamicTableSink(
+                descriptorProperties,
+                physicalSchema,
+                topicName,
+                producerGroup,
+                nameServerAddress,
+                tag,
+                dynamicColumn,
+                fieldDelimiter,
+                encoding,
+                // The sink constructor declares (retryTimes, sleepTime) in this order. Passing
+                // them the other way round compiles silently because int widens to long.
+                retryTimes,
+                sleepTimeMs,
+                isDynamicTag,
+                isDynamicTagIncluded,
+                writeKeysToBody,
+                keyColumns);
+    }
+
+    /**
+     * Rewrites the catalog table's options in place so that their keys use the casing declared
+     * by the given factory.
+     *
+     * <p>The options map returned by the catalog table is cleared and refilled, so it has to be
+     * mutable.
+     */
+    private void transformContext(
+            DynamicTableFactory factory, DynamicTableFactory.Context context) {
+        final Map<String, String> tableOptions = context.getCatalogTable().getOptions();
+        final Map<String, String> normalized = normalizeOptionCaseAsFactory(factory, tableOptions);
+        tableOptions.clear();
+        tableOptions.putAll(normalized);
+    }
+
+    /**
+     * Returns a copy of {@code options} whose keys are matched case-insensitively against the
+     * factory's declared option keys.
+     *
+     * <p>A key that matches a required option (checked first) or an optional option is replaced
+     * by the declared key; any other key is kept unchanged. Values are copied as-is.
+     */
+    private Map<String, String> normalizeOptionCaseAsFactory(
+            Factory factory, Map<String, String> options) {
+        final Map<String, String> requiredByLowerCase =
+                indexKeysByLowerCase(factory.requiredOptions());
+        final Map<String, String> optionalByLowerCase =
+                indexKeysByLowerCase(factory.optionalOptions());
+        final Map<String, String> result = new HashMap<>();
+        for (Map.Entry<String, String> option : options.entrySet()) {
+            final String originalKey = option.getKey();
+            final String loweredKey = originalKey.toLowerCase();
+            final String declaredKey;
+            if (requiredByLowerCase.containsKey(loweredKey)) {
+                declaredKey = requiredByLowerCase.get(loweredKey);
+            } else {
+                declaredKey = optionalByLowerCase.getOrDefault(loweredKey, originalKey);
+            }
+            result.put(declaredKey, option.getValue());
+        }
+        return result;
+    }
+
+    /** Maps the lower-cased key of every given option to the key as declared. */
+    private static Map<String, String> indexKeysByLowerCase(Set<ConfigOption<?>> declared) {
+        return declared.stream()
+                .collect(
+                        Collectors.toMap(
+                                option -> option.key().toLowerCase(), ConfigOption::key));
+    }
+}
diff --git 
a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQRowDataConverter.java
 
b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQRowDataConverter.java
new file mode 100644
index 0000000..9c5c8af
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQRowDataConverter.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.sink.table;
+
+import org.apache.rocketmq.common.message.Message;
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkState;
+import static 
org.apache.rocketmq.flink.sink.table.RocketMQDynamicTableSink.WritableMetadata;
+import static 
org.apache.rocketmq.flink.sink.table.RocketMQDynamicTableSink.WritableMetadata.KEYS;
+import static 
org.apache.rocketmq.flink.sink.table.RocketMQDynamicTableSink.WritableMetadata.TAGS;
+
+/**
+ * RocketMQRowDataConverter converts the row data of table to RocketMQ message pattern.
+ *
+ * <p>For every {@link RowKind#INSERT} or {@link RowKind#UPDATE_AFTER} row the converter builds a
+ * {@link Message} for the configured topic:
+ *
+ * <ul>
+ *   <li>message keys are read from the configured key columns;
+ *   <li>the tag is either the static {@code tag} or, when {@code isDynamicTag} is set, the value
+ *       of {@code dynamicColumn};
+ *   <li>the body is the single binary field when the row consists of exactly one {@code byte[]}
+ *       column, otherwise the remaining fields joined with {@code fieldDelimiter} and encoded
+ *       with {@code encoding};
+ *   <li>when {@code hasMetadata} is set, non-null {@code KEYS}/{@code TAGS} metadata values
+ *       replace the keys/tag derived above.
+ * </ul>
+ *
+ * <p>{@link #open()} must be called before {@link #convert(RowData)}; it resolves the column
+ * indexes and field getters used per row.
+ */
+public class RocketMQRowDataConverter implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(RocketMQRowDataConverter.class);
+
+    private final String topic;
+    private final String tag;
+    private final String dynamicColumn;
+    private final String fieldDelimiter;
+    private final String encoding;
+
+    private final boolean isDynamicTag;
+    private final boolean isDynamicTagIncluded;
+    private final boolean writeKeysToBody;
+    private boolean onlyVarbinary = false;
+
+    private final String[] keyColumns;
+    private final RowTypeInfo rowTypeInfo;
+    private final DataType[] fieldDataTypes;
+
+    // Resolved in open(): positions of the key/tag/body columns inside the row.
+    private int[] keyFieldIndexes;
+    private int[] tagFieldIndexes;
+    private int[] bodyFieldIndexes;
+    // Resolved in open(): one getter per body column, reused for every converted row.
+    private RowData.FieldGetter[] bodyFieldGetters;
+
+    private final boolean hasMetadata;
+    private final int[] metadataPositions;
+
+    public RocketMQRowDataConverter(
+            String topic,
+            String tag,
+            String dynamicColumn,
+            String fieldDelimiter,
+            String encoding,
+            boolean isDynamicTag,
+            boolean isDynamicTagIncluded,
+            boolean writeKeysToBody,
+            String[] keyColumns,
+            RowTypeInfo rowTypeInfo,
+            DataType[] fieldDataTypes,
+            boolean hasMetadata,
+            int[] metadataPositions) {
+        this.topic = topic;
+        this.tag = tag;
+        this.dynamicColumn = dynamicColumn;
+        this.fieldDelimiter = fieldDelimiter;
+        this.encoding = encoding;
+        this.isDynamicTag = isDynamicTag;
+        this.isDynamicTagIncluded = isDynamicTagIncluded;
+        this.writeKeysToBody = writeKeysToBody;
+        this.keyColumns = keyColumns;
+        this.rowTypeInfo = rowTypeInfo;
+        this.fieldDataTypes = fieldDataTypes;
+        this.hasMetadata = hasMetadata;
+        this.metadataPositions = metadataPositions;
+    }
+
+    /**
+     * Resolves the key, tag and body column positions from the row type and prepares the body
+     * field getters.
+     *
+     * @throws IllegalStateException if a configured key column or the dynamic tag column does not
+     *     exist in the row type
+     */
+    public void open() {
+        // A single byte[] column is written to the message body as-is.
+        onlyVarbinary =
+                rowTypeInfo.getArity() == 1
+                        && rowTypeInfo.getFieldTypes()[0].getTypeClass().equals(byte[].class);
+
+        // Columns that are consumed as key/tag and must not be repeated in the body.
+        Set<Integer> excludedFields = new HashSet<>();
+        if (keyColumns != null) {
+            keyFieldIndexes = new int[keyColumns.length];
+            for (int index = 0; index < keyColumns.length; index++) {
+                int fieldIndex = rowTypeInfo.getFieldIndex(keyColumns[index]);
+                checkState(
+                        fieldIndex >= 0,
+                        String.format(
+                                "[MetaQConverter] Could not find the message-key column: %s.",
+                                keyColumns[index]));
+                keyFieldIndexes[index] = fieldIndex;
+                if (!writeKeysToBody) {
+                    excludedFields.add(fieldIndex);
+                }
+            }
+        } else {
+            keyFieldIndexes = new int[0];
+        }
+        if (isDynamicTag && dynamicColumn != null) {
+            tagFieldIndexes = new int[1];
+            int fieldIndex = rowTypeInfo.getFieldIndex(dynamicColumn);
+            checkState(
+                    fieldIndex >= 0,
+                    String.format(
+                            "[MetaQConverter] Could not find the tag column: %s.", dynamicColumn));
+            tagFieldIndexes[0] = fieldIndex;
+            if (!isDynamicTagIncluded) {
+                excludedFields.add(fieldIndex);
+            }
+        } else {
+            tagFieldIndexes = new int[0];
+        }
+
+        int bodyFieldCount = rowTypeInfo.getArity() - excludedFields.size();
+        bodyFieldIndexes = new int[bodyFieldCount];
+        bodyFieldGetters = new RowData.FieldGetter[bodyFieldCount];
+        int index = 0;
+        for (int num = 0; num < rowTypeInfo.getArity(); num++) {
+            if (!excludedFields.contains(num)) {
+                bodyFieldIndexes[index] = num;
+                // Build the getter once here instead of once per field per converted row.
+                bodyFieldGetters[index++] =
+                        RowData.createFieldGetter(fieldDataTypes[num].getLogicalType(), num);
+            }
+        }
+    }
+
+    /**
+     * Converts one table row into a RocketMQ message.
+     *
+     * @param row the row to convert
+     * @return the message, or {@code null} when the row kind is neither {@link RowKind#INSERT}
+     *     nor {@link RowKind#UPDATE_AFTER}
+     * @throws IllegalStateException if the dynamic tag is enabled but no tag column was resolved
+     */
+    public Message convert(RowData row) {
+        if (row.getRowKind() != RowKind.INSERT && row.getRowKind() != RowKind.UPDATE_AFTER) {
+            return null;
+        }
+        Message message = new Message();
+        message.setTopic(topic);
+        List<String> keys = new ArrayList<>();
+        for (int fieldIndex : keyFieldIndexes) {
+            // A null key field contributes no key instead of failing with an NPE.
+            if (!row.isNullAt(fieldIndex)) {
+                keys.add(row.getString(fieldIndex).toString());
+            }
+        }
+        if (!keys.isEmpty()) {
+            message.setKeys(keys);
+        }
+        if (!isDynamicTag) {
+            if (tag != null && tag.length() > 0) {
+                message.setTags(tag);
+            }
+        } else {
+            checkState(tagFieldIndexes.length > 0, "No message tag column set.");
+            int tagFieldIndex = tagFieldIndexes[0];
+            // A null tag field leaves the message untagged instead of failing with an NPE.
+            if (!row.isNullAt(tagFieldIndex)) {
+                message.setTags(row.getString(tagFieldIndex).toString());
+            }
+        }
+        if (onlyVarbinary) {
+            message.setBody(row.getBinary(0));
+            message.setWaitStoreMsgOK(true);
+        } else {
+            Object[] values = new Object[bodyFieldGetters.length];
+            for (int index = 0; index < bodyFieldGetters.length; index++) {
+                values[index] = bodyFieldGetters[index].getFieldOrNull(row);
+            }
+            try {
+                message.setBody(StringUtils.join(values, fieldDelimiter).getBytes(encoding));
+                message.setWaitStoreMsgOK(true);
+            } catch (UnsupportedEncodingException e) {
+                // The message is still returned, but without a body.
+                LOG.error(
+                        "Unsupported '{}' encoding charset. Check the encoding configItem in the DDL.",
+                        encoding,
+                        e);
+            }
+        }
+        if (hasMetadata) {
+            // Only override with metadata that is actually present: an undeclared metadata
+            // column (or a null value) must not wipe the keys/tag derived above.
+            String messageKeys = readMetadata(row, KEYS);
+            if (messageKeys != null) {
+                message.setKeys(messageKeys);
+            }
+            String messageTags = readMetadata(row, TAGS);
+            if (messageTags != null) {
+                message.setTags(messageTags);
+            }
+        }
+        return message;
+    }
+
+    /**
+     * Reads one writable metadata value from the row.
+     *
+     * @return the converted value, or {@code null} when the metadata position is negative
+     */
+    @SuppressWarnings("unchecked")
+    private <T> T readMetadata(RowData consumedRow, WritableMetadata metadata) {
+        final int pos = metadataPositions[metadata.ordinal()];
+        if (pos < 0) {
+            return null;
+        }
+        return (T) metadata.converter.read(consumedRow, pos);
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /** Reads a metadata value from a given position of the consumed row. */
+    interface MetadataConverter extends Serializable {
+        Object read(RowData consumedRow, int pos);
+    }
+}
diff --git 
a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQRowDataSink.java 
b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQRowDataSink.java
new file mode 100644
index 0000000..ac01829
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQRowDataSink.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * <p>Unless required by applicable law or agreed to in writing, software 
distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing 
permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.sink.table;
+
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.flink.legacy.RocketMQSink;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * RocketMQRowDataSink helps for writing the converted row data of table to RocketMQ messages.
+ *
+ * <p>It wraps a {@link RocketMQSink}: every incoming {@link RowData} is turned into a {@link
+ * Message} by the {@link RocketMQRowDataConverter} and, unless the converter returns {@code
+ * null}, handed to the wrapped sink. Lifecycle calls ({@code open}, {@code setRuntimeContext},
+ * {@code close}) are forwarded to the wrapped sink.
+ *
+ * <p>NOTE(review): only the lifecycle methods above are forwarded. If the wrapped sink relies on
+ * checkpoint callbacks (e.g. to flush buffered messages), those are not forwarded by this class
+ * - confirm against {@link RocketMQSink}.
+ */
+public class RocketMQRowDataSink extends RichSinkFunction<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RocketMQSink sink;
+    private final RocketMQRowDataConverter converter;
+
+    public RocketMQRowDataSink(RocketMQSink sink, RocketMQRowDataConverter converter) {
+        this.sink = sink;
+        this.converter = converter;
+    }
+
+    /** Opens the wrapped sink and resolves the converter's column layout. */
+    @Override
+    public void open(Configuration configuration) throws Exception {
+        sink.open(configuration);
+        converter.open();
+    }
+
+    /** Stores the runtime context on this function and passes it on to the wrapped sink. */
+    @Override
+    public void setRuntimeContext(RuntimeContext runtimeContext) {
+        // Keep this function's own context initialized so getRuntimeContext() works here too.
+        super.setRuntimeContext(runtimeContext);
+        sink.setRuntimeContext(runtimeContext);
+    }
+
+    /** Converts the row and forwards the resulting message; rows converted to null are dropped. */
+    @Override
+    public void invoke(RowData rowData, Context context) throws Exception {
+        Message message = converter.convert(rowData);
+        if (message != null) {
+            sink.invoke(message, context);
+        }
+    }
+
+    /** Closes the wrapped sink. */
+    @Override
+    public void close() {
+        sink.close();
+    }
+}
diff --git 
a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
 
b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
index 990e28b..36e3dc6 100644
--- 
a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
+++ 
b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
@@ -42,22 +42,22 @@ import java.util.TimeZone;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
-import static 
org.apache.rocketmq.flink.source.common.RocketMQOptions.CONSUMER_GROUP;
-import static 
org.apache.rocketmq.flink.source.common.RocketMQOptions.NAME_SERVER_ADDRESS;
-import static 
org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_COLUMN_ERROR_DEBUG;
-import static 
org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_ENCODING;
-import static 
org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_END_TIME;
-import static 
org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_FIELD_DELIMITER;
-import static 
org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_LENGTH_CHECK;
-import static 
org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_LINE_DELIMITER;
-import static 
org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS;
-import static 
org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_START_MESSAGE_OFFSET;
-import static 
org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_START_TIME;
-import static 
org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_START_TIME_MILLS;
-import static 
org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_TAG;
-import static 
org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_TIME_ZONE;
-import static 
org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_USE_NEW_API;
-import static org.apache.rocketmq.flink.source.common.RocketMQOptions.TOPIC;
+import static org.apache.rocketmq.flink.common.RocketMQOptions.CONSUMER_GROUP;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.NAME_SERVER_ADDRESS;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_COLUMN_ERROR_DEBUG;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_ENCODING;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_END_TIME;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_FIELD_DELIMITER;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_LENGTH_CHECK;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_LINE_DELIMITER;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_START_MESSAGE_OFFSET;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_START_TIME;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_START_TIME_MILLS;
+import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_TAG;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_TIME_ZONE;
+import static 
org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_USE_NEW_API;
+import static org.apache.rocketmq.flink.common.RocketMQOptions.TOPIC;
 
 /**
  * Defines the {@link DynamicTableSourceFactory} implementation to create 
{@link
diff --git 
a/src/main/java/org/apache/rocketmq/flink/source/util/StringSerializer.java 
b/src/main/java/org/apache/rocketmq/flink/source/util/StringSerializer.java
index c0a67b8..e2bcf22 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/util/StringSerializer.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/util/StringSerializer.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.flink.source.util;
 
-import java.util.Base64;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.binary.BinaryStringData;
 import org.apache.flink.table.data.util.DataFormatConverters;
@@ -30,6 +29,7 @@ import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.Base64;
 import java.util.Set;
 
 /** String serializer. */
diff --git 
a/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory 
b/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 32de8b2..b164722 100644
--- 
a/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.rocketmq.flink.source.table.RocketMQDynamicTableSourceFactory
\ No newline at end of file
+org.apache.rocketmq.flink.source.table.RocketMQDynamicTableSourceFactory
+org.apache.rocketmq.flink.sink.table.RocketMQDynamicTableSinkFactory
\ No newline at end of file
diff --git 
a/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java
 
b/src/test/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSinkFactoryTest.java
similarity index 64%
copy from 
src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java
copy to 
src/test/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSinkFactoryTest.java
index 184a23f..b06695a 100644
--- 
a/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java
+++ 
b/src/test/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSinkFactoryTest.java
@@ -16,9 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.flink.source.table;
+package org.apache.rocketmq.flink.sink.table;
 
-import org.apache.rocketmq.flink.source.common.RocketMQOptions;
+import org.apache.rocketmq.flink.common.RocketMQOptions;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.Schema;
@@ -28,7 +28,7 @@ import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.factories.FactoryUtil;
 
 import org.junit.Test;
@@ -42,8 +42,8 @@ import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-/** Tests for {@link RocketMQDynamicTableSourceFactory}. */
-public class RocketMQDynamicTableSourceFactoryTest {
+/** Tests for {@link 
org.apache.rocketmq.flink.sink.table.RocketMQDynamicTableSinkFactory}. */
+public class RocketMQDynamicTableSinkFactoryTest {
 
     private static final ResolvedSchema SCHEMA =
             new ResolvedSchema(
@@ -52,60 +52,56 @@ public class RocketMQDynamicTableSourceFactoryTest {
                     null);
 
     private static final String IDENTIFIER = "rocketmq";
-    private static final String TOPIC = "test_source";
-    private static final String CONSUMER_GROUP = "test_consumer";
-    private static final String NAME_SERVER_ADDRESS = "127.0.0.1:9876";
+    private static final String TOPIC = "test_sink";
+    private static final String PRODUCER_GROUP = "test_producer";
+    private static final String NAME_SERVER_ADDRESS =
+            "http://${instanceId}.${region}.mq-internal.aliyuncs.com:8080";;
 
     @Test
-    public void testRocketMQDynamicTableSourceWithLegalOption() {
+    public void testRocketMQDynamicTableSinkWithLegalOption() {
         final Map<String, String> options = new HashMap<>();
         options.put("connector", IDENTIFIER);
         options.put(RocketMQOptions.TOPIC.key(), TOPIC);
-        options.put(RocketMQOptions.CONSUMER_GROUP.key(), CONSUMER_GROUP);
+        options.put(RocketMQOptions.PRODUCER_GROUP.key(), PRODUCER_GROUP);
         options.put(RocketMQOptions.NAME_SERVER_ADDRESS.key(), 
NAME_SERVER_ADDRESS);
-        final DynamicTableSource tableSource = createTableSource(options);
-        assertTrue(tableSource instanceof RocketMQScanTableSource);
-        assertEquals(RocketMQScanTableSource.class.getName(), 
tableSource.asSummaryString());
+        final DynamicTableSink tableSink = createDynamicTableSink(options);
+        assertTrue(tableSink instanceof RocketMQDynamicTableSink);
+        assertEquals(RocketMQDynamicTableSink.class.getName(), 
tableSink.asSummaryString());
     }
 
     @Test(expected = ValidationException.class)
-    public void testRocketMQDynamicTableSourceWithoutRequiredOption() {
+    public void testRocketMQDynamicTableSinkWithoutRequiredOption() {
         final Map<String, String> options = new HashMap<>();
         options.put("connector", IDENTIFIER);
         options.put(RocketMQOptions.TOPIC.key(), TOPIC);
-        options.put(RocketMQOptions.CONSUMER_GROUP.key(), CONSUMER_GROUP);
+        options.put(RocketMQOptions.PRODUCER_GROUP.key(), PRODUCER_GROUP);
         options.put(RocketMQOptions.OPTIONAL_TAG.key(), "test_tag");
-        createTableSource(options);
+        createDynamicTableSink(options);
     }
 
     @Test(expected = ValidationException.class)
-    public void testRocketMQDynamicTableSourceWithUnknownOption() {
+    public void testRocketMQDynamicTableSinkWithUnknownOption() {
         final Map<String, String> options = new HashMap<>();
         options.put(RocketMQOptions.TOPIC.key(), TOPIC);
-        options.put(RocketMQOptions.CONSUMER_GROUP.key(), CONSUMER_GROUP);
+        options.put(RocketMQOptions.PRODUCER_GROUP.key(), PRODUCER_GROUP);
         options.put(RocketMQOptions.NAME_SERVER_ADDRESS.key(), 
NAME_SERVER_ADDRESS);
         options.put("unknown", "test_option");
-        createTableSource(options);
+        createDynamicTableSink(options);
     }
 
-    private static DynamicTableSource createTableSource(
-            Map<String, String> options, Configuration conf) {
-        return FactoryUtil.createTableSource(
+    private static DynamicTableSink createDynamicTableSink(Map<String, String> 
options) {
+        return FactoryUtil.createTableSink(
                 null,
-                ObjectIdentifier.of("default", "default", IDENTIFIER),
+                ObjectIdentifier.of("default", "default", "mq"),
                 new ResolvedCatalogTable(
                         CatalogTable.of(
                                 
Schema.newBuilder().fromResolvedSchema(SCHEMA).build(),
-                                "mock source",
+                                "mock sink",
                                 Collections.emptyList(),
                                 options),
                         SCHEMA),
-                conf,
-                RocketMQDynamicTableSourceFactory.class.getClassLoader(),
+                new Configuration(),
+                RocketMQDynamicTableSinkFactory.class.getClassLoader(),
                 false);
     }
-
-    private static DynamicTableSource createTableSource(Map<String, String> 
options) {
-        return createTableSource(options, new Configuration());
-    }
 }
diff --git 
a/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java
 
b/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java
index 184a23f..377e63b 100644
--- 
a/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java
+++ 
b/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.rocketmq.flink.source.table;
 
-import org.apache.rocketmq.flink.source.common.RocketMQOptions;
+import org.apache.rocketmq.flink.common.RocketMQOptions;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.Schema;

Reply via email to