This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 37300057 [FLINK-22748] Allow dynamic target topic selection in SQL 
Kafka sinks (#109)
37300057 is described below

commit 37300057d2b59fc88c6483d69980e8e08b6cfde4
Author: klam-shop <[email protected]>
AuthorDate: Fri Sep 6 10:38:20 2024 -0400

    [FLINK-22748] Allow dynamic target topic selection in SQL Kafka sinks (#109)
    
    [FLINK-22748][connector-kafka] Allow dynamic target topic selection in SQL 
Kafka sinks
    
    Allows writing to different Kafka topics based on the topic metadata column 
value in SQL, and updates the Table API's KafkaDynamicSink to accept a 
List<String> topics instead of String topic as well as support topic-pattern. 
The list acts as an allow-list of acceptable values for the topic metadata 
column. topic-pattern for sinks is a pattern that must match topic metadata 
column values, or else an error is thrown.
    
    If a single topic is provided, it is used by default for the target topic 
to produce to.
    If a list is provided, only that list of topics can be produced to.
    If a topic pattern is provided, it must match topic metadata column values.
---
 docs/content.zh/docs/connectors/table/kafka.md     |   8 +-
 .../docs/connectors/table/upsert-kafka.md          |   2 +-
 docs/content/docs/connectors/table/kafka.md        |   8 +-
 docs/content/docs/connectors/table/upsert-kafka.md |   2 +-
 .../DynamicKafkaRecordSerializationSchema.java     |  70 +++++++--
 .../kafka/table/KafkaConnectorOptions.java         |   6 +-
 .../kafka/table/KafkaConnectorOptionsUtil.java     |  60 +++-----
 .../connectors/kafka/table/KafkaDynamicSink.java   |  49 ++++--
 .../kafka/table/KafkaDynamicTableFactory.java      |  17 ++-
 .../table/UpsertKafkaDynamicTableFactory.java      |  26 ++--
 .../DynamicKafkaRecordSerializationSchemaTest.java | 148 +++++++++++++++++++
 .../kafka/table/KafkaDynamicTableFactoryTest.java  | 124 ++++++++++++----
 .../connectors/kafka/table/KafkaTableITCase.java   | 164 +++++++++++++++++++++
 .../table/UpsertKafkaDynamicTableFactoryTest.java  |  96 +++++++++++-
 14 files changed, 648 insertions(+), 132 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/kafka.md 
b/docs/content.zh/docs/connectors/table/kafka.md
index 27c7c9b1..286a922e 100644
--- a/docs/content.zh/docs/connectors/table/kafka.md
+++ b/docs/content.zh/docs/connectors/table/kafka.md
@@ -81,7 +81,7 @@ CREATE TABLE KafkaTable (
       <td><code>topic</code></td>
       <td><code>STRING NOT NULL</code></td>
       <td>Kafka 记录的 Topic 名。</td>
-      <td><code>R</code></td>
+      <td><code>R/W</code></td>
     </tr>
     <tr>
       <td><code>partition</code></td>
@@ -191,17 +191,17 @@ CREATE TABLE KafkaTable (
     </tr>
     <tr>
       <td><h5>topic</h5></td>
-      <td>required for sink</td>
+      <td>可选</td>
       <td style="word-wrap: break-word;">(无)</td>
       <td>String</td>
-      <td>当表用作 source 时读取数据的 topic 名。亦支持用分号间隔的 topic 列表,如 
<code>'topic-1;topic-2'</code>。注意,对 source 表而言,'topic' 和 'topic-pattern' 
两个选项只能使用其中一个。当表被用作 sink 时,该配置表示写入的 topic 名。注意 sink 表不支持 topic 列表。</td>
+      <td>当表用作 source 时读取数据的 topic 名,或当表用作 sink 时写入的 topic 名。它还支持通过分号分隔的 topic 
列表,如 <code>'topic-1;topic-2'</code> 来作为 source 的 topic 
列表。注意,“topic-pattern”和“topic”只能指定其中一个。对于 sink 来说,topic 名是写入数据的 topic。它还支持 sink 
的 topic 列表。提供的 topic 列表被视为 `topic` 元数据列的有效值的允许列表。如果提供了列表,对于 sink 
表,“topic”元数据列是可写的并且必须指定。</td>
     </tr>
     <tr>
       <td><h5>topic-pattern</h5></td>
       <td>可选</td>
       <td style="word-wrap: break-word;">(无)</td>
       <td>String</td>
-      <td>匹配读取 topic 名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的 topic 都将被 Kafka consumer 
订阅。注意,对 source 表而言,'topic' 和 'topic-pattern' 两个选项只能使用其中一个。</td>
+      <td>用于读取或写入的 topic 名称模式的正则表达式。所有匹配指定正则表达式的 topic 名称将在作业开始运行时被消费者订阅。对于 
sink 来说,`topic` 元数据列是可写的,必须提供并且与 `topic-pattern` 
正则表达式匹配。注意,“topic-pattern”和“topic”只能指定其中一个。</td>
     </tr>
     <tr>
       <td><h5>properties.bootstrap.servers</h5></td>
diff --git a/docs/content.zh/docs/connectors/table/upsert-kafka.md 
b/docs/content.zh/docs/connectors/table/upsert-kafka.md
index df2c1387..3d28ae56 100644
--- a/docs/content.zh/docs/connectors/table/upsert-kafka.md
+++ b/docs/content.zh/docs/connectors/table/upsert-kafka.md
@@ -119,7 +119,7 @@ of all available metadata fields.
       <td>必选</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>用于读取和写入的 Kafka topic 名称。</td>
+      <td>当表用作 source 时读取数据的 topic 名,或当表用作 sink 时写入的 topic 名。它还支持通过分号分隔的 topic 
列表,如 <code>'topic-1;topic-2'</code> 来作为 source 的 topic 
列表。注意,“topic-pattern”和“topic”只能指定其中一个。对于 sink 来说,topic 名是写入数据的 topic。它还支持 sink 
的 topic 列表。提供的 topic 列表被视为 `topic` 元数据列的有效值的允许列表。如果提供了列表,对于 sink 
表,“topic”元数据列是可写的并且必须指定。</td>
     </tr>
     <tr>
       <td><h5>properties.bootstrap.servers</h5></td>
diff --git a/docs/content/docs/connectors/table/kafka.md 
b/docs/content/docs/connectors/table/kafka.md
index 55a5dbf2..5756315b 100644
--- a/docs/content/docs/connectors/table/kafka.md
+++ b/docs/content/docs/connectors/table/kafka.md
@@ -83,7 +83,7 @@ Read-only columns must be declared `VIRTUAL` to exclude them 
during an `INSERT I
       <td><code>topic</code></td>
       <td><code>STRING NOT NULL</code></td>
       <td>Topic name of the Kafka record.</td>
-      <td><code>R</code></td>
+      <td><code>R/W</code></td>
     </tr>
     <tr>
       <td><code>partition</code></td>
@@ -196,11 +196,11 @@ Connector Options
     </tr>
     <tr>
       <td><h5>topic</h5></td>
-      <td>required for sink</td>
+      <td>optional</td>
       <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>Topic name(s) to read data from when the table is used as source. It 
also supports topic list for source by separating topic by semicolon like 
<code>'topic-1;topic-2'</code>. Note, only one of "topic-pattern" and "topic" 
can be specified for sources. When the table is used as sink, the topic name is 
the topic to write data to. Note topic list is not supported for sinks.</td>
+      <td>Topic name(s) to read data from when the table is used as source, or 
topics for writing when the table is used as sink. It also supports a topic list 
for sources by separating topics with semicolons like 
<code>'topic-1;topic-2'</code>. Note, only one of "topic-pattern" and "topic" 
can be specified. For sinks, the topic name is the topic to write data to. It also 
supports a topic list for sinks. The provided topic list is treated as an allow 
list of valid values for the `topic` metadata colu [...]
     </tr>
     <tr>
       <td><h5>topic-pattern</h5></td>
@@ -208,7 +208,7 @@ Connector Options
       <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>The regular expression for a pattern of topic names to read from. 
All topics with names that match the specified regular expression will be 
subscribed by the consumer when the job starts running. Note, only one of 
"topic-pattern" and "topic" can be specified for sources.</td>
+      <td>The regular expression for a pattern of topic names to read from or 
write to. All topics with names that match the specified regular expression 
will be subscribed by the consumer when the job starts running. For sinks, the 
`topic` metadata column is writable, must be provided and match the 
`topic-pattern` regex. Note, only one of "topic-pattern" and "topic" can be 
specified.</td>
     </tr>
     <tr>
       <td><h5>properties.bootstrap.servers</h5></td>
diff --git a/docs/content/docs/connectors/table/upsert-kafka.md 
b/docs/content/docs/connectors/table/upsert-kafka.md
index eb662349..e8e38aed 100644
--- a/docs/content/docs/connectors/table/upsert-kafka.md
+++ b/docs/content/docs/connectors/table/upsert-kafka.md
@@ -129,7 +129,7 @@ Connector Options
       <td>required</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>The Kafka topic name to read from and write to.</td>
+      <td>Topic name(s) to read data from when the table is used as source, or 
topics for writing when the table is used as sink. It also supports a topic list 
for sources by separating topics with semicolons like 
<code>'topic-1;topic-2'</code>. Note, only one of "topic-pattern" and "topic" 
can be specified. For sinks, the topic name is the topic to write data to. It also 
supports a topic list for sinks. The provided topic list is treated as an allow 
list of valid values for the `topic` metadata colu [...]
     </tr>
     <tr>
       <td><h5>properties.bootstrap.servers</h5></td>
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
index 7908aded..71ca4147 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
@@ -30,12 +30,20 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 
 import javax.annotation.Nullable;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** SerializationSchema used by {@link KafkaDynamicSink} to configure a {@link 
KafkaSink}. */
 class DynamicKafkaRecordSerializationSchema implements 
KafkaRecordSerializationSchema<RowData> {
 
-    private final String topic;
+    private final Set<String> topics;
+    private final Pattern topicPattern;
     private final FlinkKafkaPartitioner<RowData> partitioner;
     @Nullable private final SerializationSchema<RowData> keySerialization;
     private final SerializationSchema<RowData> valueSerialization;
@@ -44,9 +52,11 @@ class DynamicKafkaRecordSerializationSchema implements 
KafkaRecordSerializationS
     private final boolean hasMetadata;
     private final int[] metadataPositions;
     private final boolean upsertMode;
+    private final Map<String, Boolean> topicPatternMatches;
 
     DynamicKafkaRecordSerializationSchema(
-            String topic,
+            @Nullable List<String> topics,
+            @Nullable Pattern topicPattern,
             @Nullable FlinkKafkaPartitioner<RowData> partitioner,
             @Nullable SerializationSchema<RowData> keySerialization,
             SerializationSchema<RowData> valueSerialization,
@@ -60,7 +70,16 @@ class DynamicKafkaRecordSerializationSchema implements 
KafkaRecordSerializationS
                     keySerialization != null && keyFieldGetters.length > 0,
                     "Key must be set in upsert mode for serialization 
schema.");
         }
-        this.topic = checkNotNull(topic);
+        Preconditions.checkArgument(
+                (topics != null && topicPattern == null && topics.size() > 0)
+                        || (topics == null && topicPattern != null),
+                "Either Topic or Topic Pattern must be set.");
+        if (topics != null) {
+            this.topics = new HashSet<>(topics);
+        } else {
+            this.topics = null;
+        }
+        this.topicPattern = topicPattern;
         this.partitioner = partitioner;
         this.keySerialization = keySerialization;
         this.valueSerialization = checkNotNull(valueSerialization);
@@ -69,6 +88,8 @@ class DynamicKafkaRecordSerializationSchema implements 
KafkaRecordSerializationS
         this.hasMetadata = hasMetadata;
         this.metadataPositions = metadataPositions;
         this.upsertMode = upsertMode;
+        // Cache results of topic pattern matches to avoid re-evaluating the 
pattern for each record
+        this.topicPatternMatches = new HashMap<>();
     }
 
     @Override
@@ -77,13 +98,15 @@ class DynamicKafkaRecordSerializationSchema implements 
KafkaRecordSerializationS
         // shortcut in case no input projection is required
         if (keySerialization == null && !hasMetadata) {
             final byte[] valueSerialized = 
valueSerialization.serialize(consumedRow);
+            final String targetTopic = getTargetTopic(consumedRow);
             return new ProducerRecord<>(
-                    topic,
+                    targetTopic,
                     extractPartition(
                             consumedRow,
+                            targetTopic,
                             null,
                             valueSerialized,
-                            context.getPartitionsForTopic(topic)),
+                            context.getPartitionsForTopic(targetTopic)),
                     null,
                     valueSerialized);
         }
@@ -115,14 +138,15 @@ class DynamicKafkaRecordSerializationSchema implements 
KafkaRecordSerializationS
                             consumedRow, kind, valueFieldGetters);
             valueSerialized = valueSerialization.serialize(valueRow);
         }
-
+        final String targetTopic = getTargetTopic(consumedRow);
         return new ProducerRecord<>(
-                topic,
+                targetTopic,
                 extractPartition(
                         consumedRow,
+                        targetTopic,
                         keySerialized,
                         valueSerialized,
-                        context.getPartitionsForTopic(topic)),
+                        context.getPartitionsForTopic(targetTopic)),
                 readMetadata(consumedRow, 
KafkaDynamicSink.WritableMetadata.TIMESTAMP),
                 keySerialized,
                 valueSerialized,
@@ -144,14 +168,42 @@ class DynamicKafkaRecordSerializationSchema implements 
KafkaRecordSerializationS
         valueSerialization.open(context);
     }
 
+    private String getTargetTopic(RowData element) {
+        if (topics != null && topics.size() == 1) {
+            // If topics is a singleton list, we only return the provided 
topic.
+            return topics.stream().findFirst().get();
+        }
+        final String targetTopic = readMetadata(element, 
KafkaDynamicSink.WritableMetadata.TOPIC);
+        if (targetTopic == null) {
+            throw new IllegalArgumentException(
+                    "The topic of the sink record is not valid. Expected a 
single topic but no topic is set.");
+        } else if (topics != null && !topics.contains(targetTopic)) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "The topic of the sink record is not valid. 
Expected topic to be in: %s but was: %s",
+                            topics, targetTopic));
+        } else if (topicPattern != null && 
!cachedTopicPatternMatch(targetTopic)) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "The topic of the sink record is not valid. 
Expected topic to match: %s but was: %s",
+                            topicPattern, targetTopic));
+        }
+        return targetTopic;
+    }
+
+    private boolean cachedTopicPatternMatch(String topic) {
+        return topicPatternMatches.computeIfAbsent(topic, t -> 
topicPattern.matcher(t).matches());
+    }
+
     private Integer extractPartition(
             RowData consumedRow,
+            String targetTopic,
             @Nullable byte[] keySerialized,
             byte[] valueSerialized,
             int[] partitions) {
         if (partitioner != null) {
             return partitioner.partition(
-                    consumedRow, keySerialized, valueSerialized, topic, 
partitions);
+                    consumedRow, keySerialized, valueSerialized, targetTopic, 
partitions);
         }
         return null;
     }
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
index 81ff13c3..11d3c659 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
@@ -118,15 +118,15 @@ public class KafkaConnectorOptions {
                     .asList()
                     .noDefaultValue()
                     .withDescription(
-                            "Topic names from which the table is read. Either 
'topic' or 'topic-pattern' must be set for source. "
-                                    + "Option 'topic' is required for sink.");
+                            "Topic name(s) to read data from when the table is 
used as source. It also supports topic list for source by separating topic by 
semicolon like 'topic-1;topic-2'. Note, only one of 'topic-pattern' and 'topic' 
can be specified for sources. "
+                                    + "When the table is used as sink, the 
topic name is the topic to write data. It also supports topic list for sinks. 
The provided topic-list is treated as a allow list of valid values for the 
`topic` metadata column. If  a list is provided, for sink table, 'topic' 
metadata column is writable and must be specified.");
 
     public static final ConfigOption<String> TOPIC_PATTERN =
             ConfigOptions.key("topic-pattern")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Optional topic pattern from which the table is 
read for source. Either 'topic' or 'topic-pattern' must be set.");
+                            "Optional topic pattern from which the table is 
read for source, or topic pattern that must match the provided `topic` metadata 
column for sink. Either 'topic' or 'topic-pattern' must be set.");
 
     public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS =
             ConfigOptions.key("properties.bootstrap.servers")
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
index d6390e27..f752276a 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
@@ -98,23 +98,22 @@ class KafkaConnectorOptionsUtil {
     protected static final String DEBEZIUM_AVRO_CONFLUENT = 
"debezium-avro-confluent";
     private static final List<String> SCHEMA_REGISTRY_FORMATS =
             Arrays.asList(AVRO_CONFLUENT, DEBEZIUM_AVRO_CONFLUENT);
-
     // 
--------------------------------------------------------------------------------------------
     // Validation
     // 
--------------------------------------------------------------------------------------------
 
     public static void validateTableSourceOptions(ReadableConfig tableOptions) 
{
-        validateSourceTopic(tableOptions);
+        validateTopic(tableOptions);
         validateScanStartupMode(tableOptions);
         validateScanBoundedMode(tableOptions);
     }
 
     public static void validateTableSinkOptions(ReadableConfig tableOptions) {
-        validateSinkTopic(tableOptions);
+        validateTopic(tableOptions);
         validateSinkPartitioner(tableOptions);
     }
 
-    public static void validateSourceTopic(ReadableConfig tableOptions) {
+    public static void validateTopic(ReadableConfig tableOptions) {
         Optional<List<String>> topic = tableOptions.getOptional(TOPIC);
         Optional<String> pattern = tableOptions.getOptional(TOPIC_PATTERN);
 
@@ -128,23 +127,6 @@ class KafkaConnectorOptionsUtil {
         }
     }
 
-    public static void validateSinkTopic(ReadableConfig tableOptions) {
-        String errorMessageTemp =
-                "Flink Kafka sink currently only supports single topic, but 
got %s: %s.";
-        if (!isSingleTopic(tableOptions)) {
-            if (tableOptions.getOptional(TOPIC_PATTERN).isPresent()) {
-                throw new ValidationException(
-                        String.format(
-                                errorMessageTemp,
-                                "'topic-pattern'",
-                                tableOptions.get(TOPIC_PATTERN)));
-            } else {
-                throw new ValidationException(
-                        String.format(errorMessageTemp, "'topic'", 
tableOptions.get(TOPIC)));
-            }
-        }
-    }
-
     private static void validateScanStartupMode(ReadableConfig tableOptions) {
         tableOptions
                 .getOptional(SCAN_STARTUP_MODE)
@@ -254,11 +236,11 @@ class KafkaConnectorOptionsUtil {
     // Utilities
     // 
--------------------------------------------------------------------------------------------
 
-    public static List<String> getSourceTopics(ReadableConfig tableOptions) {
+    public static List<String> getTopics(ReadableConfig tableOptions) {
         return tableOptions.getOptional(TOPIC).orElse(null);
     }
 
-    public static Pattern getSourceTopicPattern(ReadableConfig tableOptions) {
+    public static Pattern getTopicPattern(ReadableConfig tableOptions) {
         return 
tableOptions.getOptional(TOPIC_PATTERN).map(Pattern::compile).orElse(null);
     }
 
@@ -636,21 +618,25 @@ class KafkaConnectorOptionsUtil {
     private static Map<String, String> autoCompleteSchemaRegistrySubject(
             Map<String, String> options) {
         Configuration configuration = Configuration.fromMap(options);
-        // the subject autoComplete should only be used in sink, check the 
topic first
-        validateSinkTopic(configuration);
-        final Optional<String> valueFormat = 
configuration.getOptional(VALUE_FORMAT);
-        final Optional<String> keyFormat = 
configuration.getOptional(KEY_FORMAT);
-        final Optional<String> format = configuration.getOptional(FORMAT);
-        final String topic = configuration.get(TOPIC).get(0);
-
-        if (format.isPresent() && 
SCHEMA_REGISTRY_FORMATS.contains(format.get())) {
-            autoCompleteSubject(configuration, format.get(), topic + "-value");
-        } else if (valueFormat.isPresent() && 
SCHEMA_REGISTRY_FORMATS.contains(valueFormat.get())) {
-            autoCompleteSubject(configuration, "value." + valueFormat.get(), 
topic + "-value");
-        }
+        // the subject autoComplete should only be used in sink with a single 
topic, check the topic
+        // option first
+        validateTopic(configuration);
+        if (configuration.contains(TOPIC) && isSingleTopic(configuration)) {
+            final Optional<String> valueFormat = 
configuration.getOptional(VALUE_FORMAT);
+            final Optional<String> keyFormat = 
configuration.getOptional(KEY_FORMAT);
+            final Optional<String> format = configuration.getOptional(FORMAT);
+            final String topic = configuration.get(TOPIC).get(0);
+
+            if (format.isPresent() && 
SCHEMA_REGISTRY_FORMATS.contains(format.get())) {
+                autoCompleteSubject(configuration, format.get(), topic + 
"-value");
+            } else if (valueFormat.isPresent()
+                    && SCHEMA_REGISTRY_FORMATS.contains(valueFormat.get())) {
+                autoCompleteSubject(configuration, "value." + 
valueFormat.get(), topic + "-value");
+            }
 
-        if (keyFormat.isPresent() && 
SCHEMA_REGISTRY_FORMATS.contains(keyFormat.get())) {
-            autoCompleteSubject(configuration, "key." + keyFormat.get(), topic 
+ "-key");
+            if (keyFormat.isPresent() && 
SCHEMA_REGISTRY_FORMATS.contains(keyFormat.get())) {
+                autoCompleteSubject(configuration, "key." + keyFormat.get(), 
topic + "-key");
+            }
         }
         return configuration.toMap();
     }
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
index 3f6bc5a2..8ab0f10c 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
@@ -59,6 +59,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
+import java.util.regex.Pattern;
 import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -114,8 +115,11 @@ public class KafkaDynamicSink implements DynamicTableSink, 
SupportsWritingMetada
      */
     @Nullable private final String transactionalIdPrefix;
 
-    /** The Kafka topic to write to. */
-    protected final String topic;
+    /** The Kafka topics to allow for producing. */
+    protected final List<String> topics;
+
+    /** The Kafka topic pattern of topics allowed to produce to. */
+    protected final Pattern topicPattern;
 
     /** Properties for the Kafka producer. */
     protected final Properties properties;
@@ -143,7 +147,8 @@ public class KafkaDynamicSink implements DynamicTableSink, 
SupportsWritingMetada
             int[] keyProjection,
             int[] valueProjection,
             @Nullable String keyPrefix,
-            String topic,
+            @Nullable List<String> topics,
+            @Nullable Pattern topicPattern,
             Properties properties,
             @Nullable FlinkKafkaPartitioner<RowData> partitioner,
             DeliveryGuarantee deliveryGuarantee,
@@ -166,7 +171,8 @@ public class KafkaDynamicSink implements DynamicTableSink, 
SupportsWritingMetada
         // Mutable attributes
         this.metadataKeys = Collections.emptyList();
         // Kafka-specific attributes
-        this.topic = checkNotNull(topic, "Topic must not be null.");
+        this.topics = topics;
+        this.topicPattern = topicPattern;
         this.properties = checkNotNull(properties, "Properties must not be 
null.");
         this.partitioner = partitioner;
         this.deliveryGuarantee =
@@ -206,7 +212,8 @@ public class KafkaDynamicSink implements DynamicTableSink, 
SupportsWritingMetada
                         .setKafkaProducerConfig(properties)
                         .setRecordSerializer(
                                 new DynamicKafkaRecordSerializationSchema(
-                                        topic,
+                                        topics,
+                                        topicPattern,
                                         partitioner,
                                         keySerialization,
                                         valueSerialization,
@@ -250,8 +257,13 @@ public class KafkaDynamicSink implements DynamicTableSink, 
SupportsWritingMetada
     @Override
     public Map<String, DataType> listWritableMetadata() {
         final Map<String, DataType> metadataMap = new LinkedHashMap<>();
-        Stream.of(WritableMetadata.values())
-                .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
+        for (WritableMetadata m : WritableMetadata.values()) {
+            if (topics != null && topics.size() == 1 && 
WritableMetadata.TOPIC.key.equals(m.key)) {
+                // When `topic` is a singleton list, TOPIC metadata is not 
writable
+                continue;
+            }
+            metadataMap.put(m.key, m.dataType);
+        }
         return metadataMap;
     }
 
@@ -272,7 +284,8 @@ public class KafkaDynamicSink implements DynamicTableSink, 
SupportsWritingMetada
                         keyProjection,
                         valueProjection,
                         keyPrefix,
-                        topic,
+                        topics,
+                        topicPattern,
                         properties,
                         partitioner,
                         deliveryGuarantee,
@@ -306,7 +319,8 @@ public class KafkaDynamicSink implements DynamicTableSink, 
SupportsWritingMetada
                 && Arrays.equals(keyProjection, that.keyProjection)
                 && Arrays.equals(valueProjection, that.valueProjection)
                 && Objects.equals(keyPrefix, that.keyPrefix)
-                && Objects.equals(topic, that.topic)
+                && Objects.equals(topics, that.topics)
+                && Objects.equals(String.valueOf(topicPattern), 
String.valueOf(that.topicPattern))
                 && Objects.equals(properties, that.properties)
                 && Objects.equals(partitioner, that.partitioner)
                 && Objects.equals(deliveryGuarantee, that.deliveryGuarantee)
@@ -327,7 +341,8 @@ public class KafkaDynamicSink implements DynamicTableSink, 
SupportsWritingMetada
                 keyProjection,
                 valueProjection,
                 keyPrefix,
-                topic,
+                topics,
+                topicPattern,
                 properties,
                 partitioner,
                 deliveryGuarantee,
@@ -393,6 +408,20 @@ public class KafkaDynamicSink implements DynamicTableSink, 
SupportsWritingMetada
     // 
--------------------------------------------------------------------------------------------
 
     enum WritableMetadata {
+        TOPIC(
+                "topic",
+                DataTypes.STRING().notNull(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(RowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
+                        return row.getString(pos).toString();
+                    }
+                }),
         HEADERS(
                 "headers",
                 // key and value of the map are nullable to make handling 
easier in queries
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
index 89dda61a..7c23923b 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
@@ -93,9 +93,9 @@ import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getBoundedOptions;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getFlinkKafkaPartitioner;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getKafkaProperties;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getSourceTopicPattern;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getSourceTopics;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getStartupOptions;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getTopicPattern;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getTopics;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.validateTableSinkOptions;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.validateTableSourceOptions;
 
@@ -222,8 +222,8 @@ public class KafkaDynamicTableFactory
                 keyProjection,
                 valueProjection,
                 keyPrefix,
-                getSourceTopics(tableOptions),
-                getSourceTopicPattern(tableOptions),
+                getTopics(tableOptions),
+                getTopicPattern(tableOptions),
                 properties,
                 startupOptions.startupMode,
                 startupOptions.specificOffsets,
@@ -278,7 +278,8 @@ public class KafkaDynamicTableFactory
                 keyProjection,
                 valueProjection,
                 keyPrefix,
-                tableOptions.get(TOPIC).get(0),
+                getTopics(tableOptions),
+                getTopicPattern(tableOptions),
                 getKafkaProperties(context.getCatalogTable().getOptions()),
                 getFlinkKafkaPartitioner(tableOptions, 
context.getClassLoader()).orElse(null),
                 deliveryGuarantee,
@@ -423,7 +424,8 @@ public class KafkaDynamicTableFactory
             int[] keyProjection,
             int[] valueProjection,
             @Nullable String keyPrefix,
-            String topic,
+            @Nullable List<String> topics,
+            @Nullable Pattern topicPattern,
             Properties properties,
             FlinkKafkaPartitioner<RowData> partitioner,
             DeliveryGuarantee deliveryGuarantee,
@@ -437,7 +439,8 @@ public class KafkaDynamicTableFactory
                 keyProjection,
                 valueProjection,
                 keyPrefix,
-                topic,
+                topics,
+                topicPattern,
                 properties,
                 partitioner,
                 deliveryGuarantee,
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
index cebe27f2..78debc17 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -66,6 +66,7 @@ import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARALLELISM;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
@@ -75,8 +76,8 @@ import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getBoundedOptions;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getKafkaProperties;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getSourceTopicPattern;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getSourceTopics;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getTopicPattern;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getTopics;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.validateScanBoundedMode;
 
 /** Upsert-Kafka factory. */
@@ -94,7 +95,6 @@ public class UpsertKafkaDynamicTableFactory
     public Set<ConfigOption<?>> requiredOptions() {
         final Set<ConfigOption<?>> options = new HashSet<>();
         options.add(PROPS_BOOTSTRAP_SERVERS);
-        options.add(TOPIC);
         options.add(KEY_FORMAT);
         options.add(VALUE_FORMAT);
         return options;
@@ -103,6 +103,8 @@ public class UpsertKafkaDynamicTableFactory
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
         final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(TOPIC);
+        options.add(TOPIC_PATTERN);
         options.add(KEY_FIELDS_PREFIX);
         options.add(VALUE_FIELDS_INCLUDE);
         options.add(SINK_PARALLELISM);
@@ -155,8 +157,8 @@ public class UpsertKafkaDynamicTableFactory
                 keyValueProjections.f0,
                 keyValueProjections.f1,
                 keyPrefix,
-                getSourceTopics(tableOptions),
-                getSourceTopicPattern(tableOptions),
+                getTopics(tableOptions),
+                getTopicPattern(tableOptions),
                 properties,
                 earliest,
                 Collections.emptyMap(),
@@ -212,7 +214,8 @@ public class UpsertKafkaDynamicTableFactory
                 keyValueProjections.f0,
                 keyValueProjections.f1,
                 keyPrefix,
-                tableOptions.get(TOPIC).get(0),
+                getTopics(tableOptions),
+                getTopicPattern(tableOptions),
                 properties,
                 null,
                 tableOptions.get(DELIVERY_GUARANTEE),
@@ -247,7 +250,6 @@ public class UpsertKafkaDynamicTableFactory
             Format keyFormat,
             Format valueFormat,
             int[] primaryKeyIndexes) {
-        validateTopic(tableOptions);
         validateScanBoundedMode(tableOptions);
         validateFormat(keyFormat, valueFormat, tableOptions);
         validatePKConstraints(primaryKeyIndexes);
@@ -258,21 +260,11 @@ public class UpsertKafkaDynamicTableFactory
             Format keyFormat,
             Format valueFormat,
             int[] primaryKeyIndexes) {
-        validateTopic(tableOptions);
         validateFormat(keyFormat, valueFormat, tableOptions);
         validatePKConstraints(primaryKeyIndexes);
         validateSinkBufferFlush(tableOptions);
     }
 
-    private static void validateTopic(ReadableConfig tableOptions) {
-        List<String> topic = tableOptions.get(TOPIC);
-        if (topic.size() > 1) {
-            throw new ValidationException(
-                    "The 'upsert-kafka' connector doesn't support topic list 
now. "
-                            + "Please use single topic as the value of the 
parameter 'topic'.");
-        }
-    }
-
     private static void validateFormat(
             Format keyFormat, Format valueFormat, ReadableConfig tableOptions) 
{
         if (!keyFormat.getChangelogMode().containsOnly(RowKind.INSERT)) {
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchemaTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchemaTest.java
new file mode 100644
index 00000000..6371ae5e
--- /dev/null
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchemaTest.java
@@ -0,0 +1,148 @@
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link DynamicKafkaRecordSerializationSchema}.
+ *
+ * <p>The cases cover how the topic of the produced record is derived from the configured topic
+ * list or topic pattern together with the topic metadata value carried by the row.
+ */
+public class DynamicKafkaRecordSerializationSchemaTest extends TestLogger {
+    private static final List<String> MULTIPLE_TOPICS = Arrays.asList("topic1", "topic2");
+    private static final String SINGLE_TOPIC = "topic";
+
+    // A regular expression, not a glob: "topic" followed by an arbitrary suffix.
+    private static final Pattern TOPIC_PATTERN = Pattern.compile("topic.*");
+
+    /**
+     * Serializes a row carrying {@code rowTopic} as topic metadata and checks that the produced
+     * record targets {@code expectedTopic}.
+     */
+    @ParameterizedTest
+    @MethodSource("provideTopicMetadataTestParameters")
+    public void testTopicMetadata(
+            List<String> topics, Pattern topicPattern, String rowTopic, String expectedTopic) {
+        GenericRowData rowData = createRowData(rowTopic);
+        DynamicKafkaRecordSerializationSchema schema = createSchema(topics, topicPattern);
+        KafkaRecordSerializationSchema.KafkaSinkContext context = createContext();
+
+        ProducerRecord<byte[], byte[]> record = schema.serialize(rowData, context, null);
+
+        // The record must be routed to the expected topic.
+        assertThat(record.topic()).isEqualTo(expectedTopic);
+    }
+
+    /**
+     * Serializes a row whose topic metadata is not accepted by the sink configuration and checks
+     * that an {@link IllegalArgumentException} containing {@code expectedError} is thrown.
+     */
+    @ParameterizedTest
+    @MethodSource("provideInvalidTopicMetadataTestParameters")
+    public void testInvalidTopicMetadata(
+            List<String> topics, Pattern topicPattern, String rowTopic, String expectedError) {
+        GenericRowData rowData = createRowData(rowTopic);
+        DynamicKafkaRecordSerializationSchema schema = createSchema(topics, topicPattern);
+        KafkaRecordSerializationSchema.KafkaSinkContext context = createContext();
+
+        assertThatThrownBy(() -> schema.serialize(rowData, context, null))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(expectedError);
+    }
+
+    /**
+     * Supplies {@code (topics, topicPattern, rowTopic, expectedTopic)} tuples for {@link
+     * #testTopicMetadata}.
+     */
+    private static Stream<Arguments> provideTopicMetadataTestParameters() {
+        String topic1 = "topic1";
+        return Stream.of(
+                // A single configured topic is the target whatever the row carries
+                // (same value, different value, or no value at all).
+                Arguments.of(
+                        Collections.singletonList(SINGLE_TOPIC), null, SINGLE_TOPIC, SINGLE_TOPIC),
+                Arguments.of(Collections.singletonList(SINGLE_TOPIC), null, topic1, SINGLE_TOPIC),
+                Arguments.of(Collections.singletonList(SINGLE_TOPIC), null, null, SINGLE_TOPIC),
+                // With a topic list, the row's topic is used when it is part of the list.
+                Arguments.of(MULTIPLE_TOPICS, null, topic1, topic1),
+                // With a topic pattern, the row's topic is used when it matches the pattern,
+                // both for the bare prefix and for a suffixed name.
+                Arguments.of(null, TOPIC_PATTERN, SINGLE_TOPIC, SINGLE_TOPIC),
+                Arguments.of(null, TOPIC_PATTERN, topic1, topic1));
+    }
+
+    /**
+     * Supplies {@code (topics, topicPattern, rowTopic, expectedError)} tuples for {@link
+     * #testInvalidTopicMetadata}.
+     */
+    private static Stream<Arguments> provideInvalidTopicMetadataTestParameters() {
+        String other = "other";
+        return Stream.of(
+                // The row's topic is not part of the configured topic list.
+                Arguments.of(
+                        MULTIPLE_TOPICS,
+                        null,
+                        other,
+                        String.format(
+                                "The topic of the sink record is not valid. Expected topic to be in: %s but was: %s",
+                                MULTIPLE_TOPICS, other)),
+                // The row's topic does not match the configured pattern. The expected text is
+                // derived from the constant so that it cannot drift from the pattern under test.
+                Arguments.of(
+                        null,
+                        TOPIC_PATTERN,
+                        other,
+                        String.format(
+                                "The topic of the sink record is not valid. Expected topic to match: %s but was: %s",
+                                TOPIC_PATTERN.pattern(), other)));
+    }
+
+    /**
+     * Builds the schema under test for the given topic configuration.
+     *
+     * @param topics topic list handed to the schema, may be {@code null}
+     * @param topicPattern topic pattern handed to the schema, may be {@code null}
+     */
+    private DynamicKafkaRecordSerializationSchema createSchema(
+            List<String> topics, Pattern topicPattern) {
+        // The value serializer simply emits the bytes of the string in field 0.
+        SerializationSchema<RowData> serializationSchema =
+                new SerializationSchema<RowData>() {
+                    @Override
+                    public byte[] serialize(RowData element) {
+                        return element.getString(0).toBytes();
+                    }
+
+                    @Override
+                    public void open(InitializationContext context) throws Exception {}
+                };
+
+        // The array is indexed by the ordinal of each writable metadata key, so size it from the
+        // enum rather than hard-coding the number of keys. The stored values line up with the
+        // field indexes populated in createRowData.
+        int[] metadataPositions = new int[KafkaDynamicSink.WritableMetadata.values().length];
+        metadataPositions[KafkaDynamicSink.WritableMetadata.TOPIC.ordinal()] = 1;
+        metadataPositions[KafkaDynamicSink.WritableMetadata.HEADERS.ordinal()] = 2;
+        metadataPositions[KafkaDynamicSink.WritableMetadata.TIMESTAMP.ordinal()] = 3;
+
+        return new DynamicKafkaRecordSerializationSchema(
+                topics,
+                topicPattern,
+                null,
+                null,
+                serializationSchema,
+                new RowData.FieldGetter[] {r -> r.getString(0)},
+                new RowData.FieldGetter[] {r -> r.getString(0)},
+                true,
+                metadataPositions,
+                false);
+    }
+
+    /**
+     * Creates a four-field row: field 0 holds the payload {@code "test"}, field 1 the given topic,
+     * and fields 2 and 3 are left {@code null}.
+     */
+    private GenericRowData createRowData(String topic) {
+        GenericRowData rowData = new GenericRowData(4);
+        rowData.setField(0, StringData.fromString("test"));
+        rowData.setField(1, StringData.fromString(topic));
+        rowData.setField(2, null);
+        rowData.setField(3, null);
+        return rowData;
+    }
+
+    /** Creates a sink context describing a single subtask and a single partition per topic. */
+    private KafkaRecordSerializationSchema.KafkaSinkContext createContext() {
+        return new KafkaRecordSerializationSchema.KafkaSinkContext() {
+            @Override
+            public int getParallelInstanceId() {
+                return 0;
+            }
+
+            @Override
+            public int getNumberOfParallelInstances() {
+                return 1;
+            }
+
+            @Override
+            public int[] getPartitionsForTopic(String topic) {
+                return new int[] {0};
+            }
+        };
+    }
+}
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index 1246d53a..c1d796d0 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -600,7 +600,8 @@ public class KafkaDynamicTableFactoryTest {
                         new int[0],
                         new int[] {0, 1, 2},
                         null,
-                        TOPIC,
+                        Collections.singletonList(TOPIC),
+                        null,
                         KAFKA_SINK_PROPERTIES,
                         new FlinkFixedPartitioner<>(),
                         DeliveryGuarantee.EXACTLY_ONCE,
@@ -616,6 +617,10 @@ public class KafkaDynamicTableFactoryTest {
         final SinkV2Provider sinkProvider = (SinkV2Provider) provider;
         final Sink<RowData> sinkFunction = sinkProvider.createSink();
         assertThat(sinkFunction).isInstanceOf(KafkaSink.class);
+        assertThat(actualKafkaSink.listWritableMetadata())
+                .containsOnlyKeys(
+                        KafkaDynamicSink.WritableMetadata.HEADERS.key,
+                        KafkaDynamicSink.WritableMetadata.TIMESTAMP.key);
     }
 
     @Test
@@ -640,7 +645,8 @@ public class KafkaDynamicTableFactoryTest {
                             new int[0],
                             new int[] {0, 1, 2},
                             null,
-                            TOPIC,
+                            Collections.singletonList(TOPIC),
+                            null,
                             KAFKA_SINK_PROPERTIES,
                             new FlinkFixedPartitioner<>(),
                             
DeliveryGuarantee.valueOf(semantic.toUpperCase().replace("-", "_")),
@@ -683,7 +689,8 @@ public class KafkaDynamicTableFactoryTest {
                         new int[] {0},
                         new int[] {1, 2},
                         null,
-                        TOPIC,
+                        Collections.singletonList(TOPIC),
+                        null,
                         KAFKA_FINAL_SINK_PROPERTIES,
                         new FlinkFixedPartitioner<>(),
                         DeliveryGuarantee.EXACTLY_ONCE,
@@ -711,7 +718,8 @@ public class KafkaDynamicTableFactoryTest {
                         new int[0],
                         new int[] {0, 1, 2},
                         null,
-                        TOPIC,
+                        Collections.singletonList(TOPIC),
+                        null,
                         KAFKA_SINK_PROPERTIES,
                         new FlinkFixedPartitioner<>(),
                         DeliveryGuarantee.EXACTLY_ONCE,
@@ -805,6 +813,77 @@ public class KafkaDynamicTableFactoryTest {
                 "sub2");
     }
 
+    @Test
+    public void testTableSinkWithTopicList() {
+        // Overwrite the 'topic' option of the basic sink options with TOPICS.
+        final Map<String, String> sinkOptions =
+                getModifiedOptions(
+                        getBasicSinkOptions(),
+                        options -> {
+                            options.put("topic", TOPICS);
+                        });
+        final KafkaDynamicSink createdSink =
+                (KafkaDynamicSink) createTableSink(SCHEMA, sinkOptions);
+
+        final EncodingFormat<SerializationSchema<RowData>> valueFormat =
+                new EncodingFormatMock(",");
+
+        // The expected sink carries the individual ';'-separated topics and no topic pattern.
+        final DynamicTableSink referenceSink =
+                createExpectedSink(
+                        SCHEMA_DATA_TYPE,
+                        null,
+                        valueFormat,
+                        new int[0],
+                        new int[] {0, 1, 2},
+                        null,
+                        Arrays.asList(TOPICS.split(";")),
+                        null,
+                        KAFKA_SINK_PROPERTIES,
+                        new FlinkFixedPartitioner<>(),
+                        DeliveryGuarantee.EXACTLY_ONCE,
+                        null,
+                        "kafka-sink");
+        assertThat(createdSink).isEqualTo(referenceSink);
+
+        // The sink must expose exactly the topic, headers and timestamp metadata keys.
+        assertThat(createdSink.listWritableMetadata())
+                .containsOnlyKeys(
+                        KafkaDynamicSink.WritableMetadata.TOPIC.key,
+                        KafkaDynamicSink.WritableMetadata.HEADERS.key,
+                        KafkaDynamicSink.WritableMetadata.TIMESTAMP.key);
+    }
+
+    @Test
+    public void testTableSinkWithTopicPattern() {
+        // Swap the 'topic' option of the basic sink options for a 'topic-pattern'.
+        final Map<String, String> sinkOptions =
+                getModifiedOptions(
+                        getBasicSinkOptions(),
+                        options -> {
+                            options.remove("topic");
+                            options.put("topic-pattern", TOPIC_REGEX);
+                        });
+        final KafkaDynamicSink createdSink =
+                (KafkaDynamicSink) createTableSink(SCHEMA, sinkOptions);
+
+        final EncodingFormat<SerializationSchema<RowData>> valueFormat =
+                new EncodingFormatMock(",");
+
+        // The expected sink carries the compiled pattern and no topic list.
+        final DynamicTableSink referenceSink =
+                createExpectedSink(
+                        SCHEMA_DATA_TYPE,
+                        null,
+                        valueFormat,
+                        new int[0],
+                        new int[] {0, 1, 2},
+                        null,
+                        null,
+                        Pattern.compile(TOPIC_REGEX),
+                        KAFKA_SINK_PROPERTIES,
+                        new FlinkFixedPartitioner<>(),
+                        DeliveryGuarantee.EXACTLY_ONCE,
+                        null,
+                        "kafka-sink");
+        assertThat(createdSink).isEqualTo(referenceSink);
+
+        // The sink must expose exactly the topic, headers and timestamp metadata keys.
+        assertThat(createdSink.listWritableMetadata())
+                .containsOnlyKeys(
+                        KafkaDynamicSink.WritableMetadata.TOPIC.key,
+                        KafkaDynamicSink.WritableMetadata.HEADERS.key,
+                        KafkaDynamicSink.WritableMetadata.TIMESTAMP.key);
+    }
+
     private void verifyEncoderSubject(
             Consumer<Map<String, String>> optionModifier,
             String expectedValueSubject,
@@ -1002,7 +1081,7 @@ public class KafkaDynamicTableFactoryTest {
     }
 
     @Test
-    public void testSinkWithTopicListOrTopicPattern() {
+    public void testSinkWithTopicListAndTopicPattern() {
         Map<String, String> modifiedOptions =
                 getModifiedOptions(
                         getBasicSinkOptions(),
@@ -1010,32 +1089,13 @@ public class KafkaDynamicTableFactoryTest {
                             options.put("topic", TOPICS);
                             options.put("scan.startup.mode", 
"earliest-offset");
                             options.remove("specific-offsets");
+                            options.put("topic-pattern", TOPIC_REGEX);
                         });
         final String errorMessageTemp =
-                "Flink Kafka sink currently only supports single topic, but 
got %s: %s.";
-
-        try {
-            createTableSink(SCHEMA, modifiedOptions);
-        } catch (Throwable t) {
-            assertThat(t.getCause().getMessage())
-                    .isEqualTo(
-                            String.format(
-                                    errorMessageTemp,
-                                    "'topic'",
-                                    String.format("[%s]", String.join(", ", 
TOPIC_LIST))));
-        }
-
-        modifiedOptions =
-                getModifiedOptions(
-                        getBasicSinkOptions(),
-                        options -> options.put("topic-pattern", TOPIC_REGEX));
-
-        try {
-            createTableSink(SCHEMA, modifiedOptions);
-        } catch (Throwable t) {
-            assertThat(t.getCause().getMessage())
-                    .isEqualTo(String.format(errorMessageTemp, 
"'topic-pattern'", TOPIC_REGEX));
-        }
+                "Option 'topic' and 'topic-pattern' shouldn't be set 
together.";
+        assertThatThrownBy(() -> createTableSink(SCHEMA, modifiedOptions))
+                .isInstanceOf(ValidationException.class)
+                .satisfies(anyCauseMatches(ValidationException.class, 
errorMessageTemp));
     }
 
     @Test
@@ -1217,7 +1277,8 @@ public class KafkaDynamicTableFactoryTest {
             int[] keyProjection,
             int[] valueProjection,
             @Nullable String keyPrefix,
-            String topic,
+            @Nullable List<String> topics,
+            @Nullable Pattern topicPattern,
             Properties properties,
             @Nullable FlinkKafkaPartitioner<RowData> partitioner,
             DeliveryGuarantee deliveryGuarantee,
@@ -1231,7 +1292,8 @@ public class KafkaDynamicTableFactoryTest {
                 keyProjection,
                 valueProjection,
                 keyPrefix,
-                topic,
+                topics,
+                topicPattern,
                 properties,
                 partitioner,
                 deliveryGuarantee,
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index 409acd97..15aa722f 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -188,6 +188,170 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         deleteTestTopic(topic);
     }
 
+    @Test
+    public void testKafkaSourceSinkWithTopicList() throws Exception {
+        // Topic names embed the format and a random UUID so that every run can create its own
+        // topics.
+        final String topic1 = "topics1_" + format + "_" + UUID.randomUUID();
+        final String topic2 = "topics2_" + format + "_" + UUID.randomUUID();
+        createTestTopic(topic2, 1, 1);
+        createTestTopic(topic1, 1, 1);
+
+        // ---------- Declare the tables -------------------
+        // "kafka" is declared over both topics; "topic1" and "topic2" each cover a single topic.
+        final String consumerGroup = getStandardProps().getProperty("group.id");
+        final String brokers = getBootstrapServers();
+        final String ddlTemplate =
+                "CREATE TABLE %s (\n"
+                        + "  `topic` STRING METADATA,\n"
+                        + "  `user_id` INT,\n"
+                        + "  `item_id` INT,\n"
+                        + "  `behavior` STRING\n"
+                        + ") WITH (\n"
+                        + "  'connector' = '%s',\n"
+                        + "  'topic' = '%s',\n"
+                        + "  'properties.bootstrap.servers' = '%s',\n"
+                        + "  'properties.group.id' = '%s',\n"
+                        + "  'scan.startup.mode' = 'earliest-offset',\n"
+                        + "  'scan.bounded.mode' = 'latest-offset',\n"
+                        + "  %s\n"
+                        + ")\n";
+        final String topicListDdl =
+                String.format(
+                        ddlTemplate,
+                        "kafka",
+                        KafkaDynamicTableFactory.IDENTIFIER,
+                        String.join(";", topic1, topic2),
+                        brokers,
+                        consumerGroup,
+                        formatOptions());
+        final String topic1Ddl =
+                String.format(
+                        ddlTemplate,
+                        "topic1",
+                        KafkaDynamicTableFactory.IDENTIFIER,
+                        topic1,
+                        brokers,
+                        consumerGroup,
+                        formatOptions());
+        final String topic2Ddl =
+                String.format(
+                        ddlTemplate,
+                        "topic2",
+                        KafkaDynamicTableFactory.IDENTIFIER,
+                        topic2,
+                        brokers,
+                        consumerGroup,
+                        formatOptions());
+
+        tEnv.executeSql(topicListDdl);
+        tEnv.executeSql(topic1Ddl);
+        tEnv.executeSql(topic2Ddl);
+
+        // ---------- Write one row per topic through the multi-topic table -------------------
+        // The first column is the `topic` metadata column and selects the target topic.
+        final Row topic1Row = Row.of(topic1, 1, 1102, "behavior 1");
+        final Row topic2Row = Row.of(topic2, 2, 1103, "behavior 2");
+        final List<Row> rows = Arrays.asList(topic1Row, topic2Row);
+        tEnv.fromValues(rows).insertInto("kafka").execute().await();
+
+        // ---------- Read back through all three tables -------------------
+        final List<Row> allRows = collectAllRows(tEnv.sqlQuery("SELECT * from kafka"));
+        final List<Row> rowsOfTopic1 = collectAllRows(tEnv.sqlQuery("SELECT * from topic1"));
+        final List<Row> rowsOfTopic2 = collectAllRows(tEnv.sqlQuery("SELECT * from topic2"));
+
+        // The multi-topic table sees both rows; each single-topic table sees only its own row.
+        assertThat(allRows).containsExactlyInAnyOrder(topic1Row, topic2Row);
+        assertThat(rowsOfTopic1).containsExactly(topic1Row);
+        assertThat(rowsOfTopic2).containsExactly(topic2Row);
+
+        // ------------- cleanup -------------------
+        deleteTestTopic(topic1);
+        deleteTestTopic(topic2);
+    }
+
+    @Test
+    public void testKafkaSourceSinkWithTopicPattern() throws Exception {
+        // Topic names embed the format and a random UUID so that every run can create its own
+        // topics.
+        final String topic1 = "topics1_" + format + "_" + UUID.randomUUID();
+        final String topic2 = "topics2_" + format + "_" + UUID.randomUUID();
+        final String sharedPattern = "topics.*";
+        createTestTopic(topic2, 1, 1);
+        createTestTopic(topic1, 1, 1);
+
+        // ---------- Declare the tables -------------------
+        // All three tables use 'topic-pattern': "kafka" with the shared pattern, "topic1" and
+        // "topic2" with the exact name of one topic each.
+        final String consumerGroup = getStandardProps().getProperty("group.id");
+        final String brokers = getBootstrapServers();
+        final String ddlTemplate =
+                "CREATE TABLE %s (\n"
+                        + "  `topic` STRING METADATA,\n"
+                        + "  `user_id` INT,\n"
+                        + "  `item_id` INT,\n"
+                        + "  `behavior` STRING\n"
+                        + ") WITH (\n"
+                        + "  'connector' = '%s',\n"
+                        + "  'topic-pattern' = '%s',\n"
+                        + "  'properties.bootstrap.servers' = '%s',\n"
+                        + "  'properties.group.id' = '%s',\n"
+                        + "  'scan.startup.mode' = 'earliest-offset',\n"
+                        + "  'scan.bounded.mode' = 'latest-offset',\n"
+                        + "  %s\n"
+                        + ")\n";
+        final String patternDdl =
+                String.format(
+                        ddlTemplate,
+                        "kafka",
+                        KafkaDynamicTableFactory.IDENTIFIER,
+                        sharedPattern,
+                        brokers,
+                        consumerGroup,
+                        formatOptions());
+        final String topic1Ddl =
+                String.format(
+                        ddlTemplate,
+                        "topic1",
+                        KafkaDynamicTableFactory.IDENTIFIER,
+                        topic1,
+                        brokers,
+                        consumerGroup,
+                        formatOptions());
+        final String topic2Ddl =
+                String.format(
+                        ddlTemplate,
+                        "topic2",
+                        KafkaDynamicTableFactory.IDENTIFIER,
+                        topic2,
+                        brokers,
+                        consumerGroup,
+                        formatOptions());
+
+        tEnv.executeSql(patternDdl);
+        tEnv.executeSql(topic1Ddl);
+        tEnv.executeSql(topic2Ddl);
+
+        // ---------- Write one row per topic through the pattern table -------------------
+        // The first column is the `topic` metadata column and selects the target topic.
+        final Row topic1Row = Row.of(topic1, 1, 1102, "behavior 1");
+        final Row topic2Row = Row.of(topic2, 2, 1103, "behavior 2");
+        final List<Row> rows = Arrays.asList(topic1Row, topic2Row);
+        tEnv.fromValues(rows).insertInto("kafka").execute().await();
+
+        // ---------- Read back through all three tables -------------------
+        final List<Row> allRows = collectAllRows(tEnv.sqlQuery("SELECT * from kafka"));
+        final List<Row> rowsOfTopic1 = collectAllRows(tEnv.sqlQuery("SELECT * from topic1"));
+        final List<Row> rowsOfTopic2 = collectAllRows(tEnv.sqlQuery("SELECT * from topic2"));
+
+        // The pattern table sees both rows; each single-topic table sees only its own row.
+        assertThat(allRows).containsExactlyInAnyOrder(topic1Row, topic2Row);
+        assertThat(rowsOfTopic1).containsExactly(topic1Row);
+        assertThat(rowsOfTopic2).containsExactly(topic2Row);
+
+        // ------------- cleanup -------------------
+
+        deleteTestTopic(topic1);
+        deleteTestTopic(topic2);
+    }
+
     @Test
     public void testKafkaSourceSinkWithBoundedSpecificOffsets() throws 
Exception {
         // we always use a different topic name for each parameterized topic,
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
index 15c740d2..1bcd775a 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
@@ -77,10 +77,12 @@ import org.junit.rules.ExpectedException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.function.Consumer;
+import java.util.regex.Pattern;
 
 import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode;
@@ -165,7 +167,38 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
                         SOURCE_KEY_FIELDS,
                         SOURCE_VALUE_FIELDS,
                         null,
-                        SOURCE_TOPIC,
+                        Collections.singletonList(SOURCE_TOPIC),
+                        UPSERT_KAFKA_SOURCE_PROPERTIES);
+        assertThat(actualSource).isEqualTo(expectedSource);
+
+        final KafkaDynamicSource actualUpsertKafkaSource = (KafkaDynamicSource) actualSource;
+        ScanTableSource.ScanRuntimeProvider provider =
+                actualUpsertKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+        assertKafkaSource(provider);
+    }
+
+    @Test
+    public void testTableSourceWithTopicList() {
+        final Map<String, String> modifiedOptions =
+                getModifiedOptions(
+                        getFullSourceOptions(),
+                        options -> {
+                            options.put(
+                                    "topic", String.format("%s;%s", SOURCE_TOPIC, SOURCE_TOPIC));
+                        });
+        final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType();
+        // Construct table source using options and table source factory
+        final DynamicTableSource actualSource = createTableSource(SOURCE_SCHEMA, modifiedOptions);
+
+        final KafkaDynamicSource expectedSource =
+                createExpectedScanSource(
+                        producedDataType,
+                        keyDecodingFormat,
+                        valueDecodingFormat,
+                        SOURCE_KEY_FIELDS,
+                        SOURCE_VALUE_FIELDS,
+                        null,
+                        Arrays.asList(SOURCE_TOPIC, SOURCE_TOPIC),
                         UPSERT_KAFKA_SOURCE_PROPERTIES);
         assertThat(actualSource).isEqualTo(expectedSource);
 
@@ -195,7 +228,50 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
                         SINK_KEY_FIELDS,
                         SINK_VALUE_FIELDS,
                         null,
-                        SINK_TOPIC,
+                        Collections.singletonList(SINK_TOPIC),
+                        null,
+                        UPSERT_KAFKA_SINK_PROPERTIES,
+                        DeliveryGuarantee.EXACTLY_ONCE,
+                        SinkBufferFlushMode.DISABLED,
+                        null,
+                        "kafka-sink");
+
+        // Test sink format.
+        final KafkaDynamicSink actualUpsertKafkaSink = (KafkaDynamicSink) actualSink;
+        assertThat(actualSink).isEqualTo(expectedSink);
+
+        // Test kafka producer.
+        DynamicTableSink.SinkRuntimeProvider provider =
+                actualUpsertKafkaSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
+        assertThat(provider).isInstanceOf(SinkV2Provider.class);
+        final SinkV2Provider sinkFunctionProvider = (SinkV2Provider) provider;
+        final Sink<RowData> sink = sinkFunctionProvider.createSink();
+        assertThat(sink).isInstanceOf(KafkaSink.class);
+    }
+
+    @Test
+    public void testTableSinkWithTopicList() {
+        // Construct table sink using options and table sink factory.
+        final Map<String, String> modifiedOptions =
+                getModifiedOptions(
+                        getFullSinkOptions(),
+                        options -> {
+                            options.put("sink.delivery-guarantee", "exactly-once");
+                            options.put("sink.transactional-id-prefix", "kafka-sink");
+                            options.put("topic", String.format("%s;%s", SINK_TOPIC, SINK_TOPIC));
+                        });
+        final DynamicTableSink actualSink = createTableSink(SINK_SCHEMA, modifiedOptions);
+
+        final DynamicTableSink expectedSink =
+                createExpectedSink(
+                        SINK_SCHEMA.toPhysicalRowDataType(),
+                        keyEncodingFormat,
+                        valueEncodingFormat,
+                        SINK_KEY_FIELDS,
+                        SINK_VALUE_FIELDS,
+                        null,
+                        Arrays.asList(SINK_TOPIC, SINK_TOPIC),
+                        null,
                         UPSERT_KAFKA_SINK_PROPERTIES,
                         DeliveryGuarantee.EXACTLY_ONCE,
                         SinkBufferFlushMode.DISABLED,
@@ -239,7 +315,8 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
                         SINK_KEY_FIELDS,
                         SINK_VALUE_FIELDS,
                         null,
-                        SINK_TOPIC,
+                        Collections.singletonList(SINK_TOPIC),
+                        null,
                         UPSERT_KAFKA_SINK_PROPERTIES,
                         DeliveryGuarantee.EXACTLY_ONCE,
                         new SinkBufferFlushMode(100, 1000L),
@@ -293,7 +370,8 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
                         SINK_KEY_FIELDS,
                         SINK_VALUE_FIELDS,
                         null,
-                        SINK_TOPIC,
+                        Collections.singletonList(SINK_TOPIC),
+                        null,
                         UPSERT_KAFKA_SINK_PROPERTIES,
                         DeliveryGuarantee.EXACTLY_ONCE,
                         SinkBufferFlushMode.DISABLED,
@@ -772,7 +850,7 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
             int[] keyFields,
             int[] valueFields,
             String keyPrefix,
-            String topic,
+            List<String> topic,
             Properties properties) {
         return new KafkaDynamicSource(
                 producedDataType,
@@ -781,7 +859,7 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
                 keyFields,
                 valueFields,
                 keyPrefix,
-                Collections.singletonList(topic),
+                topic,
                 null,
                 properties,
                 StartupMode.EARLIEST,
@@ -801,7 +879,8 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
             int[] keyProjection,
             int[] valueProjection,
             String keyPrefix,
-            String topic,
+            List<String> topics,
+            Pattern topicPattern,
             Properties properties,
             DeliveryGuarantee deliveryGuarantee,
             SinkBufferFlushMode flushMode,
@@ -815,7 +894,8 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
                 keyProjection,
                 valueProjection,
                 keyPrefix,
-                topic,
+                topics,
+                topicPattern,
                 properties,
                 null,
                 deliveryGuarantee,

Reply via email to