mxm commented on code in PR #811:
URL: https://github.com/apache/flink-web/pull/811#discussion_r2513418826


##########
docs/content/posts/2025-10-14-kafka-dynamic-iceberg-sink.md:
##########
@@ -0,0 +1,189 @@
+---
+title: "From Stream to Lakehouse: Kafka Ingestion with the Flink Dynamic 
Iceberg Sink"
+date: "2025-10-14T00:00:00.000Z"
+aliases:
+- /news/2025/10/14/2025-10-14-kafka-dynamic-iceberg-sink.html
+authors:
+- Swapna:
+  name: "Swapna Marru"
+
+---
+
+Ingesting thousands of evolving Kafka topics into a lakehouse often creates 
complex, brittle pipelines that require constant manual intervention. But what 
if your ingestion pipeline could adapt on its own, with zero downtime?
+
+Enter the **Flink Dynamic Iceberg Sink**, a powerful pattern for Apache Flink 
that allows users to seamlessly write streaming data into multiple Iceberg 
tables—dynamically, efficiently, and with full schema evolution support. The 
sink can create and write to new tables based on instructions within the 
records themselves. As the schema of incoming records evolves, the dynamic sink 
automatically evolves the Iceberg table schema in the lakehouse. It can even 
adapt to changes in the table's partitioning scheme. The key is that all of 
this happens in real-time, **without a single job restart.**
+
+In this post, we'll guide you through building this exact system. We will 
start by exploring the limitations of traditional, static pipelines and then 
demonstrate how the dynamic sink pattern provides a robust, scalable solution. 
We'll focus on a common use case: ingesting Kafka data with dynamic Avro 
schemas sourced from a Confluent Schema Registry. By the end, you'll have a 
blueprint for building a scalable, self-adapting ingestion layer that 
eliminates operational toil and truly bridges your streams and your lakehouse.
+
+
+## The Building Block: A Simple Kafka-to-Iceberg Pipeline
+
+Let's start with the basics. Our goal is to get data from a single Kafka topic 
into a corresponding Iceberg table.
+
+<div style="text-align: center;">
+<img 
src="/img/blog/2025-10-03-kafka-dynamic-iceberg-sink/simple-single-kafka-topic-to-iceberg.png"
 style="width:70%;margin:15px">
+</div>
+
+### Standard Flink Job Components
+
+A standard Flink job for this task consists of three main components. 
+
+1.  **Kafka Source**: Connects to Kafka, subscribes to the topic, and 
distributes records for parallel processing.
+
+2.  **A `RowData` Converter**: This component is responsible for schema-aware 
processing. It takes the raw `byte[]` records from Kafka, deserializes them, 
and transforms them into Flink's internal `RowData` format. The logic here can 
vary:
+    *   **Specific Logic:** The converter could contain custom deserialization 
logic tailored to a single, known topic schema.
+    *   **Generic Logic:** More powerful, generic pipelines use a schema 
registry. For formats like Avro or Protobuf, the converter fetches the correct 
writer's schema from the registry (using an ID from the message header or 
payload). It then deserializes the bytes into a generic structure like Avro's 
`GenericRecord` or Protobuf's `DynamicMessage`.
+
+    This post will focus on the generic pipeline approach, which is designed 
for evolving data environments. Even in this generic deserialization approach, 
a standard job then maps these records to a **static `RowData` schema** that is 
hardcoded or configured in the Flink job. This static schema must correspond to 
the target Iceberg table's schema.
+
+3.  **Iceberg Sink**: Writes the stream of `RowData` to a specific Iceberg 
table, managing transactions and ensuring exactly-once semantics.
+
+This setup is simple, robust, and works perfectly for a single topic with a 
stable schema.
+
+
+## Scaling Up: The "One DAG Per Topic" Approach
+
+Now, what if we have thousands of topics? The logical next step is to create a 
dedicated processing graph (or DAG) for each topic-to-table mapping within a 
single Flink application.
+
+<div style="text-align: center;">
+<img 
src="/img/blog/2025-10-03-kafka-dynamic-iceberg-sink/multiple-dag-pipeline.png" 
style="width:70%;margin:15px">
+</div>
+
+This looks good, but this static architecture cannot adapt to change: an 
Iceberg sink can only write to **one predefined table**, the table must **exist 
beforehand**, and its **schema is fixed** for the lifetime of the job.
+
+## When Static Pipelines Meet a Dynamic World

Review Comment:
   ```suggestion
   ### Scaling Up: Problems Ahead
   ```



##########
docs/content/posts/2025-10-14-kafka-dynamic-iceberg-sink.md:
##########
@@ -0,0 +1,189 @@
+---
+title: "From Stream to Lakehouse: Kafka Ingestion with the Flink Dynamic 
Iceberg Sink"
+date: "2025-10-14T00:00:00.000Z"
+aliases:
+- /news/2025/10/14/2025-10-14-kafka-dynamic-iceberg-sink.html
+authors:
+- Swapna:
+  name: "Swapna Marru"
+
+---
+
+Ingesting thousands of evolving Kafka topics into a lakehouse often creates 
complex, brittle pipelines that require constant manual intervention. But what 
if your ingestion pipeline could adapt on its own, with zero downtime?

Review Comment:
   This intro sounds nice, but I would be more specific here, e.g. mention that 
the reason is that the target tables may change or that you need multiple 
Iceberg sinks. Otherwise I would think this is introducing an autoscaling or 
autotuning feature.
   
   
   ```suggestion
   Ingesting thousands of evolving Kafka topics into a lakehouse often creates 
complex, brittle pipelines that require constant manual intervention as its 
write patterns change. But what if your ingestion pipeline could adapt on its 
own, with zero downtime?
   ```



##########
docs/content/posts/2025-10-14-kafka-dynamic-iceberg-sink.md:
##########
@@ -0,0 +1,189 @@
+---
+title: "From Stream to Lakehouse: Kafka Ingestion with the Flink Dynamic 
Iceberg Sink"
+date: "2025-10-14T00:00:00.000Z"
+aliases:
+- /news/2025/10/14/2025-10-14-kafka-dynamic-iceberg-sink.html
+authors:
+- Swapna:
+  name: "Swapna Marru"
+
+---
+
+Ingesting thousands of evolving Kafka topics into a lakehouse often creates 
complex, brittle pipelines that require constant manual intervention. But what 
if your ingestion pipeline could adapt on its own, with zero downtime?
+
+Enter the **Flink Dynamic Iceberg Sink**, a powerful pattern for Apache Flink 
that allows users to seamlessly write streaming data into multiple Iceberg 
tables—dynamically, efficiently, and with full schema evolution support. The 
sink can create and write to new tables based on instructions within the 
records themselves. As the schema of incoming records evolves, the dynamic sink 
automatically evolves the Iceberg table schema in the lakehouse. It can even 
adapt to changes in the table's partitioning scheme. The key is that all of 
this happens in real-time, **without a single job restart.**
+
+In this post, we'll guide you through building this exact system. We will 
start by exploring the limitations of traditional, static pipelines and then 
demonstrate how the dynamic sink pattern provides a robust, scalable solution. 
We'll focus on a common use case: ingesting Kafka data with dynamic Avro 
schemas sourced from a Confluent Schema Registry. By the end, you'll have a 
blueprint for building a scalable, self-adapting ingestion layer that 
eliminates operational toil and truly bridges your streams and your lakehouse.
+
+
+## The Building Block: A Simple Kafka-to-Iceberg Pipeline
+
+Let's start with the basics. Our goal is to get data from a single Kafka topic 
into a corresponding Iceberg table.
+
+<div style="text-align: center;">
+<img 
src="/img/blog/2025-10-03-kafka-dynamic-iceberg-sink/simple-single-kafka-topic-to-iceberg.png"
 style="width:70%;margin:15px">
+</div>
+
+### Standard Flink Job Components
+
+A standard Flink job for this task consists of three main components. 
+
+1.  **Kafka Source**: Connects to Kafka, subscribes to the topic, and 
distributes records for parallel processing.
+
+2.  **A `RowData` Converter**: This component is responsible for schema-aware 
processing. It takes the raw `byte[]` records from Kafka, deserializes them, 
and transforms them into Flink's internal `RowData` format. The logic here can 
vary:
+    *   **Specific Logic:** The converter could contain custom deserialization 
logic tailored to a single, known topic schema.
+    *   **Generic Logic:** More powerful, generic pipelines use a schema 
registry. For formats like Avro or Protobuf, the converter fetches the correct 
writer's schema from the registry (using an ID from the message header or 
payload). It then deserializes the bytes into a generic structure like Avro's 
`GenericRecord` or Protobuf's `DynamicMessage`.
+
+    This post will focus on the generic pipeline approach, which is designed 
for evolving data environments. Even in this generic deserialization approach, 
a standard job then maps these records to a **static `RowData` schema** that is 
hardcoded or configured in the Flink job. This static schema must correspond to 
the target Iceberg table's schema.
+
+3.  **Iceberg Sink**: Writes the stream of `RowData` to a specific Iceberg 
table, managing transactions and ensuring exactly-once semantics.
+
+This setup is simple, robust, and works perfectly for a single topic with a 
stable schema.
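
For reference, a static version of this job can be sketched in just a few lines. The snippet below is only an illustration and assumes a `kafkaSource` (a `KafkaSource<RowData>` whose deserializer is hard-wired to the topic's known schema) and a `catalogLoader` built elsewhere; `FlinkSink` is the Iceberg Flink sink for `RowData`:

```java
// Assumed to exist elsewhere in the job:
//   kafkaSource   - a KafkaSource<RowData> hard-wired to the topic's schema
//   catalogLoader - an Iceberg CatalogLoader for the target catalog
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// One topic in, one fixed RowData schema out.
DataStream<RowData> rows =
        env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "orders-topic");

// One predefined Iceberg table; it must already exist with a matching schema.
FlinkSink.forRowData(rows)
        .tableLoader(TableLoader.fromCatalog(catalogLoader, TableIdentifier.of("db", "orders")))
        .append();

env.execute("static-kafka-to-iceberg");
```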
+
+
+## Scaling Up: The "One DAG Per Topic" Approach
+
+Now, what if we have thousands of topics? The logical next step is to create a 
dedicated processing graph (or DAG) for each topic-to-table mapping within a 
single Flink application.
+
+<div style="text-align: center;">
+<img 
src="/img/blog/2025-10-03-kafka-dynamic-iceberg-sink/multiple-dag-pipeline.png" 
style="width:70%;margin:15px">
+</div>
+
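In code, the naive approach boils down to repeating the static pipeline once per mapping. A rough sketch, with a hypothetical `topicToTable` map and `buildKafkaSource` helper:

```java
// Hypothetical configuration: one entry per topic-to-table mapping.
Map<String, TableIdentifier> topicToTable = loadTopicToTableMapping();

for (Map.Entry<String, TableIdentifier> entry : topicToTable.entrySet()) {
    // Each mapping gets its own source -> converter -> sink chain in the same job graph.
    DataStream<RowData> rows = env.fromSource(
            buildKafkaSource(entry.getKey()),        // hypothetical helper, one source per topic
            WatermarkStrategy.noWatermarks(),
            "source-" + entry.getKey());

    FlinkSink.forRowData(rows)
            .tableLoader(TableLoader.fromCatalog(catalogLoader, entry.getValue()))
            .append();
}
```
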
+This looks good, but this static architecture cannot adapt to change: an 
Iceberg sink can only write to **one predefined table**, the table must **exist 
beforehand**, and its **schema is fixed** for the lifetime of the job.
+
+## When Static Pipelines Meet a Dynamic World
+
+This static model becomes an operational bottleneck when faced with real-world 
scenarios.
+
+**Scenario 1: Schema Evolution**
+When a producer adds a new field to an event schema, the running Flink job, 
which was configured with the old schema, cannot process the new field. It 
continues to write the events with the older schema.
+This requires a manual update to the job's code or configuration, followed by 
a job restart.
+
+
+The target Iceberg table's schema must also be updated before the job is 
restarted. Once restarted, the Flink job's Iceberg sink will use the new table 
schema, allowing it to write events correctly from both the old and new schemas.
+
+**Scenario 2: A New Topic Appears**
+A new microservice starts producing events. The Flink job has no idea this 
topic exists. You must add a new DAG to the job code and **restart the 
application**.
+
+**Scenario 3: Dynamic Routing from a Single Topic**
+A single Kafka topic contains multiple event types that need to be routed to 
different Iceberg tables. A static sink, hard-wired to one table, can't do this.
+
+All these scenarios require complex workarounds and a way to **automatically 
restart the application** whenever something changes.
+
+### The Solution: The Dynamic Iceberg Sink
+
+Here’s the new architecture:
+
+<div style="text-align: center;">
+<img 
src="/img/blog/2025-10-03-kafka-dynamic-iceberg-sink/dynamic-iceberg-sink.png" 
style="width:70%;margin:15px">
+</div>
+
+This single, unified pipeline can ingest from any number of topics and write 
to any number of tables, automatically handling new topics and schema changes 
without restarts.
+
+
+
+#### A Look at the Implementation
+
+Let's dive into the key components that make this dynamic behavior possible.
+
+##### Step 1: Preserving Kafka Metadata with `KafkaRecord`
+
+To make dynamic decisions, our pipeline needs metadata. For example, to use 
the topic name as the table name, we need access to the topic! Standard 
deserializers often discard this, returning only the deserialized payload.
+
+To solve this, we first pass the raw Kafka `ConsumerRecord` through a 
lightweight wrapper. This wrapper converts it into a simple POJO, 
`KafkaRecord`, that preserves all the essential metadata for downstream 
processing.
+
+Here is the structure of our `KafkaRecord` class:
+
+```java
+public class KafkaRecord {
+    public final String topic;
+    public final byte[] key;
+    public final byte[] value;
+    public final Headers headers;
+    ....
+}
+```
+Now, every record flowing through our Flink pipeline is a `KafkaRecord` 
object, giving our converter access to the `topic` for table routing and the 
`value` (the raw byte payload) for schema discovery.
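
One way to produce these objects is a thin `KafkaRecordDeserializationSchema` that simply copies the fields over. A minimal sketch, assuming a matching constructor on the `KafkaRecord` POJO shown above:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class KafkaRecordWrapperSchema implements KafkaRecordDeserializationSchema<KafkaRecord> {

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<KafkaRecord> out) {
        // No payload parsing here: just carry topic, key, value, and headers downstream
        // so that table routing and schema resolution can happen later.
        out.collect(new KafkaRecord(record.topic(), record.key(), record.value(), record.headers()));
    }

    @Override
    public TypeInformation<KafkaRecord> getProducedType() {
        return TypeInformation.of(KafkaRecord.class);
    }
}
```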
+
+##### Step 2: The `KafkaRecordToDynamicRecordGenerator`
+
+This generator takes a `KafkaRecord` and performs the "late binding" of schema 
and table information. For each message, it:
+
+1.  **Extracts the schema ID** from the `value` byte array (using the 
Confluent wire format).
+2.  **Fetches the writer's schema** from Schema Registry.
+3.  **Deserializes the Avro payload** and converts it into Flink's `RowData`.
+4.  **Bundles everything** into a `DynamicRecord`, which contains:
+    *   The `TableIdentifier` (created from `kafkaRecord.topic`).
+    *   The `org.apache.iceberg.Schema` (converted from the Avro schema).
+    *   The `PartitionSpec` (e.g., based on a timestamp field).
+    *   The `RowData` payload itself.
+
+```java
+public class KafkaRecordToDynamicRecordGenerator implements DynamicRecordGenerator<KafkaRecord> {
+    @Override
+    public DynamicRecord convert(KafkaRecord kafkaRecord) throws Exception {
+        // 1. Get schema ID from kafkaRecord.value and fetch schema
+        int schemaId = getSchemaId(kafkaRecord.value);
+        Schema writerSchema = schemaRegistryClient.getById(schemaId);
+
+        // 2. Deserialize and convert to RowData
+        GenericRecord genericRecord = deserialize(kafkaRecord.value);
+        RowData rowData = avroToRowDataMapper.map(genericRecord);
+
+        // 3. Dynamically create table info from the KafkaRecord
+        TableIdentifier tableId = TableIdentifier.of(kafkaRecord.topic);
+        org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(writerSchema);
+        PartitionSpec spec = buildPartitionSpec(icebergSchema, kafkaRecord);
+
+        // 4. Return the complete DynamicRecord
+        return new DynamicRecord(
+                tableId,
+                "branch",
+                icebergSchema,
+                rowData,
+                spec,
+                DistributionMode.NONE,
+                1);
+    }
+}
+```
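
The `getSchemaId` and `buildPartitionSpec` helpers are elided in the snippet above. Assuming the Confluent wire format (a zero magic byte followed by a 4-byte big-endian schema ID, then the Avro payload) and a hypothetical `event_time` column to partition on, they could look roughly like this inside the generator:

```java
// Confluent wire format: byte 0 is a magic byte (0x0), bytes 1-4 hold the schema ID,
// and the Avro-encoded payload follows.
int getSchemaId(byte[] value) {
    ByteBuffer buffer = ByteBuffer.wrap(value);   // java.nio.ByteBuffer
    if (buffer.get() != 0x0) {
        throw new IllegalArgumentException("Record is not in Confluent wire format");
    }
    return buffer.getInt();
}

// Partition by day on an assumed 'event_time' timestamp column, if present;
// otherwise fall back to an unpartitioned table.
PartitionSpec buildPartitionSpec(org.apache.iceberg.Schema schema, KafkaRecord record) {
    if (schema.findField("event_time") != null) {
        return PartitionSpec.builderFor(schema).day("event_time").build();
    }
    return PartitionSpec.unpartitioned();
}
```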
+
+##### Step 3: Assembling the Flink Job
+
+```java
+// A single stream from ALL topics, producing KafkaRecord objects
+DataStream<KafkaRecord> sourceStream = env.fromSource(kafkaSource, ...);
+
+// A single sink that handles everything
+DynamicIcebergSink.forInput(sourceStream)
+    .generator(new KafkaRecordToDynamicRecordGenerator())
+    .catalogLoader(getCatalogLoader())
+    .append();
+```
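
On the source side, subscribing by topic pattern with periodic partition discovery is what lets newly created topics flow into this single pipeline without a restart. A sketch, with placeholder servers and pattern:

```java
// Placeholders: adjust the bootstrap servers and the topic pattern to your environment.
KafkaSource<KafkaRecord> kafkaSource = KafkaSource.<KafkaRecord>builder()
        .setBootstrapServers("kafka:9092")
        .setTopicPattern(Pattern.compile("events\\..*"))
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setDeserializer(new KafkaRecordWrapperSchema())   // the wrapper sketched in Step 1
        // Re-evaluate the pattern periodically so newly created topics are picked up.
        .setProperty("partition.discovery.interval.ms", "60000")
        .build();
```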
+
+
+## Project Details: Availability, Credits, and Development
+
+This powerful capability is now officially available as part of the Apache 
Iceberg project.
+
+Find more details on the Dynamic Iceberg Sink 
[here](https://iceberg.apache.org/docs/nightly/flink-writes/#flink-dynamic-iceberg-sink).
+
+### Supported Versions
+You can start using the dynamic sink with the following versions:
+*   **Apache Iceberg 1.10.0** or newer. Full release notes 
[here](https://github.com/apache/iceberg/releases/tag/apache-iceberg-1.10.0).
+*   **Apache Flink 1.20, 2.0, and 2.1**.
+
+### Contributors
+The journey began with a detailed project proposal authored by **Peter Vary**, 
which laid the groundwork for this development. You can read the original 
proposal 
[here](https://docs.google.com/document/d/1R3NZmi65S4lwnmNjH4gLCuXZbgvZV5GNrQKJ5NYdO9s/edit?tab=t.0#heading=h.xv1r23unqyn3).
+
+Major development efforts were led by **Maximilian Michels**, with 
contributions from several community members.
+
+## Conclusion
+
+In conclusion, the choice between a dynamic and a static Iceberg sink 
represents a trade-off between operational agility and the performance benefits 
of static bindings. While a simple, static Kafka-to-Iceberg sink is a 
performant and straightforward solution for stable data environments, the 
Dynamic Iceberg Sink pattern helps manage the complexity and velocity of 
frequently changing data.
+
+The most significant advantage of the dynamic sink is its ability to reduce 
operational burden by automating schema evolution. By leveraging a central 
schema registry, new schema versions can be published without any direct 
intervention in the Flink application. The dynamic sink detects these changes 
and adapts the downstream Iceberg table schema on the fly, eliminating the need 
for manual code changes, configuration updates, and disruptive job restarts. 
This creates a truly resilient and hands-off data ingestion pipeline.

Review Comment:
   ```suggestion
   The most significant advantage of the Dynamic Sink is its ability to reduce 
operational burden by automating schema evolution. By leveraging a central 
schema registry, new schema versions can be published without any direct 
intervention in the Flink application. The dynamic sink detects these changes 
and adapts the downstream Iceberg table schema on the fly, eliminating the need 
for manual code changes, configuration updates, and disruptive job restarts. 
This creates a truly resilient and hands-off data ingestion pipeline.
   ```
   
   I would keep it consistent and short by using "Dynamic Sink" when referring 
to the Flink Dynamic Iceberg Sink throughout this post.



##########
docs/content/posts/2025-10-14-kafka-dynamic-iceberg-sink.md:
##########
@@ -0,0 +1,189 @@
+---
+title: "From Stream to Lakehouse: Kafka Ingestion with the Flink Dynamic 
Iceberg Sink"
+date: "2025-10-14T00:00:00.000Z"
+aliases:
+- /news/2025/10/14/2025-10-14-kafka-dynamic-iceberg-sink.html
+authors:
+- Swapna:
+  name: "Swapna Marru"
+
+---
+
+Ingesting thousands of evolving Kafka topics into a lakehouse often creates 
complex, brittle pipelines that require constant manual intervention. But what 
if your ingestion pipeline could adapt on its own, with zero downtime?
+
+Enter the **Flink Dynamic Iceberg Sink**, a powerful pattern for Apache Flink 
that allows users to seamlessly write streaming data into multiple Iceberg 
tables—dynamically, efficiently, and with full schema evolution support. The 
sink can create and write to new tables based on instructions within the 
records themselves. As the schema of incoming records evolves, the dynamic sink 
automatically evolves the Iceberg table schema in the lakehouse. It can even 
adapt to changes in the table's partitioning scheme. The key is that all of 
this happens in real-time, **without a single job restart.**
+
+In this post, we'll guide you through building this exact system. We will 
start by exploring the limitations of traditional, static pipelines and then 
demonstrate how the dynamic sink pattern provides a robust, scalable solution. 
We'll focus on a common use case: ingesting Kafka data with dynamic Avro 
schemas sourced from a Confluent Schema Registry. By the end, you'll have a 
blueprint for building a scalable, self-adapting ingestion layer that 
eliminates operational toil and truly bridges your streams and your lakehouse.
+
+
+## The Building Block: A Simple Kafka-to-Iceberg Pipeline
+
+Let's start with the basics. Our goal is to get data from a single Kafka topic 
into a corresponding Iceberg table.
+
+<div style="text-align: center;">
+<img 
src="/img/blog/2025-10-03-kafka-dynamic-iceberg-sink/simple-single-kafka-topic-to-iceberg.png"
 style="width:70%;margin:15px">
+</div>
+
+### Standard Flink Job Components

Review Comment:
   ```suggestion
   ### How to write to an Iceberg table with Flink
   ```



##########
docs/content/posts/2025-10-14-kafka-dynamic-iceberg-sink.md:
##########
@@ -0,0 +1,189 @@
+---
+title: "From Stream to Lakehouse: Kafka Ingestion with the Flink Dynamic 
Iceberg Sink"
+date: "2025-10-14T00:00:00.000Z"
+aliases:
+- /news/2025/10/14/2025-10-14-kafka-dynamic-iceberg-sink.html
+authors:
+- Swapna:
+  name: "Swapna Marru"
+
+---
+
+Ingesting thousands of evolving Kafka topics into a lakehouse often creates 
complex, brittle pipelines that require constant manual intervention. But what 
if your ingestion pipeline could adapt on its own, with zero downtime?
+
+Enter the **Flink Dynamic Iceberg Sink**, a powerful pattern for Apache Flink 
that allows users to seamlessly write streaming data into multiple Iceberg 
tables—dynamically, efficiently, and with full schema evolution support. The 
sink can create and write to new tables based on instructions within the 
records themselves. As the schema of incoming records evolves, the dynamic sink 
automatically evolves the Iceberg table schema in the lakehouse. It can even 
adapt to changes in the table's partitioning scheme. The key is that all of 
this happens in real-time, **without a single job restart.**
+
+In this post, we'll guide you through building this exact system. We will 
start by exploring the limitations of traditional, static pipelines and then 
demonstrate how the dynamic sink pattern provides a robust, scalable solution. 
We'll focus on a common use case: ingesting Kafka data with dynamic Avro 
schemas sourced from a Confluent Schema Registry. By the end, you'll have a 
blueprint for building a scalable, self-adapting ingestion layer that 
eliminates operational toil and truly bridges your streams and your lakehouse.
+
+
+## The Building Block: A Simple Kafka-to-Iceberg Pipeline
+
+Let's start with the basics. Our goal is to get data from a single Kafka topic 
into a corresponding Iceberg table.
+
+<div style="text-align: center;">
+<img 
src="/img/blog/2025-10-03-kafka-dynamic-iceberg-sink/simple-single-kafka-topic-to-iceberg.png"
 style="width:70%;margin:15px">
+</div>
+
+### Standard Flink Job Components
+
+A standard Flink job for this task consists of three main components. 
+
+1.  **Kafka Source**: Connects to Kafka, subscribes to the topic, and 
distributes records for parallel processing.
+
+2.  **A `RowData` Converter**: This component is responsible for schema-aware 
processing. It takes the raw `byte[]` records from Kafka, deserializes them, 
and transforms them into Flink's internal `RowData` format. The logic here can 
vary:
+    *   **Specific Logic:** The converter could contain custom deserialization 
logic tailored to a single, known topic schema.
+    *   **Generic Logic:** More powerful, generic pipelines use a schema 
registry. For formats like Avro or Protobuf, the converter fetches the correct 
writer's schema from the registry (using an ID from the message header or 
payload). It then deserializes the bytes into a generic structure like Avro's 
`GenericRecord` or Protobuf's `DynamicMessage`.
+
+    This post will focus on the generic pipeline approach, which is designed 
for evolving data environments. Even in this generic deserialization approach, 
a standard job then maps these records to a **static `RowData` schema** that is 
hardcoded or configured in the Flink job. This static schema must correspond to 
the target Iceberg table's schema.
+
+3.  **Iceberg Sink**: Writes the stream of `RowData` to a specific Iceberg 
table, managing transactions and ensuring exactly-once semantics.
+
+This setup is simple, robust, and works perfectly for a single topic with a 
stable schema.
+
+
+## Scaling Up: The "One DAG Per Topic" Approach

Review Comment:
   ```suggestion
   ## Scaling Up: The Naive Approach
   ```



##########
docs/content/posts/2025-10-14-kafka-dynamic-iceberg-sink.md:
##########
@@ -0,0 +1,189 @@
+---
+title: "From Stream to Lakehouse: Kafka Ingestion with the Flink Dynamic 
Iceberg Sink"
+date: "2025-10-14T00:00:00.000Z"
+aliases:
+- /news/2025/10/14/2025-10-14-kafka-dynamic-iceberg-sink.html
+authors:
+- Swapna:
+  name: "Swapna Marru"
+
+---
+
+Ingesting thousands of evolving Kafka topics into a lakehouse often creates 
complex, brittle pipelines that require constant manual intervention. But what 
if your ingestion pipeline could adapt on its own, with zero downtime?
+
+Enter the **Flink Dynamic Iceberg Sink**, a powerful pattern for Apache Flink 
that allows users to seamlessly write streaming data into multiple Iceberg 
tables—dynamically, efficiently, and with full schema evolution support. The 
sink can create and write to new tables based on instructions within the 
records themselves. As the schema of incoming records evolves, the dynamic sink 
automatically evolves the Iceberg table schema in the lakehouse. It can even 
adapt to changes in the table's partitioning scheme. The key is that all of 
this happens in real-time, **without a single job restart.**
+
+In this post, we'll guide you through building this exact system. We will 
start by exploring the limitations of traditional, static pipelines and then 
demonstrate how the dynamic sink pattern provides a robust, scalable solution. 
We'll focus on a common use case: ingesting Kafka data with dynamic Avro 
schemas sourced from a Confluent Schema Registry. By the end, you'll have a 
blueprint for building a scalable, self-adapting ingestion layer that 
eliminates operational toil and truly bridges your streams and your lakehouse.
+
+
+## The Building Block: A Simple Kafka-to-Iceberg Pipeline
+
+Let's start with the basics. Our goal is to get data from a single Kafka topic 
into a corresponding Iceberg table.
+
+<div style="text-align: center;">
+<img 
src="/img/blog/2025-10-03-kafka-dynamic-iceberg-sink/simple-single-kafka-topic-to-iceberg.png"
 style="width:70%;margin:15px">
+</div>
+
+### Standard Flink Job Components
+
+A standard Flink job for this task consists of three main components. 
+
+1.  **Kafka Source**: Connects to Kafka, subscribes to the topic, and 
distributes records for parallel processing.
+
+2.  **A `RowData` Converter**: This component is responsible for schema-aware 
processing. It takes the raw `byte[]` records from Kafka, deserializes them, 
and transforms them into Flink's internal `RowData` format. The logic here can 
vary:
+    *   **Specific Logic:** The converter could contain custom deserialization 
logic tailored to a single, known topic schema.
+    *   **Generic Logic:** More powerful, generic pipelines use a schema 
registry. For formats like Avro or Protobuf, the converter fetches the correct 
writer's schema from the registry (using an ID from the message header or 
payload). It then deserializes the bytes into a generic structure like Avro's 
`GenericRecord` or Protobuf's `DynamicMessage`.
+
+    This post will focus on the generic pipeline approach, which is designed 
for evolving data environments. Even in this generic deserialization approach, 
a standard job then maps these records to a **static `RowData` schema** that is 
hardcoded or configured in the Flink job. This static schema must correspond to 
the target Iceberg table's schema.
+
+3.  **Iceberg Sink**: Writes the stream of `RowData` to a specific Iceberg 
table, managing transactions and ensuring exactly-once semantics.
+
+This setup is simple, robust, and works perfectly for a single topic with a 
stable schema.
+
+
+## Scaling Up: The "One DAG Per Topic" Approach
+
+Now, what if we have thousands of topics? The logical next step is to create a 
dedicated processing graph (or DAG) for each topic-to-table mapping within a 
single Flink application.
+
+<div style="text-align: center;">
+<img 
src="/img/blog/2025-10-03-kafka-dynamic-iceberg-sink/multiple-dag-pipeline.png" 
style="width:70%;margin:15px">
+</div>
+
+This looks good, but this static architecture cannot adapt to change: an 
Iceberg sink can only write to **one predefined table**, the table must **exist 
beforehand**, and its **schema is fixed** for the lifetime of the job.
+
+## When Static Pipelines Meet a Dynamic World
+
+This static model becomes an operational bottleneck when faced with real-world 
scenarios.
+
+**Scenario 1: Schema Evolution**
+When a producer adds a new field to an event schema, the running Flink job, 
which was configured with the old schema, cannot process the new field. It 
continues to write the events with the older schema.
+This requires a manual update to the job's code or configuration, followed by 
a job restart.
+
+
+The target Iceberg table's schema must also be updated before the job is 
restarted. Once restarted, the Flink job's Iceberg sink will use the new table 
schema, allowing it to write events correctly from both the old and new schemas.
+
+**Scenario 2: A New Topic Appears**
+A new microservice starts producing events. The Flink job has no idea this 
topic exists. You must add a new DAG to the job code and **restart the 
application**.
+
+**Scenario 3: Dynamic Routing from a Single Topic**
+A single Kafka topic contains multiple event types that need to be routed to 
different Iceberg tables. A static sink, hard-wired to one table, can't do this.
+
+All these scenarios require complex workarounds and a way to **automatically 
restart the application** whenever something changes.
+
+### The Solution: The Dynamic Iceberg Sink

Review Comment:
   ```suggestion
   ### The Solution: The Flink Dynamic Iceberg Sink ("Dynamic Sink")
   ```



##########
docs/content/posts/2025-10-14-kafka-dynamic-iceberg-sink.md:
##########
@@ -0,0 +1,189 @@
+---
+title: "From Stream to Lakehouse: Kafka Ingestion with the Flink Dynamic 
Iceberg Sink"
+date: "2025-10-14T00:00:00.000Z"
+aliases:
+- /news/2025/10/14/2025-10-14-kafka-dynamic-iceberg-sink.html
+authors:
+- Swapna:
+  name: "Swapna Marru"
+
+---
+
+Ingesting thousands of evolving Kafka topics into a lakehouse often creates 
complex, brittle pipelines that require constant manual intervention. But what 
if your ingestion pipeline could adapt on its own, with zero downtime?
+
+Enter the **Flink Dynamic Iceberg Sink**, a powerful pattern for Apache Flink 
that allows users to seamlessly write streaming data into multiple Iceberg 
tables—dynamically, efficiently, and with full schema evolution support. The 
sink can create and write to new tables based on instructions within the 
records themselves. As the schema of incoming records evolves, the dynamic sink 
automatically evolves the Iceberg table schema in the lakehouse. It can even 
adapt to changes in the table's partitioning scheme. The key is that all of 
this happens in real-time, **without a single job restart.**
+
+In this post, we'll guide you through building this exact system. We will 
start by exploring the limitations of traditional, static pipelines and then 
demonstrate how the dynamic sink pattern provides a robust, scalable solution. 
We'll focus on a common use case: ingesting Kafka data with dynamic Avro 
schemas sourced from a Confluent Schema Registry. By the end, you'll have a 
blueprint for building a scalable, self-adapting ingestion layer that 
eliminates operational toil and truly bridges your streams and your lakehouse.
+
+
+## The Building Block: A Simple Kafka-to-Iceberg Pipeline
+
+Let's start with the basics. Our goal is to get data from a single Kafka topic 
into a corresponding Iceberg table.
+
+<div style="text-align: center;">
+<img 
src="/img/blog/2025-10-03-kafka-dynamic-iceberg-sink/simple-single-kafka-topic-to-iceberg.png"
 style="width:70%;margin:15px">
+</div>
+
+### Standard Flink Job Components
+
+A standard Flink job for this task consists of three main components. 
+
+1.  **Kafka Source**: Connects to Kafka, subscribes to the topic, and 
distributes records for parallel processing.
+
+2.  **A `RowData` Converter**: This component is responsible for schema-aware 
processing. It takes the raw `byte[]` records from Kafka, deserializes them, 
and transforms them into Flink's internal `RowData` format. The logic here can 
vary:
+    *   **Specific Logic:** The converter could contain custom deserialization 
logic tailored to a single, known topic schema.
+    *   **Generic Logic:** More powerful, generic pipelines use a schema 
registry. For formats like Avro or Protobuf, the converter fetches the correct 
writer's schema from the registry (using an ID from the message header or 
payload). It then deserializes the bytes into a generic structure like Avro's 
`GenericRecord` or Protobuf's `DynamicMessage`.
+
+    This post will focus on the generic pipeline approach, which is designed 
for evolving data environments. Even in this generic deserialization approach, 
a standard job then maps these records to a **static `RowData` schema** that is 
hardcoded or configured in the Flink job. This static schema must correspond to 
the target Iceberg table's schema.
+
+3.  **Iceberg Sink**: Writes the stream of `RowData` to a specific Iceberg 
table, managing transactions and ensuring exactly-once semantics.
+
+This setup is simple, robust, and works perfectly for a single topic with a 
stable schema.
+
+
+## Scaling Up: The "One DAG Per Topic" Approach
+
+Now, what if we have thousands of topics? The logical next step is to create a 
dedicated processing graph (or DAG) for each topic-to-table mapping within a 
single Flink application.
+
+<div style="text-align: center;">
+<img 
src="/img/blog/2025-10-03-kafka-dynamic-iceberg-sink/multiple-dag-pipeline.png" 
style="width:70%;margin:15px">
+</div>
+
+This looks good, but this static architecture cannot adapt to change: an 
Iceberg sink can only write to **one predefined table**, the table must **exist 
beforehand**, and its **schema is fixed** for the lifetime of the job.
+
+## When Static Pipelines Meet a Dynamic World
+
+This static model becomes an operational bottleneck when faced with real-world 
scenarios.
+
+**Scenario 1: Schema Evolution**
+When a producer adds a new field to an event schema, the running Flink job, 
which was configured with the old schema, cannot process the new field. It 
continues to write the events with the older schema.
+This requires a manual update to the job's code or configuration, followed by 
a job restart.
+
+
+The target Iceberg table's schema must also be updated before the job is 
restarted. Once restarted, the Flink job's Iceberg sink will use the new table 
schema, allowing it to write events correctly from both the old and new schemas.
+
+**Scenario 2: A New Topic Appears**
+A new microservice starts producing events. The Flink job has no idea this 
topic exists. You must add a new DAG to the job code and **restart the 
application**.
+
+**Scenario 3: Dynamic Routing from a Single Topic**
+A single Kafka topic contains multiple event types that need to be routed to 
different Iceberg tables. A static sink, hard-wired to one table, can't do this.
+
+All these scenarios require complex workarounds and a way to **automatically 
restart the application** whenever something changes.
+
+### The Solution: The Dynamic Iceberg Sink
+
+Here’s the new architecture:
+
+<div style="text-align: center;">
+<img 
src="/img/blog/2025-10-03-kafka-dynamic-iceberg-sink/dynamic-iceberg-sink.png" 
style="width:70%;margin:15px">
+</div>
+
+This single, unified pipeline can ingest from any number of topics and write 
to any number of tables, automatically handling new topics and schema changes 
without restarts.
+
+
+
+#### A Look at the Implementation
+
+Let's dive into the key components that make this dynamic behavior possible.
+
+##### Step 1: Preserving Kafka Metadata with `KafkaRecord`
+
+To make dynamic decisions, our pipeline needs metadata. For example, to use 
the topic name as the table name, we need access to the topic! Standard 
deserializers often discard this, returning only the deserialized payload.
+
+To solve this, we first pass the raw Kafka `ConsumerRecord` through a 
lightweight wrapper. This wrapper converts it into a simple POJO, 
`KafkaRecord`, that preserves all the essential metadata for downstream 
processing.
+
+Here is the structure of our `KafkaRecord` class:
+
+```java
+public class KafkaRecord {
+    public final String topic;
+    public final byte[] key;
+    public final byte[] value;
+    public final Headers headers;
+    ....
+}
+```
+Now, every record flowing through our Flink pipeline is a `KafkaRecord` 
object, giving our converter access to the `topic` for table routing and the 
`value` (the raw byte payload) for schema discovery.
+
+##### Step 2: The `KafkaRecordToDynamicRecordGenerator`
+
+This generator takes a `KafkaRecord` and performs the "late binding" of schema 
and table information. For each message, it:
+
+1.  **Extracts the schema ID** from the `value` byte array (using the 
Confluent wire format).
+2.  **Fetches the writer's schema** from Schema Registry.
+3.  **Deserializes the Avro payload** and converts it into Flink's `RowData`.
+4.  **Bundles everything** into a `DynamicRecord`, which contains:
+    *   The `TableIdentifier` (created from `kafkaRecord.topic`).
+    *   The `org.apache.iceberg.Schema` (converted from the Avro schema).
+    *   The `PartitionSpec` (e.g., based on a timestamp field).
+    *   The `RowData` payload itself.
+
+```java
+public class KafkaRecordToDynamicRecordGenerator implements DynamicRecordGenerator<KafkaRecord> {
+    @Override
+    public DynamicRecord convert(KafkaRecord kafkaRecord) throws Exception {
+        // 1. Get schema ID from kafkaRecord.value and fetch schema
+        int schemaId = getSchemaId(kafkaRecord.value);
+        Schema writerSchema = schemaRegistryClient.getById(schemaId);
+
+        // 2. Deserialize and convert to RowData
+        GenericRecord genericRecord = deserialize(kafkaRecord.value);
+        RowData rowData = avroToRowDataMapper.map(genericRecord);
+
+        // 3. Dynamically create table info from the KafkaRecord
+        TableIdentifier tableId = TableIdentifier.of(kafkaRecord.topic);
+        org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(writerSchema);
+        PartitionSpec spec = buildPartitionSpec(icebergSchema, kafkaRecord);
+
+        // 4. Return the complete DynamicRecord
+        return new DynamicRecord(
+                tableId,
+                "branch",
+                icebergSchema,
+                rowData,
+                spec,
+                DistributionMode.NONE,
+                1);

Review Comment:
   NIT: If you want to mention it, this will use a write parallelism of 1, but 
you can use `Integer.MAX_VALUE`, which will automatically use the maximum 
available write parallelism.
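   
   For illustration, the same call with the write parallelism left to the sink to maximize:
   
   ```java
   return new DynamicRecord(
           tableId,
           "branch",
           icebergSchema,
           rowData,
           spec,
           DistributionMode.NONE,
           Integer.MAX_VALUE);  // use the maximum available write parallelism
   ```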



##########
docs/content/posts/2025-10-14-kafka-dynamic-iceberg-sink.md:
##########
@@ -0,0 +1,189 @@
+---
+title: "From Stream to Lakehouse: Kafka Ingestion with the Flink Dynamic 
Iceberg Sink"
+date: "2025-10-14T00:00:00.000Z"
+aliases:
+- /news/2025/10/14/2025-10-14-kafka-dynamic-iceberg-sink.html
+authors:
+- Swapna:
+  name: "Swapna Marru"
+
+---
+
+Ingesting thousands of evolving Kafka topics into a lakehouse often creates 
complex, brittle pipelines that require constant manual intervention. But what 
if your ingestion pipeline could adapt on its own, with zero downtime?
+
+Enter the **Flink Dynamic Iceberg Sink**, a powerful pattern for Apache Flink 
that allows users to seamlessly write streaming data into multiple Iceberg 
tables—dynamically, efficiently, and with full schema evolution support. The 
sink can create and write to new tables based on instructions within the 
records themselves. As the schema of incoming records evolves, the dynamic sink 
automatically evolves the Iceberg table schema in the lakehouse. It can even 
adapt to changes in the table's partitioning scheme. The key is that all of 
this happens in real-time, **without a single job restart.**

Review Comment:
   
   ```suggestion
   Enter the **Flink Dynamic Iceberg Sink**, a powerful pattern for Apache 
Flink that allows users to write streaming data into multiple Iceberg 
tables—dynamically, efficiently, and with full schema evolution support. The 
sink can create and write to new tables based on instructions within the 
records themselves. As the schema of incoming records evolves, the dynamic sink 
automatically evolves the Iceberg table schema in the lakehouse. It can even 
adapt to changes in the table's partitioning scheme. The key is that all of 
this happens in real-time, **without a single job restart.**
   ```
   
   "seamlessly" sounds a bit generic.
   


