This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 12d4d4b8 [FLINK-39037] support query kafka cluster id as metadata in records of DynamicKafkaTableSource (#222)
12d4d4b8 is described below
commit 12d4d4b8bd8ca1af07922ec6d2b34254a44cb2af
Author: bowenli86 <[email protected]>
AuthorDate: Wed Feb 18 13:36:28 2026 -0800
[FLINK-39037] support query kafka cluster id as metadata in records of DynamicKafkaTableSource (#222)
# Motivation
The Kafka cluster id is essential metadata to query when the dynamic Kafka
source reads from multiple clusters. ConsumerRecord carries
topic/partition/offset but not the Kafka cluster id. This PR adds an opt-in
side-channel that injects the cluster id into deserialization so SQL
metadata can expose kafka_cluster correctly in dynamic multi-cluster reads.
# What Changed
- Added a new internal deserializer hook (KafkaClusterAwareDeserializer) and
  updated the dynamic source reader flow; a usage sketch follows this list
- Extended the table deserialization path
- Updated the table source metadata wiring
- Added/updated tests:
  - Added DynamicKafkaSourceClusterMetadataITCase.
  - Added DynamicKafkaDeserializationSchemaTest.
  - Updated DynamicKafkaTableITCase with cluster metadata coverage.
- Added docs
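
For illustration only (not part of this commit's diff), here is a minimal sketch of a
KafkaRecordDeserializationSchema that opts in to the new hook. It mirrors the
ClusterIdDeserializationSchema added in the IT case below; the class name and the emitted
`clusterId:value` output format are made up for the example, and note that
KafkaClusterAwareDeserializer is an @Internal interface.

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.dynamic.source.reader.KafkaClusterAwareDeserializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import javax.annotation.Nullable;

import java.nio.charset.StandardCharsets;

/** Illustrative schema that prefixes every record value with the Kafka cluster id. */
class ClusterTaggingDeserializationSchema
        implements KafkaRecordDeserializationSchema<String>, KafkaClusterAwareDeserializer {
    private static final long serialVersionUID = 1L;

    // Set by DynamicKafkaSourceReader on a per-cluster clone of this schema.
    private @Nullable String kafkaClusterId;

    @Override
    public boolean needsKafkaClusterId() {
        return true; // opt in; the default of false keeps the existing, clone-free path
    }

    @Override
    public void setKafkaClusterId(@Nullable String kafkaClusterId) {
        this.kafkaClusterId = kafkaClusterId;
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out) {
        String value = new String(record.value(), StandardCharsets.UTF_8);
        out.collect(kafkaClusterId + ":" + value);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
```

The reader clones an opting-in schema once per Kafka cluster via InstantiationUtil.clone and
calls setKafkaClusterId before open, so the field is already set when deserialize runs (see
createClusterDeserializationSchema in DynamicKafkaSourceReader below).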
---
.../docs/connectors/table/dynamic-kafka.md | 30 +++-
.../content/docs/connectors/table/dynamic-kafka.md | 32 +++-
.../source/reader/DynamicKafkaSourceReader.java | 46 +++++-
.../reader/KafkaClusterAwareDeserializer.java | 34 ++++
.../table/DynamicKafkaDeserializationSchema.java | 74 ++++++++-
.../kafka/table/DynamicKafkaTableSource.java | 43 ++++--
.../DynamicKafkaSourceClusterMetadataITCase.java | 172 +++++++++++++++++++++
.../DynamicKafkaDeserializationSchemaTest.java | 139 +++++++++++++++++
.../kafka/table/DynamicKafkaTableITCase.java | 160 +++++++++++++++++++
9 files changed, 709 insertions(+), 21 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/dynamic-kafka.md b/docs/content.zh/docs/connectors/table/dynamic-kafka.md
index 39ba2871..c6f11778 100644
--- a/docs/content.zh/docs/connectors/table/dynamic-kafka.md
+++ b/docs/content.zh/docs/connectors/table/dynamic-kafka.md
@@ -50,6 +50,7 @@ CREATE TABLE DynamicKafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
+ `kafka_cluster` STRING METADATA FROM 'kafka_cluster',
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'dynamic-kafka',
@@ -70,8 +71,33 @@ CREATE TABLE DynamicKafkaTable (
可用元数据
--------
-Dynamic Kafka 连接器暴露的元数据列与 Kafka 连接器一致。
-请参考 [Kafka SQL 连接器]({{< ref "docs/connectors/table/kafka" >}}#available-metadata)。
+Dynamic Kafka 连接器支持 Kafka 连接器的全部元数据列,并额外提供一个
+Dynamic Source 特有的元数据列:
+
+* `kafka_cluster`(`STRING NOT NULL`,只读):该记录对应的 Kafka 集群 ID,由元数据服务解析并注入。
+
+其余通用元数据列请参考
+[Kafka SQL 连接器]({{< ref "docs/connectors/table/kafka" >}}#available-metadata)。
+
+示例:
+
+```sql
+CREATE TABLE DynamicKafkaTable (
+ `user_id` BIGINT,
+ `item_id` BIGINT,
+ `behavior` STRING,
+ `kafka_cluster` STRING METADATA FROM 'kafka_cluster' VIRTUAL
+) WITH (
+ 'connector' = 'dynamic-kafka',
+ 'stream-ids' = 'user_behavior;user_behavior_v2',
+ 'metadata-service' = 'single-cluster',
+ 'metadata-service.cluster-id' = 'cluster-0',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'properties.group.id' = 'testGroup',
+ 'scan.startup.mode' = 'earliest-offset',
+ 'format' = 'csv'
+);
+```
连接器参数
---------
diff --git a/docs/content/docs/connectors/table/dynamic-kafka.md b/docs/content/docs/connectors/table/dynamic-kafka.md
index 52c7b8c6..cd9a5e2a 100644
--- a/docs/content/docs/connectors/table/dynamic-kafka.md
+++ b/docs/content/docs/connectors/table/dynamic-kafka.md
@@ -52,6 +52,7 @@ CREATE TABLE DynamicKafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
+ `kafka_cluster` STRING METADATA FROM 'kafka_cluster',
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'dynamic-kafka',
@@ -73,9 +74,34 @@ properties (all `properties.*` options) into the constructor when available.
Available Metadata
------------------
-The Dynamic Kafka connector exposes the same metadata columns as the Kafka connector.
-See [Kafka SQL Connector]({{< ref "docs/connectors/table/kafka" >}}#available-metadata) for
-the full list.
+The Dynamic Kafka connector exposes all metadata columns from the Kafka connector and adds one
+dynamic-source-specific metadata column:
+
+* `kafka_cluster` (`STRING NOT NULL`, read-only): cluster id resolved by the metadata service for
+  the record.
+
+See [Kafka SQL Connector]({{< ref "docs/connectors/table/kafka" >}}#available-metadata) for the
+shared metadata columns.
+
+Example:
+
+```sql
+CREATE TABLE DynamicKafkaTable (
+ `user_id` BIGINT,
+ `item_id` BIGINT,
+ `behavior` STRING,
+ `kafka_cluster` STRING METADATA FROM 'kafka_cluster' VIRTUAL
+) WITH (
+ 'connector' = 'dynamic-kafka',
+ 'stream-ids' = 'user_behavior;user_behavior_v2',
+ 'metadata-service' = 'single-cluster',
+ 'metadata-service.cluster-id' = 'cluster-0',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'properties.group.id' = 'testGroup',
+ 'scan.startup.mode' = 'earliest-offset',
+ 'format' = 'csv'
+);
+```
Connector Options
----------------
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
index 3e6f0534..660342df 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
@@ -47,6 +47,7 @@ import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper;
+import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.UserCodeClassLoader;
@@ -446,7 +447,9 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
KafkaSourceReaderMetrics kafkaSourceReaderMetrics =
new KafkaSourceReaderMetrics(kafkaClusterMetricGroup);
- deserializationSchema.open(
+ KafkaRecordDeserializationSchema<T> readerSchema =
+ createClusterDeserializationSchema(kafkaClusterId);
+ readerSchema.open(
new DeserializationSchema.InitializationContext() {
@Override
public MetricGroup getMetricGroup() {
@@ -463,7 +466,7 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
}
});
- KafkaRecordEmitter<T> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema);
+ KafkaRecordEmitter<T> recordEmitter = new KafkaRecordEmitter<>(readerSchema);
return new KafkaSourceReader<>(
elementsQueue,
new KafkaSourceFetcherManager(
@@ -481,6 +484,45 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
kafkaSourceReaderMetrics);
}
+ private KafkaRecordDeserializationSchema<T> createClusterDeserializationSchema(
+ String kafkaClusterId) {
+ /*
+ * Only deserializers that explicitly opt in to cluster metadata should be
+ * wrapped with a per-cluster instance. This keeps the default path identical
+ * to the static source, and avoids cloning when the schema is cluster-agnostic.
+ */
+ if (!(deserializationSchema instanceof KafkaClusterAwareDeserializer)) {
+ return deserializationSchema;
+ }
+
+ KafkaClusterAwareDeserializer clusterAware =
+ (KafkaClusterAwareDeserializer) deserializationSchema;
+ /*
+ * The opt-in flag lets a deserializer declare whether it needs the cluster id
+ * injected at all (e.g., table deserializers can decide based on metadata mask).
+ */
+ if (!clusterAware.needsKafkaClusterId()) {
+ return deserializationSchema;
+ }
+
+ try {
+ /*
+ * Clone the schema to avoid sharing mutable state across clusters.
+ * The cloned instance is then bound to the cluster id before use.
+ */
+ KafkaRecordDeserializationSchema<T> readerSchema =
+ InstantiationUtil.clone(
+ deserializationSchema,
+ readerContext.getUserCodeClassLoader().asClassLoader());
+ ((KafkaClusterAwareDeserializer) readerSchema).setKafkaClusterId(kafkaClusterId);
+ return readerSchema;
+ } catch (Exception error) {
+ throw new IllegalStateException(
+ "Failed to clone Kafka deserialization schema for dynamic cluster metadata.",
+ error);
+ }
+ }
+
/**
* In metadata change, we need to reset the availability helper since the number of Kafka source
* readers could have changed.
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/KafkaClusterAwareDeserializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/KafkaClusterAwareDeserializer.java
new file mode 100644
index 00000000..bbbd2e87
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/KafkaClusterAwareDeserializer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.dynamic.source.reader;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nullable;
+
+/** Optional hook for deserializers that need access to the Kafka cluster id. */
+@Internal
+public interface KafkaClusterAwareDeserializer {
+
+ default boolean needsKafkaClusterId() {
+ return false;
+ }
+
+ void setKafkaClusterId(@Nullable String kafkaClusterId);
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
index 2fa67bd4..939d9efd 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -21,9 +21,11 @@ package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.kafka.dynamic.source.reader.KafkaClusterAwareDeserializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
import org.apache.flink.types.DeserializationException;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
@@ -40,7 +42,8 @@ import java.util.List;
/** A specific {@link KafkaRecordDeserializationSchema} for {@link KafkaDynamicSource}. */
@Internal
-class DynamicKafkaDeserializationSchema implements KafkaRecordDeserializationSchema<RowData> {
+class DynamicKafkaDeserializationSchema
+ implements KafkaRecordDeserializationSchema<RowData>, KafkaClusterAwareDeserializer {
private static final long serialVersionUID = 1L;
@@ -58,6 +61,14 @@ class DynamicKafkaDeserializationSchema implements KafkaRecordDeserializationSch
private final boolean upsertMode;
+ /*
+ * Mask aligned with metadataConverters. When true at position i, the metadata field
+ * is filled with the dynamic Kafka cluster id instead of reading from ConsumerRecord.
+ * This enables per-cluster metadata injection in DynamicKafkaSource without changing
+ * record-level metadata extraction.
+ */
+ private final boolean needsKafkaClusterId;
+
DynamicKafkaDeserializationSchema(
int physicalArity,
@Nullable DeserializationSchema<RowData> keyDeserialization,
@@ -68,6 +79,30 @@ class DynamicKafkaDeserializationSchema implements KafkaRecordDeserializationSch
MetadataConverter[] metadataConverters,
TypeInformation<RowData> producedTypeInfo,
boolean upsertMode) {
+ this(
+ physicalArity,
+ keyDeserialization,
+ keyProjection,
+ valueDeserialization,
+ valueProjection,
+ hasMetadata,
+ metadataConverters,
+ producedTypeInfo,
+ upsertMode,
+ null);
+ }
+
+ DynamicKafkaDeserializationSchema(
+ int physicalArity,
+ @Nullable DeserializationSchema<RowData> keyDeserialization,
+ int[] keyProjection,
+ DeserializationSchema<RowData> valueDeserialization,
+ int[] valueProjection,
+ boolean hasMetadata,
+ MetadataConverter[] metadataConverters,
+ TypeInformation<RowData> producedTypeInfo,
+ boolean upsertMode,
+ @Nullable boolean[] clusterMetadataPositions) {
if (upsertMode) {
Preconditions.checkArgument(
keyDeserialization != null && keyProjection.length > 0,
@@ -83,9 +118,12 @@ class DynamicKafkaDeserializationSchema implements KafkaRecordDeserializationSch
keyProjection,
valueProjection,
metadataConverters,
- upsertMode);
+ upsertMode,
+ clusterMetadataPositions);
this.producedTypeInfo = producedTypeInfo;
this.upsertMode = upsertMode;
+ // Presence of a mask indicates that at least one metadata column expects cluster id.
+ this.needsKafkaClusterId = clusterMetadataPositions != null;
}
@Override
@@ -129,6 +167,16 @@ class DynamicKafkaDeserializationSchema implements KafkaRecordDeserializationSch
return producedTypeInfo;
}
+ @Override
+ public boolean needsKafkaClusterId() {
+ return needsKafkaClusterId;
+ }
+
+ @Override
+ public void setKafkaClusterId(@Nullable String kafkaClusterId) {
+ this.outputCollector.kafkaClusterId = kafkaClusterId;
+ }
+
// --------------------------------------------------------------------------------------------
interface MetadataConverter extends Serializable {
@@ -184,8 +232,16 @@ class DynamicKafkaDeserializationSchema implements KafkaRecordDeserializationSch
private final boolean upsertMode;
+ /*
+ * Same mask as the outer class: marks which metadata columns should be populated
+ * with the dynamic cluster id rather than per-record metadata converters.
+ */
+ private final @Nullable boolean[] clusterMetadataPositions;
+
private transient ConsumerRecord<?, ?> inputRecord;
+ private transient @Nullable String kafkaClusterId;
+
private transient List<RowData> physicalKeyRows;
private transient Collector<RowData> outputCollector;
@@ -195,12 +251,14 @@ class DynamicKafkaDeserializationSchema implements KafkaRecordDeserializationSch
int[] keyProjection,
int[] valueProjection,
MetadataConverter[] metadataConverters,
- boolean upsertMode) {
+ boolean upsertMode,
+ @Nullable boolean[] clusterMetadataPositions) {
this.physicalArity = physicalArity;
this.keyProjection = keyProjection;
this.valueProjection = valueProjection;
this.metadataConverters = metadataConverters;
this.upsertMode = upsertMode;
+ this.clusterMetadataPositions = clusterMetadataPositions;
}
@Override
@@ -254,6 +312,16 @@ class DynamicKafkaDeserializationSchema implements KafkaRecordDeserializationSch
}
for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
+ // When the mask is set, emit the per-cluster id instead of ConsumerRecord metadata.
+ if (clusterMetadataPositions != null && clusterMetadataPositions[metadataPos]) {
+ final String clusterId =
+ Preconditions.checkNotNull(
+ kafkaClusterId,
+ "Kafka cluster id is missing for dynamic source metadata");
+ producedRow.setField(
+ physicalArity + metadataPos, StringData.fromString(clusterId));
+ continue;
+ }
producedRow.setField(
physicalArity + metadataPos,
metadataConverters[metadataPos].read(inputRecord));
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableSource.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableSource.java
index c3ea65ef..da303b7a 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableSource.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableSource.java
@@ -531,16 +531,23 @@ public class DynamicKafkaTableSource
DeserializationSchema<RowData> keyDeserialization,
DeserializationSchema<RowData> valueDeserialization,
TypeInformation<RowData> producedTypeInfo) {
- final MetadataConverter[] metadataConverters =
- metadataKeys.stream()
- .map(
- k ->
- Stream.of(ReadableMetadata.values())
- .filter(rm -> rm.key.equals(k))
- .findFirst()
- .orElseThrow(IllegalStateException::new))
- .map(m -> m.converter)
- .toArray(MetadataConverter[]::new);
+ final MetadataConverter[] metadataConverters = new MetadataConverter[metadataKeys.size()];
+ final boolean[] clusterMetadataPositions = new boolean[metadataKeys.size()];
+ boolean hasClusterMetadata = false;
+ for (int i = 0; i < metadataKeys.size(); i++) {
+ String key = metadataKeys.get(i);
+ ReadableMetadata metadata =
+ Stream.of(ReadableMetadata.values())
+ .filter(rm -> rm.key.equals(key))
+ .findFirst()
+ .orElseThrow(IllegalStateException::new);
+ metadataConverters[i] = metadata.converter;
+ if (metadata == ReadableMetadata.KAFKA_CLUSTER) {
+ clusterMetadataPositions[i] = true;
+ hasClusterMetadata = true;
+ }
+ }
+ final boolean[] clusterPositions = hasClusterMetadata ? clusterMetadataPositions : null;
// check if connector metadata is used at all
final boolean hasMetadata = metadataKeys.size() > 0;
@@ -567,7 +574,8 @@ public class DynamicKafkaTableSource
hasMetadata,
metadataConverters,
producedTypeInfo,
- upsertMode);
+ upsertMode,
+ clusterPositions);
}
private @Nullable DeserializationSchema<RowData> createDeserialization(
@@ -591,6 +599,19 @@ public class DynamicKafkaTableSource
// --------------------------------------------------------------------------------------------
enum ReadableMetadata {
+ KAFKA_CLUSTER(
+ "kafka_cluster",
+ DataTypes.STRING().notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) {
+ throw new IllegalStateException(
+ "Kafka cluster metadata should be populated by dynamic source.");
+ }
+ }),
+
TOPIC(
"topic",
DataTypes.STRING().notNull(),
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceClusterMetadataITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceClusterMetadataITCase.java
new file mode 100644
index 00000000..243ceae9
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceClusterMetadataITCase.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.dynamic.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
+import org.apache.flink.connector.kafka.dynamic.source.reader.KafkaClusterAwareDeserializer;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.connector.kafka.testutils.MockKafkaMetadataService;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.DynamicKafkaSourceTestHelper;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration test for cluster metadata injection in {@link DynamicKafkaSource}. */
+public class DynamicKafkaSourceClusterMetadataITCase {
+
+ private static final String TOPIC = "DynamicKafkaSourceClusterMetadataITCase";
+ private static final int NUM_PARTITIONS = 3;
+ private static final int NUM_RECORDS_PER_SPLIT = 5;
+
+ private static final InMemoryReporter REPORTER = InMemoryReporter.create();
+
+ @RegisterExtension
+ public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(2)
+ .setConfiguration(REPORTER.addToConfiguration(new Configuration()))
+ .build());
+
+ @BeforeAll
+ static void beforeAll() throws Throwable {
+ DynamicKafkaSourceTestHelper.setup();
+ DynamicKafkaSourceTestHelper.createTopic(TOPIC, NUM_PARTITIONS, 1);
+ DynamicKafkaSourceTestHelper.produceToKafka(TOPIC, NUM_PARTITIONS, NUM_RECORDS_PER_SPLIT);
+ }
+
+ @AfterAll
+ static void afterAll() throws Exception {
+ REPORTER.close();
+ DynamicKafkaSourceTestHelper.tearDown();
+ }
+
+ @Test
+ void testClusterMetadataInjection() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+
+ Properties properties = new Properties();
+ properties.setProperty(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), "0");
+ properties.setProperty(
+ DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.key(), "0");
+
+ MockKafkaMetadataService mockKafkaMetadataService =
+ new MockKafkaMetadataService(
+ Collections.singleton(DynamicKafkaSourceTestHelper.getKafkaStream(TOPIC)));
+
+ DynamicKafkaSource<String> dynamicKafkaSource =
+ DynamicKafkaSource.<String>builder()
+ .setStreamIds(
+ mockKafkaMetadataService.getAllStreams().stream()
+ .map(KafkaStream::getStreamId)
+ .collect(Collectors.toSet()))
+ .setKafkaMetadataService(mockKafkaMetadataService)
+ .setDeserializer(new ClusterIdDeserializationSchema())
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setProperties(properties)
+ .build();
+
+ DataStreamSource<String> stream =
+ env.fromSource(
+ dynamicKafkaSource, WatermarkStrategy.noWatermarks(), "dynamic-kafka-src");
+ CloseableIterator<String> iterator = stream.executeAndCollect();
+ List<String> results = new ArrayList<>();
+ int expectedCount =
+ DynamicKafkaSourceTestHelper.NUM_KAFKA_CLUSTERS
+ * NUM_PARTITIONS
+ * NUM_RECORDS_PER_SPLIT;
+ while (results.size() < expectedCount && iterator.hasNext()) {
+ results.add(iterator.next());
+ }
+ iterator.close();
+
+ assertThat(results).hasSize(expectedCount);
+
+ String cluster0 = DynamicKafkaSourceTestHelper.getKafkaClusterId(0);
+ String cluster1 = DynamicKafkaSourceTestHelper.getKafkaClusterId(1);
+ assertThat(results).contains(cluster0, cluster1);
+
+ Map<String, Long> counts =
+ results.stream()
+ .collect(Collectors.groupingBy(value -> value, Collectors.counting()));
+ assertThat(counts)
+ .containsEntry(cluster0, (long) NUM_PARTITIONS * NUM_RECORDS_PER_SPLIT)
+ .containsEntry(cluster1, (long) NUM_PARTITIONS * NUM_RECORDS_PER_SPLIT);
+ }
+
+ private static final class ClusterIdDeserializationSchema
+ implements KafkaRecordDeserializationSchema<String>, KafkaClusterAwareDeserializer {
+ private static final long serialVersionUID = 1L;
+
+ private @Nullable String kafkaClusterId;
+
+ @Override
+ public boolean needsKafkaClusterId() {
+ return true;
+ }
+
+ @Override
+ public void setKafkaClusterId(@Nullable String kafkaClusterId) {
+ this.kafkaClusterId = kafkaClusterId;
+ }
+
+ @Override
+ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out) {
+ out.collect(
+ Preconditions.checkNotNull(
+ kafkaClusterId,
+ "Kafka cluster id is missing for dynamic source metadata"));
+ }
+
+ @Override
+ public TypeInformation<String> getProducedType() {
+ return BasicTypeInfo.STRING_TYPE_INFO;
+ }
+ }
+}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchemaTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchemaTest.java
new file mode 100644
index 00000000..80313290
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchemaTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link DynamicKafkaDeserializationSchema}. */
+public class DynamicKafkaDeserializationSchemaTest {
+
+ @Test
+ void testClusterMetadataInjection() throws Exception {
+ AtomicBoolean metadataConverterUsed = new AtomicBoolean(false);
+ DynamicKafkaDeserializationSchema.MetadataConverter[] metadataConverters =
+ new DynamicKafkaDeserializationSchema.MetadataConverter[] {
+ record -> {
+ metadataConverterUsed.set(true);
+ return null;
+ }
+ };
+
+ DynamicKafkaDeserializationSchema schema =
+ new DynamicKafkaDeserializationSchema(
+ 1,
+ null,
+ new int[0],
+ new SingleRowDeserializationSchema(),
+ new int[] {0},
+ true,
+ metadataConverters,
+ TypeInformation.of(RowData.class),
+ false,
+ new boolean[] {true});
+
+ schema.open(new TestInitializationContext());
+
+ String clusterId = "cluster-a";
+ schema.setKafkaClusterId(clusterId);
+
+ List<RowData> rows = new ArrayList<>();
+ schema.deserialize(
+ new ConsumerRecord<>("topic", 0, 0L, null, new byte[] {1}),
+ new Collector<RowData>() {
+ @Override
+ public void collect(RowData record) {
+ rows.add(record);
+ }
+
+ @Override
+ public void close() {}
+ });
+
+ assertThat(metadataConverterUsed.get()).isFalse();
+ assertThat(rows).hasSize(1);
+
+ GenericRowData row = (GenericRowData) rows.get(0);
+ assertThat(row.getField(1)).isEqualTo(StringData.fromString(clusterId));
+ }
+
+ private static final class SingleRowDeserializationSchema
+ implements DeserializationSchema<RowData> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public RowData deserialize(byte[] message) {
+ return GenericRowData.of(StringData.fromString("value"));
+ }
+
+ @Override
+ public void deserialize(byte[] message, Collector<RowData> out) {
+ out.collect(deserialize(message));
+ }
+
+ @Override
+ public boolean isEndOfStream(RowData nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return TypeInformation.of(RowData.class);
+ }
+ }
+
+ private static final class TestInitializationContext
+ implements DeserializationSchema.InitializationContext {
+ @Override
+ public MetricGroup getMetricGroup() {
+ return UnregisteredMetricsGroup.createSourceReaderMetricGroup();
+ }
+
+ @Override
+ public UserCodeClassLoader getUserCodeClassLoader() {
+ ClassLoader classLoader = DynamicKafkaDeserializationSchemaTest.class.getClassLoader();
+ return new UserCodeClassLoader() {
+ @Override
+ public ClassLoader asClassLoader() {
+ return classLoader;
+ }
+
+ @Override
+ public void registerReleaseHookIfAbsent(
+ String releaseHookName, Runnable releaseHook) {}
+ };
+ }
+ }
+}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableITCase.java
index 98eeaa29..c2591a0e 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableITCase.java
@@ -18,23 +18,35 @@
package org.apache.flink.streaming.connectors.kafka.table;
+import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
+import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
+import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
import org.apache.flink.table.api.Table;
+import org.apache.flink.table.utils.TableTestMatchers;
import org.apache.flink.types.Row;
import org.apache.flink.util.function.RunnableWithException;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.UUID;
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectRows;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.HamcrestCondition.matching;
/** IT cases for the Dynamic Kafka table source. */
class DynamicKafkaTableITCase extends KafkaTableTestBase {
@@ -44,6 +56,81 @@ class DynamicKafkaTableITCase extends KafkaTableTestBase {
env.setParallelism(1);
}
+ @Test
+ void testDynamicKafkaSourceWithClusterMetadata() throws Exception {
+ final String topic = "dynamic_table_cluster_metadata_" + UUID.randomUUID();
+ createTestTopic(topic, 1, 1);
+
+ final String bootstrapServers = getBootstrapServers();
+ final String groupId = getStandardProps().getProperty("group.id");
+
+ final String createSinkTable =
+ String.format(
+ "CREATE TABLE kafka_sink (\n"
+ + " name STRING,\n"
+ + " cnt INT\n"
+ + ") WITH (\n"
+ + " 'connector' = '%s',\n"
+ + " 'topic' = '%s',\n"
+ + " 'properties.bootstrap.servers' = '%s',\n"
+ + " 'format' = 'json',\n"
+ + " 'json.fail-on-missing-field' = 'false',\n"
+ + " 'json.ignore-parse-errors' = 'true'\n"
+ + ")",
+ KafkaDynamicTableFactory.IDENTIFIER, topic, bootstrapServers);
+ tEnv.executeSql(createSinkTable);
+
+ final String insertData =
+ "INSERT INTO kafka_sink\n"
+ + "SELECT * FROM (VALUES\n"
+ + " ('Alice', 1),\n"
+ + " ('Bob', 2)\n"
+ + ") AS t(name, cnt)";
+ tEnv.executeSql(insertData).await();
+
+ final String createSourceTable =
+ String.format(
+ "CREATE TABLE kafka_source (\n"
+ + " name STRING,\n"
+ + " cnt INT,\n"
+ + " kafka_cluster STRING METADATA FROM 'kafka_cluster',\n"
+ + " topic STRING METADATA FROM 'topic'\n"
+ + ") WITH (\n"
+ + " 'connector' = '%s',\n"
+ + " 'stream-ids' = '%s',\n"
+ + " 'metadata-service' = '%s',\n"
+ + " 'properties.bootstrap.servers' = '%s',\n"
+ + " 'properties.group.id' = '%s',\n"
+ + " 'scan.startup.mode' = 'earliest-offset',\n"
+ + " 'format' = 'json',\n"
+ + " 'json.fail-on-missing-field' = 'false',\n"
+ + " 'json.ignore-parse-errors' = 'true'\n"
+ + ")",
+ DynamicKafkaTableFactory.IDENTIFIER,
+ topic,
+ MultiClusterMetadataService.class.getName(),
+ bootstrapServers,
+ groupId);
+ tEnv.executeSql(createSourceTable);
+
+ Table result = tEnv.sqlQuery("SELECT name, cnt, kafka_cluster, topic FROM kafka_source");
+ List<Row> actual = collectRows(result, 4);
+ actual.sort(
+ Comparator.comparing((Row row) -> row.getField(0).toString())
+ .thenComparing(row -> row.getField(2).toString()));
+
+ List<Row> expected =
+ Arrays.asList(
+ Row.of("Alice", 1, MultiClusterMetadataService.CLUSTER_0, topic),
+ Row.of("Alice", 1, MultiClusterMetadataService.CLUSTER_1, topic),
+ Row.of("Bob", 2, MultiClusterMetadataService.CLUSTER_0, topic),
+ Row.of("Bob", 2, MultiClusterMetadataService.CLUSTER_1, topic));
+
+ assertThat(actual).satisfies(matching(TableTestMatchers.deepEqualTo(expected, false)));
+
+ cleanupTopic(topic);
+ }
+
@Test
void testDynamicKafkaSource() throws Exception {
final String topic = "dynamic_" + UUID.randomUUID();
@@ -117,4 +204,77 @@ class DynamicKafkaTableITCase extends KafkaTableTestBase {
assertThat(ex).satisfiesAnyOf(ignoreIf);
}
}
+
+ /**
+ * Test metadata service that maps a single stream id to two logical clusters.
+ *
+ * <p>Both logical clusters read the same topic but use distinct consumer groups, allowing the
+ * test to validate that the dynamic source injects the correct cluster id for each record.
+ */
+ public static final class MultiClusterMetadataService implements KafkaMetadataService {
+
+ static final String CLUSTER_0 = "cluster-0";
+ static final String CLUSTER_1 = "cluster-1";
+
+ private final Properties cluster0Props;
+ private final Properties cluster1Props;
+
+ public MultiClusterMetadataService(Properties properties) {
+ Properties baseProps = new Properties();
+ baseProps.putAll(properties);
+
+ cluster0Props = new Properties();
+ cluster0Props.putAll(baseProps);
+ cluster0Props.setProperty(
+ ConsumerConfig.GROUP_ID_CONFIG,
+ baseProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-tests")
+ + "-"
+ + CLUSTER_0);
+
+ cluster1Props = new Properties();
+ cluster1Props.putAll(baseProps);
+ cluster1Props.setProperty(
+ ConsumerConfig.GROUP_ID_CONFIG,
+ baseProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-tests")
+ + "-"
+ + CLUSTER_1);
+ }
+
+ @Override
+ public Set<KafkaStream> getAllStreams() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Map<String, KafkaStream> describeStreams(Collection<String> streamIds) {
+ Map<String, KafkaStream> streams = new LinkedHashMap<>();
+ for (String streamId : streamIds) {
+ streams.put(streamId, createKafkaStream(streamId));
+ }
+ return streams;
+ }
+
+ @Override
+ public boolean isClusterActive(String kafkaClusterId) {
+ return CLUSTER_0.equals(kafkaClusterId) || CLUSTER_1.equals(kafkaClusterId);
+ }
+
+ @Override
+ public void close() {
+ // nothing to close
+ }
+
+ private KafkaStream createKafkaStream(String streamId) {
+ Map<String, ClusterMetadata> clusterMetadata = new LinkedHashMap<>();
+ clusterMetadata.put(
+ CLUSTER_0,
+ new ClusterMetadata(
+ Collections.singleton(streamId), cluster0Props, null, null));
+ clusterMetadata.put(
+ CLUSTER_1,
+ new ClusterMetadata(
+ Collections.singleton(streamId), cluster1Props, null, null));
+ return new KafkaStream(streamId, clusterMetadata);
+ }
+ }
}