This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 12d4d4b8 [FLINK-39037] support query kafka cluster id as metadata in records of DynamicKafkaTableSource (#222)
12d4d4b8 is described below

commit 12d4d4b8bd8ca1af07922ec6d2b34254a44cb2af
Author: bowenli86 <[email protected]>
AuthorDate: Wed Feb 18 13:36:28 2026 -0800

    [FLINK-39037] support query kafka cluster id as metadata in records of DynamicKafkaTableSource (#222)
    
    # Motivation
    
    The Kafka cluster id is essential to query when the dynamic Kafka source
    reads from multiple clusters. ConsumerRecord includes topic/partition/offset
    but not the Kafka cluster id. This PR adds an opt-in side channel that injects
    the cluster id into deserialization, so that SQL metadata can expose
    kafka_cluster correctly in dynamic multi-cluster reads.
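
The side channel described above can be pictured with a minimal, dependency-free sketch. It assumes a shared deserializer that is cloned once per cluster and then bound to that cluster's id. `ClusterAware`, `TaggingDeserializer` and `ClusterBinding` are hypothetical stand-ins for illustration, not the connector's classes, and plain Java serialization stands in for whatever cloning utility the reader actually uses.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical opt-in hook: implementers declare whether they want the cluster id.
interface ClusterAware {
    default boolean needsClusterId() {
        return false;
    }

    void setClusterId(String clusterId);
}

// Hypothetical deserializer that tags every payload with the bound cluster id.
final class TaggingDeserializer implements Serializable, ClusterAware {
    private static final long serialVersionUID = 1L;

    // Transient: the id is bound after cloning, never shipped with the schema.
    private transient String clusterId;

    @Override
    public boolean needsClusterId() {
        return true;
    }

    @Override
    public void setClusterId(String clusterId) {
        this.clusterId = clusterId;
    }

    String deserialize(String payload) {
        if (clusterId == null) {
            throw new IllegalStateException("cluster id is missing");
        }
        return clusterId + ":" + payload;
    }
}

final class ClusterBinding {
    // Returns the schema unchanged unless it opts in; otherwise a bound clone.
    static <T extends Serializable> T bind(T schema, String clusterId) {
        if (!(schema instanceof ClusterAware) || !((ClusterAware) schema).needsClusterId()) {
            return schema;
        }
        T copy = cloneViaSerialization(schema);
        ((ClusterAware) copy).setClusterId(clusterId);
        return copy;
    }

    @SuppressWarnings("unchecked")
    private static <T extends Serializable> T cloneViaSerialization(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Failed to clone schema", e);
        }
    }
}
```

Cloning per cluster keeps the bound id out of shared state, so two cluster readers built from the same schema do not overwrite each other's id.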
    
    # What Changed
    
    - Added a new internal deserializer hook (KafkaClusterAwareDeserializer)
    - Updated the dynamic source reader flow
    - Extended table deserialization path
    - Updated table source metadata wiring
    - Added/updated tests:
      - Added DynamicKafkaSourceClusterMetadataITCase.
      - Added DynamicKafkaDeserializationSchemaTest.
      - Updated DynamicKafkaTableITCase with cluster metadata coverage.
    - Added docs
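
The table-side changes listed above rely on a boolean mask aligned with the requested metadata columns. A rough sketch of that idea follows, assuming metadata keys arrive as plain strings and per-record metadata is available as a map. `ClusterMetadataMask` and its methods are hypothetical names for illustration, not the connector's API.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper illustrating the mask; not part of the connector.
final class ClusterMetadataMask {
    static final String CLUSTER_KEY = "kafka_cluster";

    // Returns a mask aligned with metadataKeys, or null when no column asks for the cluster id.
    static boolean[] build(List<String> metadataKeys) {
        boolean[] mask = new boolean[metadataKeys.size()];
        boolean any = false;
        for (int i = 0; i < mask.length; i++) {
            if (CLUSTER_KEY.equals(metadataKeys.get(i))) {
                mask[i] = true;
                any = true;
            }
        }
        return any ? mask : null;
    }

    // Fills metadata values: masked positions take the cluster id, the rest come from the record.
    static Object[] fill(
            List<String> metadataKeys,
            boolean[] mask,
            String clusterId,
            Map<String, Object> recordMetadata) {
        Object[] out = new Object[metadataKeys.size()];
        for (int i = 0; i < out.length; i++) {
            if (mask != null && mask[i]) {
                if (clusterId == null) {
                    throw new IllegalStateException("cluster id is missing");
                }
                out[i] = clusterId;
                continue;
            }
            out[i] = recordMetadata.get(metadataKeys.get(i));
        }
        return out;
    }
}
```

Returning null instead of an all-false mask lets the caller treat "no mask" as "no cluster id needed", which is how the opt-in flag can be derived without scanning the mask again.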
---
 .../docs/connectors/table/dynamic-kafka.md         |  30 +++-
 .../content/docs/connectors/table/dynamic-kafka.md |  32 +++-
 .../source/reader/DynamicKafkaSourceReader.java    |  46 +++++-
 .../reader/KafkaClusterAwareDeserializer.java      |  34 ++++
 .../table/DynamicKafkaDeserializationSchema.java   |  74 ++++++++-
 .../kafka/table/DynamicKafkaTableSource.java       |  43 ++++--
 .../DynamicKafkaSourceClusterMetadataITCase.java   | 172 +++++++++++++++++++++
 .../DynamicKafkaDeserializationSchemaTest.java     | 139 +++++++++++++++++
 .../kafka/table/DynamicKafkaTableITCase.java       | 160 +++++++++++++++++++
 9 files changed, 709 insertions(+), 21 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/dynamic-kafka.md b/docs/content.zh/docs/connectors/table/dynamic-kafka.md
index 39ba2871..c6f11778 100644
--- a/docs/content.zh/docs/connectors/table/dynamic-kafka.md
+++ b/docs/content.zh/docs/connectors/table/dynamic-kafka.md
@@ -50,6 +50,7 @@ CREATE TABLE DynamicKafkaTable (
   `user_id` BIGINT,
   `item_id` BIGINT,
   `behavior` STRING,
+  `kafka_cluster` STRING METADATA FROM 'kafka_cluster',
   `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
 ) WITH (
   'connector' = 'dynamic-kafka',
@@ -70,8 +71,33 @@ CREATE TABLE DynamicKafkaTable (
 可用元数据
 --------
 
-Dynamic Kafka 连接器暴露的元数据列与 Kafka 连接器一致。
-请参考 [Kafka SQL 连接器]({{< ref "docs/connectors/table/kafka" >}}#available-metadata)。
+Dynamic Kafka 连接器支持 Kafka 连接器的全部元数据列,并额外提供一个
+Dynamic Source 特有的元数据列:
+
+* `kafka_cluster`(`STRING NOT NULL`,只读):该记录对应的 Kafka 集群 ID,由元数据服务解析并注入。
+
+其余通用元数据列请参考
+[Kafka SQL 连接器]({{< ref "docs/connectors/table/kafka" >}}#available-metadata)。
+
+示例:
+
+```sql
+CREATE TABLE DynamicKafkaTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `behavior` STRING,
+  `kafka_cluster` STRING METADATA FROM 'kafka_cluster' VIRTUAL
+) WITH (
+  'connector' = 'dynamic-kafka',
+  'stream-ids' = 'user_behavior;user_behavior_v2',
+  'metadata-service' = 'single-cluster',
+  'metadata-service.cluster-id' = 'cluster-0',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'properties.group.id' = 'testGroup',
+  'scan.startup.mode' = 'earliest-offset',
+  'format' = 'csv'
+);
+```
 
 连接器参数
 ---------
diff --git a/docs/content/docs/connectors/table/dynamic-kafka.md b/docs/content/docs/connectors/table/dynamic-kafka.md
index 52c7b8c6..cd9a5e2a 100644
--- a/docs/content/docs/connectors/table/dynamic-kafka.md
+++ b/docs/content/docs/connectors/table/dynamic-kafka.md
@@ -52,6 +52,7 @@ CREATE TABLE DynamicKafkaTable (
   `user_id` BIGINT,
   `item_id` BIGINT,
   `behavior` STRING,
+  `kafka_cluster` STRING METADATA FROM 'kafka_cluster',
   `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
 ) WITH (
   'connector' = 'dynamic-kafka',
@@ -73,9 +74,34 @@ properties (all `properties.*` options) into the constructor when available.
 Available Metadata
 ------------------
 
-The Dynamic Kafka connector exposes the same metadata columns as the Kafka connector.
-See [Kafka SQL Connector]({{< ref "docs/connectors/table/kafka" >}}#available-metadata) for
-the full list.
+The Dynamic Kafka connector exposes all metadata columns from the Kafka connector and adds one
+dynamic-source-specific metadata column:
+
+* `kafka_cluster` (`STRING NOT NULL`, read-only): cluster id resolved by the metadata service for
+  the record.
+
+See [Kafka SQL Connector]({{< ref "docs/connectors/table/kafka" >}}#available-metadata) for the
+shared metadata columns.
+
+Example:
+
+```sql
+CREATE TABLE DynamicKafkaTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `behavior` STRING,
+  `kafka_cluster` STRING METADATA FROM 'kafka_cluster' VIRTUAL
+) WITH (
+  'connector' = 'dynamic-kafka',
+  'stream-ids' = 'user_behavior;user_behavior_v2',
+  'metadata-service' = 'single-cluster',
+  'metadata-service.cluster-id' = 'cluster-0',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'properties.group.id' = 'testGroup',
+  'scan.startup.mode' = 'earliest-offset',
+  'format' = 'csv'
+);
+```
 
 Connector Options
 ----------------
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
index 3e6f0534..660342df 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
@@ -47,6 +47,7 @@ import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.UserCodeClassLoader;
 
@@ -446,7 +447,9 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
         KafkaSourceReaderMetrics kafkaSourceReaderMetrics =
                 new KafkaSourceReaderMetrics(kafkaClusterMetricGroup);
 
-        deserializationSchema.open(
+        KafkaRecordDeserializationSchema<T> readerSchema =
+                createClusterDeserializationSchema(kafkaClusterId);
+        readerSchema.open(
                 new DeserializationSchema.InitializationContext() {
                     @Override
                     public MetricGroup getMetricGroup() {
@@ -463,7 +466,7 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
                     }
                 });
 
-        KafkaRecordEmitter<T> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema);
+        KafkaRecordEmitter<T> recordEmitter = new KafkaRecordEmitter<>(readerSchema);
         return new KafkaSourceReader<>(
                 elementsQueue,
                 new KafkaSourceFetcherManager(
@@ -481,6 +484,45 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
                 kafkaSourceReaderMetrics);
     }
 
+    private KafkaRecordDeserializationSchema<T> createClusterDeserializationSchema(
+            String kafkaClusterId) {
+        /*
+         * Only deserializers that explicitly opt in to cluster metadata should be
+         * wrapped with a per-cluster instance. This keeps the default path identical
+         * to the static source, and avoids cloning when the schema is cluster-agnostic.
+         */
+        if (!(deserializationSchema instanceof KafkaClusterAwareDeserializer)) {
+            return deserializationSchema;
+        }
+
+        KafkaClusterAwareDeserializer clusterAware =
+                (KafkaClusterAwareDeserializer) deserializationSchema;
+        /*
+         * The opt-in flag lets a deserializer declare whether it needs the cluster id
+         * injected at all (e.g., table deserializers can decide based on metadata mask).
+         */
+        if (!clusterAware.needsKafkaClusterId()) {
+            return deserializationSchema;
+        }
+
+        try {
+            /*
+             * Clone the schema to avoid sharing mutable state across clusters.
+             * The cloned instance is then bound to the cluster id before use.
+             */
+            KafkaRecordDeserializationSchema<T> readerSchema =
+                    InstantiationUtil.clone(
+                            deserializationSchema,
+                            readerContext.getUserCodeClassLoader().asClassLoader());
+            ((KafkaClusterAwareDeserializer) readerSchema).setKafkaClusterId(kafkaClusterId);
+            return readerSchema;
+        } catch (Exception error) {
+            throw new IllegalStateException(
+                    "Failed to clone Kafka deserialization schema for dynamic cluster metadata.",
+                    error);
+    }
+
     /**
      * In metadata change, we need to reset the availability helper since the number of Kafka source
      * readers could have changed.
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/KafkaClusterAwareDeserializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/KafkaClusterAwareDeserializer.java
new file mode 100644
index 00000000..bbbd2e87
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/KafkaClusterAwareDeserializer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.dynamic.source.reader;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nullable;
+
+/** Optional hook for deserializers that need access to the Kafka cluster id. */
+@Internal
+public interface KafkaClusterAwareDeserializer {
+
+    default boolean needsKafkaClusterId() {
+        return false;
+    }
+
+    void setKafkaClusterId(@Nullable String kafkaClusterId);
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
index 2fa67bd4..939d9efd 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -21,9 +21,11 @@ package org.apache.flink.streaming.connectors.kafka.table;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.kafka.dynamic.source.reader.KafkaClusterAwareDeserializer;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
 import org.apache.flink.types.DeserializationException;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
@@ -40,7 +42,8 @@ import java.util.List;
 
 /** A specific {@link KafkaRecordDeserializationSchema} for {@link KafkaDynamicSource}. */
 @Internal
-class DynamicKafkaDeserializationSchema implements KafkaRecordDeserializationSchema<RowData> {
+class DynamicKafkaDeserializationSchema
+        implements KafkaRecordDeserializationSchema<RowData>, KafkaClusterAwareDeserializer {
 
     private static final long serialVersionUID = 1L;
 
@@ -58,6 +61,14 @@ class DynamicKafkaDeserializationSchema implements KafkaRecordDeserializationSch
 
     private final boolean upsertMode;
 
+    /*
+     * Mask aligned with metadataConverters. When true at position i, the metadata field
+     * is filled with the dynamic Kafka cluster id instead of reading from ConsumerRecord.
+     * This enables per-cluster metadata injection in DynamicKafkaSource without changing
+     * record-level metadata extraction.
+     */
+    private final boolean needsKafkaClusterId;
+
     DynamicKafkaDeserializationSchema(
             int physicalArity,
             @Nullable DeserializationSchema<RowData> keyDeserialization,
@@ -68,6 +79,30 @@ class DynamicKafkaDeserializationSchema implements KafkaRecordDeserializationSch
             MetadataConverter[] metadataConverters,
             TypeInformation<RowData> producedTypeInfo,
             boolean upsertMode) {
+        this(
+                physicalArity,
+                keyDeserialization,
+                keyProjection,
+                valueDeserialization,
+                valueProjection,
+                hasMetadata,
+                metadataConverters,
+                producedTypeInfo,
+                upsertMode,
+                null);
+    }
+
+    DynamicKafkaDeserializationSchema(
+            int physicalArity,
+            @Nullable DeserializationSchema<RowData> keyDeserialization,
+            int[] keyProjection,
+            DeserializationSchema<RowData> valueDeserialization,
+            int[] valueProjection,
+            boolean hasMetadata,
+            MetadataConverter[] metadataConverters,
+            TypeInformation<RowData> producedTypeInfo,
+            boolean upsertMode,
+            @Nullable boolean[] clusterMetadataPositions) {
         if (upsertMode) {
             Preconditions.checkArgument(
                     keyDeserialization != null && keyProjection.length > 0,
@@ -83,9 +118,12 @@ class DynamicKafkaDeserializationSchema implements KafkaRecordDeserializationSch
                         keyProjection,
                         valueProjection,
                         metadataConverters,
-                        upsertMode);
+                        upsertMode,
+                        clusterMetadataPositions);
         this.producedTypeInfo = producedTypeInfo;
         this.upsertMode = upsertMode;
+        // Presence of a mask indicates that at least one metadata column expects cluster id.
+        this.needsKafkaClusterId = clusterMetadataPositions != null;
     }
 
     @Override
@@ -129,6 +167,16 @@ class DynamicKafkaDeserializationSchema implements KafkaRecordDeserializationSch
         return producedTypeInfo;
     }
 
+    @Override
+    public boolean needsKafkaClusterId() {
+        return needsKafkaClusterId;
+    }
+
+    @Override
+    public void setKafkaClusterId(@Nullable String kafkaClusterId) {
+        this.outputCollector.kafkaClusterId = kafkaClusterId;
+    }
+
     // --------------------------------------------------------------------------------------------
 
     interface MetadataConverter extends Serializable {
@@ -184,8 +232,16 @@ class DynamicKafkaDeserializationSchema implements KafkaRecordDeserializationSch
 
         private final boolean upsertMode;
 
+        /*
+         * Same mask as the outer class: marks which metadata columns should be populated
+         * with the dynamic cluster id rather than per-record metadata converters.
+         */
+        private final @Nullable boolean[] clusterMetadataPositions;
+
         private transient ConsumerRecord<?, ?> inputRecord;
 
+        private transient @Nullable String kafkaClusterId;
+
         private transient List<RowData> physicalKeyRows;
 
         private transient Collector<RowData> outputCollector;
@@ -195,12 +251,14 @@ class DynamicKafkaDeserializationSchema implements KafkaRecordDeserializationSch
                 int[] keyProjection,
                 int[] valueProjection,
                 MetadataConverter[] metadataConverters,
-                boolean upsertMode) {
+                boolean upsertMode,
+                @Nullable boolean[] clusterMetadataPositions) {
             this.physicalArity = physicalArity;
             this.keyProjection = keyProjection;
             this.valueProjection = valueProjection;
             this.metadataConverters = metadataConverters;
             this.upsertMode = upsertMode;
+            this.clusterMetadataPositions = clusterMetadataPositions;
         }
 
         @Override
@@ -254,6 +312,16 @@ class DynamicKafkaDeserializationSchema implements KafkaRecordDeserializationSch
             }
 
             for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
+                // When the mask is set, emit the per-cluster id instead of ConsumerRecord metadata.
+                if (clusterMetadataPositions != null && clusterMetadataPositions[metadataPos]) {
+                    final String clusterId =
+                            Preconditions.checkNotNull(
+                                    kafkaClusterId,
+                                    "Kafka cluster id is missing for dynamic source metadata");
+                    producedRow.setField(
+                            physicalArity + metadataPos, StringData.fromString(clusterId));
+                    continue;
+                }
                 producedRow.setField(
                         physicalArity + metadataPos,
                         metadataConverters[metadataPos].read(inputRecord));
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableSource.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableSource.java
index c3ea65ef..da303b7a 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableSource.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableSource.java
@@ -531,16 +531,23 @@ public class DynamicKafkaTableSource
             DeserializationSchema<RowData> keyDeserialization,
             DeserializationSchema<RowData> valueDeserialization,
             TypeInformation<RowData> producedTypeInfo) {
-        final MetadataConverter[] metadataConverters =
-                metadataKeys.stream()
-                        .map(
-                                k ->
-                                        Stream.of(ReadableMetadata.values())
-                                                .filter(rm -> rm.key.equals(k))
-                                                .findFirst()
-                                                .orElseThrow(IllegalStateException::new))
-                        .map(m -> m.converter)
-                        .toArray(MetadataConverter[]::new);
+        final MetadataConverter[] metadataConverters = new MetadataConverter[metadataKeys.size()];
+        final boolean[] clusterMetadataPositions = new boolean[metadataKeys.size()];
+        boolean hasClusterMetadata = false;
+        for (int i = 0; i < metadataKeys.size(); i++) {
+            String key = metadataKeys.get(i);
+            ReadableMetadata metadata =
+                    Stream.of(ReadableMetadata.values())
+                            .filter(rm -> rm.key.equals(key))
+                            .findFirst()
+                            .orElseThrow(IllegalStateException::new);
+            metadataConverters[i] = metadata.converter;
+            if (metadata == ReadableMetadata.KAFKA_CLUSTER) {
+                clusterMetadataPositions[i] = true;
+                hasClusterMetadata = true;
+            }
+        }
+        final boolean[] clusterPositions = hasClusterMetadata ? clusterMetadataPositions : null;
 
         // check if connector metadata is used at all
         final boolean hasMetadata = metadataKeys.size() > 0;
@@ -567,7 +574,8 @@ public class DynamicKafkaTableSource
                 hasMetadata,
                 metadataConverters,
                 producedTypeInfo,
-                upsertMode);
+                upsertMode,
+                clusterPositions);
     }
 
     private @Nullable DeserializationSchema<RowData> createDeserialization(
@@ -591,6 +599,19 @@ public class DynamicKafkaTableSource
     // --------------------------------------------------------------------------------------------
 
     enum ReadableMetadata {
+        KAFKA_CLUSTER(
+                "kafka_cluster",
+                DataTypes.STRING().notNull(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        throw new IllegalStateException(
+                                "Kafka cluster metadata should be populated by dynamic source.");
+                    }
+                }),
+
         TOPIC(
                 "topic",
                 DataTypes.STRING().notNull(),
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceClusterMetadataITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceClusterMetadataITCase.java
new file mode 100644
index 00000000..243ceae9
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceClusterMetadataITCase.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.dynamic.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
+import org.apache.flink.connector.kafka.dynamic.source.reader.KafkaClusterAwareDeserializer;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.connector.kafka.testutils.MockKafkaMetadataService;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.DynamicKafkaSourceTestHelper;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration test for cluster metadata injection in {@link DynamicKafkaSource}. */
+public class DynamicKafkaSourceClusterMetadataITCase {
+
+    private static final String TOPIC = "DynamicKafkaSourceClusterMetadataITCase";
+    private static final int NUM_PARTITIONS = 3;
+    private static final int NUM_RECORDS_PER_SPLIT = 5;
+
+    private static final InMemoryReporter REPORTER = InMemoryReporter.create();
+
+    @RegisterExtension
+    public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(2)
+                            .setConfiguration(REPORTER.addToConfiguration(new Configuration()))
+                            .build());
+
+    @BeforeAll
+    static void beforeAll() throws Throwable {
+        DynamicKafkaSourceTestHelper.setup();
+        DynamicKafkaSourceTestHelper.createTopic(TOPIC, NUM_PARTITIONS, 1);
+        DynamicKafkaSourceTestHelper.produceToKafka(TOPIC, NUM_PARTITIONS, NUM_RECORDS_PER_SPLIT);
+    }
+
+    @AfterAll
+    static void afterAll() throws Exception {
+        REPORTER.close();
+        DynamicKafkaSourceTestHelper.tearDown();
+    }
+
+    @Test
+    void testClusterMetadataInjection() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+
+        Properties properties = new Properties();
+        properties.setProperty(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "0");
+        properties.setProperty(
+                DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.key(), "0");
+
+        MockKafkaMetadataService mockKafkaMetadataService =
+                new MockKafkaMetadataService(
+                        Collections.singleton(DynamicKafkaSourceTestHelper.getKafkaStream(TOPIC)));
+
+        DynamicKafkaSource<String> dynamicKafkaSource =
+                DynamicKafkaSource.<String>builder()
+                        .setStreamIds(
+                                mockKafkaMetadataService.getAllStreams().stream()
+                                        .map(KafkaStream::getStreamId)
+                                        .collect(Collectors.toSet()))
+                        .setKafkaMetadataService(mockKafkaMetadataService)
+                        .setDeserializer(new ClusterIdDeserializationSchema())
+                        .setStartingOffsets(OffsetsInitializer.earliest())
+                        .setProperties(properties)
+                        .build();
+
+        DataStreamSource<String> stream =
+                env.fromSource(
+                        dynamicKafkaSource, WatermarkStrategy.noWatermarks(), "dynamic-kafka-src");
+        CloseableIterator<String> iterator = stream.executeAndCollect();
+        List<String> results = new ArrayList<>();
+        int expectedCount =
+                DynamicKafkaSourceTestHelper.NUM_KAFKA_CLUSTERS
+                        * NUM_PARTITIONS
+                        * NUM_RECORDS_PER_SPLIT;
+        while (results.size() < expectedCount && iterator.hasNext()) {
+            results.add(iterator.next());
+        }
+        iterator.close();
+
+        assertThat(results).hasSize(expectedCount);
+
+        String cluster0 = DynamicKafkaSourceTestHelper.getKafkaClusterId(0);
+        String cluster1 = DynamicKafkaSourceTestHelper.getKafkaClusterId(1);
+        assertThat(results).contains(cluster0, cluster1);
+
+        Map<String, Long> counts =
+                results.stream()
+                        .collect(Collectors.groupingBy(value -> value, Collectors.counting()));
+        assertThat(counts)
+                .containsEntry(cluster0, (long) NUM_PARTITIONS * NUM_RECORDS_PER_SPLIT)
+                .containsEntry(cluster1, (long) NUM_PARTITIONS * NUM_RECORDS_PER_SPLIT);
+    }
+
+    private static final class ClusterIdDeserializationSchema
+            implements KafkaRecordDeserializationSchema<String>, KafkaClusterAwareDeserializer {
+        private static final long serialVersionUID = 1L;
+
+        private @Nullable String kafkaClusterId;
+
+        @Override
+        public boolean needsKafkaClusterId() {
+            return true;
+        }
+
+        @Override
+        public void setKafkaClusterId(@Nullable String kafkaClusterId) {
+            this.kafkaClusterId = kafkaClusterId;
+        }
+
+        @Override
+        public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out) {
+            out.collect(
+                    Preconditions.checkNotNull(
+                            kafkaClusterId,
+                            "Kafka cluster id is missing for dynamic source metadata"));
+        }
+
+        @Override
+        public TypeInformation<String> getProducedType() {
+            return BasicTypeInfo.STRING_TYPE_INFO;
+        }
+    }
+}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchemaTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchemaTest.java
new file mode 100644
index 00000000..80313290
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchemaTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link DynamicKafkaDeserializationSchema}. */
+public class DynamicKafkaDeserializationSchemaTest {
+
+    @Test
+    void testClusterMetadataInjection() throws Exception {
+        AtomicBoolean metadataConverterUsed = new AtomicBoolean(false);
+        DynamicKafkaDeserializationSchema.MetadataConverter[] metadataConverters =
+                new DynamicKafkaDeserializationSchema.MetadataConverter[] {
+                    record -> {
+                        metadataConverterUsed.set(true);
+                        return null;
+                    }
+                };
+
+        DynamicKafkaDeserializationSchema schema =
+                new DynamicKafkaDeserializationSchema(
+                        1,
+                        null,
+                        new int[0],
+                        new SingleRowDeserializationSchema(),
+                        new int[] {0},
+                        true,
+                        metadataConverters,
+                        TypeInformation.of(RowData.class),
+                        false,
+                        new boolean[] {true});
+
+        schema.open(new TestInitializationContext());
+
+        String clusterId = "cluster-a";
+        schema.setKafkaClusterId(clusterId);
+
+        List<RowData> rows = new ArrayList<>();
+        schema.deserialize(
+                new ConsumerRecord<>("topic", 0, 0L, null, new byte[] {1}),
+                new Collector<RowData>() {
+                    @Override
+                    public void collect(RowData record) {
+                        rows.add(record);
+                    }
+
+                    @Override
+                    public void close() {}
+                });
+
+        assertThat(metadataConverterUsed.get()).isFalse();
+        assertThat(rows).hasSize(1);
+
+        GenericRowData row = (GenericRowData) rows.get(0);
+        assertThat(row.getField(1)).isEqualTo(StringData.fromString(clusterId));
+    }
+
+    private static final class SingleRowDeserializationSchema
+            implements DeserializationSchema<RowData> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public RowData deserialize(byte[] message) {
+            return GenericRowData.of(StringData.fromString("value"));
+        }
+
+        @Override
+        public void deserialize(byte[] message, Collector<RowData> out) {
+            out.collect(deserialize(message));
+        }
+
+        @Override
+        public boolean isEndOfStream(RowData nextElement) {
+            return false;
+        }
+
+        @Override
+        public TypeInformation<RowData> getProducedType() {
+            return TypeInformation.of(RowData.class);
+        }
+    }
+
+    private static final class TestInitializationContext
+            implements DeserializationSchema.InitializationContext {
+        @Override
+        public MetricGroup getMetricGroup() {
+            return UnregisteredMetricsGroup.createSourceReaderMetricGroup();
+        }
+
+        @Override
+        public UserCodeClassLoader getUserCodeClassLoader() {
+            ClassLoader classLoader = DynamicKafkaDeserializationSchemaTest.class.getClassLoader();
+            return new UserCodeClassLoader() {
+                @Override
+                public ClassLoader asClassLoader() {
+                    return classLoader;
+                }
+
+                @Override
+                public void registerReleaseHookIfAbsent(
+                        String releaseHookName, Runnable releaseHook) {}
+            };
+        }
+    }
+}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableITCase.java
index 98eeaa29..c2591a0e 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableITCase.java
@@ -18,23 +18,35 @@
 
 package org.apache.flink.streaming.connectors.kafka.table;
 
+import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
+import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
+import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
 import org.apache.flink.table.api.Table;
+import org.apache.flink.table.utils.TableTestMatchers;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.function.RunnableWithException;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.assertj.core.api.ThrowingConsumer;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectRows;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.HamcrestCondition.matching;
 
 /** IT cases for the Dynamic Kafka table source. */
 class DynamicKafkaTableITCase extends KafkaTableTestBase {
@@ -44,6 +56,81 @@ class DynamicKafkaTableITCase extends KafkaTableTestBase {
         env.setParallelism(1);
     }
 
+    @Test
+    void testDynamicKafkaSourceWithClusterMetadata() throws Exception {
+        final String topic = "dynamic_table_cluster_metadata_" + UUID.randomUUID();
+        createTestTopic(topic, 1, 1);
+
+        final String bootstrapServers = getBootstrapServers();
+        final String groupId = getStandardProps().getProperty("group.id");
+
+        final String createSinkTable =
+                String.format(
+                        "CREATE TABLE kafka_sink (\n"
+                                + "  name STRING,\n"
+                                + "  cnt INT\n"
+                                + ") WITH (\n"
+                                + "  'connector' = '%s',\n"
+                                + "  'topic' = '%s',\n"
+                                + "  'properties.bootstrap.servers' = '%s',\n"
+                                + "  'format' = 'json',\n"
+                                + "  'json.fail-on-missing-field' = 'false',\n"
+                                + "  'json.ignore-parse-errors' = 'true'\n"
+                                + ")",
+                        KafkaDynamicTableFactory.IDENTIFIER, topic, bootstrapServers);
+        tEnv.executeSql(createSinkTable);
+
+        final String insertData =
+                "INSERT INTO kafka_sink\n"
+                        + "SELECT * FROM (VALUES\n"
+                        + "  ('Alice', 1),\n"
+                        + "  ('Bob', 2)\n"
+                        + ") AS t(name, cnt)";
+        tEnv.executeSql(insertData).await();
+
+        final String createSourceTable =
+                String.format(
+                        "CREATE TABLE kafka_source (\n"
+                                + "  name STRING,\n"
+                                + "  cnt INT,\n"
+                                + "  kafka_cluster STRING METADATA FROM 'kafka_cluster',\n"
+                                + "  topic STRING METADATA FROM 'topic'\n"
+                                + ") WITH (\n"
+                                + "  'connector' = '%s',\n"
+                                + "  'stream-ids' = '%s',\n"
+                                + "  'metadata-service' = '%s',\n"
+                                + "  'properties.bootstrap.servers' = '%s',\n"
+                                + "  'properties.group.id' = '%s',\n"
+                                + "  'scan.startup.mode' = 'earliest-offset',\n"
+                                + "  'format' = 'json',\n"
+                                + "  'json.fail-on-missing-field' = 'false',\n"
+                                + "  'json.ignore-parse-errors' = 'true'\n"
+                                + ")",
+                        DynamicKafkaTableFactory.IDENTIFIER,
+                        topic,
+                        MultiClusterMetadataService.class.getName(),
+                        bootstrapServers,
+                        groupId);
+        tEnv.executeSql(createSourceTable);
+
+        Table result = tEnv.sqlQuery("SELECT name, cnt, kafka_cluster, topic FROM kafka_source");
+        List<Row> actual = collectRows(result, 4);
+        actual.sort(
+                Comparator.comparing((Row row) -> row.getField(0).toString())
+                        .thenComparing(row -> row.getField(2).toString()));
+
+        List<Row> expected =
+                Arrays.asList(
+                        Row.of("Alice", 1, MultiClusterMetadataService.CLUSTER_0, topic),
+                        Row.of("Alice", 1, MultiClusterMetadataService.CLUSTER_1, topic),
+                        Row.of("Bob", 2, MultiClusterMetadataService.CLUSTER_0, topic),
+                        Row.of("Bob", 2, MultiClusterMetadataService.CLUSTER_1, topic));
+
+        assertThat(actual).satisfies(matching(TableTestMatchers.deepEqualTo(expected, false)));
+
+        cleanupTopic(topic);
+    }
+
     @Test
     void testDynamicKafkaSource() throws Exception {
         final String topic = "dynamic_" + UUID.randomUUID();
@@ -117,4 +204,77 @@ class DynamicKafkaTableITCase extends KafkaTableTestBase {
             assertThat(ex).satisfiesAnyOf(ignoreIf);
         }
     }
+
+    /**
+     * Test metadata service that maps a single stream id to two logical clusters.
+     *
+     * <p>Both logical clusters point to the same topic on the same brokers, so each record is read
+     * once per cluster, allowing the test to validate the cluster id injected for each record.
+     */
+    public static final class MultiClusterMetadataService implements KafkaMetadataService {
+
+        static final String CLUSTER_0 = "cluster-0";
+        static final String CLUSTER_1 = "cluster-1";
+
+        private final Properties cluster0Props;
+        private final Properties cluster1Props;
+
+        public MultiClusterMetadataService(Properties properties) {
+            Properties baseProps = new Properties();
+            baseProps.putAll(properties);
+
+            cluster0Props = new Properties();
+            cluster0Props.putAll(baseProps);
+            cluster0Props.setProperty(
+                    ConsumerConfig.GROUP_ID_CONFIG,
+                    baseProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-tests")
+                            + "-"
+                            + CLUSTER_0);
+
+            cluster1Props = new Properties();
+            cluster1Props.putAll(baseProps);
+            cluster1Props.setProperty(
+                    ConsumerConfig.GROUP_ID_CONFIG,
+                    baseProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-tests")
+                            + "-"
+                            + CLUSTER_1);
+        }
+
+        @Override
+        public Set<KafkaStream> getAllStreams() {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public Map<String, KafkaStream> describeStreams(Collection<String> streamIds) {
+            Map<String, KafkaStream> streams = new LinkedHashMap<>();
+            for (String streamId : streamIds) {
+                streams.put(streamId, createKafkaStream(streamId));
+            }
+            return streams;
+        }
+
+        @Override
+        public boolean isClusterActive(String kafkaClusterId) {
+            return CLUSTER_0.equals(kafkaClusterId) || CLUSTER_1.equals(kafkaClusterId);
+        }
+
+        @Override
+        public void close() {
+            // nothing to close
+        }
+
+        private KafkaStream createKafkaStream(String streamId) {
+            Map<String, ClusterMetadata> clusterMetadata = new LinkedHashMap<>();
+            clusterMetadata.put(
+                    CLUSTER_0,
+                    new ClusterMetadata(
+                            Collections.singleton(streamId), cluster0Props, null, null));
+            clusterMetadata.put(
+                    CLUSTER_1,
+                    new ClusterMetadata(
+                            Collections.singleton(streamId), cluster1Props, null, null));
+            return new KafkaStream(streamId, clusterMetadata);
+        }
+    }
 }

