This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new a73b48bf52 Doc: Flink: Add doc for the dynamic sink (#13608)
a73b48bf52 is described below

commit a73b48bf52e97bde22bb62c1030416edbb4775d7
Author: GuoYu <511955...@qq.com>
AuthorDate: Wed Jul 30 23:48:05 2025 +0800

    Doc: Flink: Add doc for the dynamic sink (#13608)
---
 docs/docs/flink-writes.md | 151 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 151 insertions(+)

diff --git a/docs/docs/flink-writes.md b/docs/docs/flink-writes.md
index d3a077dc0c..6651bcb95b 100644
--- a/docs/docs/flink-writes.md
+++ b/docs/docs/flink-writes.md
@@ -396,3 +396,154 @@ To use SinkV2 based implementation, replace `FlinkSink` 
with `IcebergSink` in th
 
      - The `RANGE` distribution mode is not yet available for the `IcebergSink`
      - When using `IcebergSink` use `uidSuffix` instead of the `uidPrefix`
+
+
+## Dynamic Iceberg Flink Sink
+
+Dynamic Flink Iceberg Sink allows:
+
+1. **Writing to any number of tables**  
+   A single sink can dynamically route records to multiple Iceberg tables.
+
+2. **Dynamic table creation and updates**  
+   Tables are created and updated based on user-defined routing logic.
+
+3. **Dynamic schema and partition evolution**  
+   Table schemas and partition specs update during streaming execution.
+
+All configurations are controlled through the `DynamicRecord` class, 
eliminating the need for Flink job restarts when requirements change.
+
+```java
+
+    DynamicIcebergSink.forInput(dataStream)
+        .generator((inputRecord, out) -> out.collect(
+                new DynamicRecord(
+                        TableIdentifier.of("db", "table"),
+                        "branch",
+                        SCHEMA,
+                        (RowData) inputRecord,
+                        PartitionSpec.unpartitioned(),
+                        DistributionMode.HASH,
+                        2)))
+        .catalogLoader(CatalogLoader.hive("hive", new Configuration(), 
Map.of()))
+        .writeParallelism(10)
+        .immediateTableUpdate(true)
+        .append();
+```
+
+### Configuration Example
+
+```java
+DynamicIcebergSink.Builder<RowData> builder = 
DynamicIcebergSink.forInput(inputStream);
+
+// Set common properties
+builder
+    .set("write.parquet.compression-codec", "gzip");
+
+// Set Dynamic Sink specific options
+builder
+    .writeParallelism(4)
+    .uidPrefix("dynamic-sink")
+    .cacheMaxSize(500)
+    .cacheRefreshMs(5000);
+
+// Add generator and append sink
+builder.generator(new CustomRecordGenerator());
+builder.append();
+```
+
+### Dynamic Routing Configuration
+
+Dynamic table routing can be customized by implementing the 
`DynamicRecordGenerator` interface:
+
+```java
+public class CustomRecordGenerator implements DynamicRecordGenerator<RowData> {
+    @Override
+    public DynamicRecord generate(RowData row) {
+        DynamicRecord record = new DynamicRecord();
+        // Set table name based on business logic
+        TableIdentifier tableIdentifier = TableIdentifier.of(database, 
tableName);
+        record.setTableIdentifier(tableIdentifier);
+        record.setData(row);
+        // Set the maximum number of parallel writers for a given 
table/branch/schema/spec
+        record.writeParallelism(2);
+        return record;
+    }
+}
+
+// Set custom record generator when building the sink
+DynamicIcebergSink.Builder<RowData> builder = 
DynamicIcebergSink.forInput(inputStream);
+builder.generator(new CustomRecordGenerator());
+// ... other config ...
+builder.append();
+```
+The user should provide a converter which converts the input record to a 
DynamicRecord.
+We need the following information (DynamicRecord) for every record:
+
+| Property           | Description                                             
                                  |
+|--------------------|-------------------------------------------------------------------------------------------|
+| `TableIdentifier`  | The target table to which the record will be written.   
                                  |
+| `Branch`           | The target branch for writing the record (optional).    
                                  |
+| `Schema`           | The schema of the record.                               
                                  |
+| `Spec`             | The expected partitioning specification for the record. 
                                  |
+| `RowData`          | The actual row data to be written.                      
                                  |
+| `DistributionMode` | The distribution mode for writing the record (currently 
supports NONE or HASH).           |
+| `Parallelism`      | The maximum number of parallel writers for a given 
table/branch/schema/spec (WriteTarget). |
+| `UpsertMode`       | Overrides this table's write.upsert.enabled (optional). 
                                  |
+| `EqualityFields`   | The equality fields for the table(optional).            
                                            |
+
+### Schema Evolution
+
+The dynamic sink tries to match the schema provided in `DynamicRecord` with 
the existing table schemas.
+- If there is a direct match with one of the existing table schemas, that 
table schema will be used for writing to the table.
+- If there is no direct match, DynamicSink tries to adapt the provided schema 
such that it matches one of table schemas. For example, if there is an 
additional optional column in the table schema, a null value will be added to 
the RowData provided via DynamicRecord.
+- Otherwise, we evolve the table schema to match the input schema, within the 
constraints described below.
+
+The dynamic sink maintains an LRU cache for both table metadata and incoming 
schemas, with eviction based on size and time constraints. When a DynamicRecord 
contains a schema that is incompatible with the current table schema, a schema 
update is triggered. This update can occur either immediately or via a 
centralized executor, depending on the `immediateTableUpdate` configuration. 
While centralized updates reduce load on the Catalog, they may introduce 
backpressure on the sink.
+
+Supported schema updates:
+
+- Adding new columns
+- Widening existing column types (e.g., Integer → Long, Float → Double)
+- Making required columns optional
+
+Unsupported schema updates:
+
+- Dropping columns
+- Renaming columns
+
+Dropping columns is avoided to prevent issues with late or out-of-order data, 
as removed fields cannot be easily restored without data loss. Renaming is 
unsupported because schema comparison is name-based, and renames would require 
additional metadata or hints to resolve.
+
+
+#### Cache
+
+There are two distinct caches involved: the table metadata cache and the input 
schema cache.
+
+- The table metadata cache holds metadata such as schema definitions and 
partition specs to reduce repeated Catalog lookups. Its size is governed by the 
`cacheMaxSize` setting.
+- The input schema cache stores incoming schemas per table along with their 
compatibility resolution results. Its size is controlled by 
`inputSchemasPerTableCacheMaxSize`.
+
+To improve cache hit rates and performance, reuse the same 
DynamicRecord.schema instance if the record schema is unchanged.
+
+### Dynamic Sink Configuration
+
+The Dynamic Iceberg Flink Sink is configured using the Builder pattern. Here 
are the key configuration methods:
+
+| Method                                               | Description           
                                                                                
                                                                  |
+|------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `overwrite(boolean enabled)`                         | Enable overwrite mode 
                                                                                
                                                                  |
+| `writeParallelism(int parallelism)`                  | Set writer 
parallelism                                                                     
                                                                             |
+| `uidPrefix(String prefix)`                           | Set operator UID 
prefix                                                                          
                                                                       |
+| `snapshotProperties(Map<String, String> properties)` | Set snapshot metadata 
properties                                                                      
                                                                  |
+| `toBranch(String branch)`                            | Write to a specific 
branch                                                                          
                                                                    |
+| `cacheMaxSize(int maxSize)`                          | Set cache size for 
table metadata                                                                  
                                                                     |
+| `cacheRefreshMs(long refreshMs)`                     | Set cache refresh 
interval                                                                        
                                                                      |
+| `inputSchemasPerTableCacheMaxSize(int size)`         | Set max input schemas 
to cache per table                                                              
                                                                  |
+| `immediateTableUpdate(boolean enabled)`              | Controls whether 
table metadata (schema/partition spec) updates immediately (default: false)     
                                                                                
                                                                              |
+| `set(String property, String value)`                 | Set any Iceberg write 
property (e.g., `"write.format"`, `"write.upsert.enabled"`).Check out all the 
options here: [write-options](flink-configuration.md#write-options) |
+| `setAll(Map<String, String> properties)`             | Set multiple 
properties at once                                                              
                                                                           |
+
+
+### Notes
+
+- **Range distribution mode**: Currently, the dynamic sink does not support 
the `RANGE` distribution mode, if set, it will fall back to `HASH`.
+- **Property Precedence Note**: When conflicts occur between table properties 
and sink properties, the sink properties will override the table properties 
configuration.

Reply via email to