sqd commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3094006287
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##########

@@ -79,13 +81,18 @@ public class DynamicIcebergSink
   private final Configuration flinkConfig;
   private final int cacheMaximumSize;
 
+  // Set by the builder before sinkTo() — forward writer results to union into pre-commit topology
+  @Nullable
+  private final transient DataStream<CommittableMessage<DynamicWriteResult>> forwardWriteResults;

Review Comment:
   You are right. Removed the annotations and null handling from before the refactor.

##########
docs/docs/flink-writes.md:
##########

@@ -547,6 +546,28 @@ The Dynamic Iceberg Flink Sink is configured using the Builder pattern. Here are
 | `tableCreator(TableCreator creator)` | When DynamicIcebergSink creates new Iceberg tables, allows overriding how tables are created - setting custom table properties and location based on the table name. |
 | `dropUnusedColumns(boolean enabled)` | When enabled, drops all columns from the current table schema which are not contained in the input schema (see the caveats above on dropping columns). |
 
+### Distribution Modes
+
+The `DistributionMode` set on each `DynamicRecord` controls how that record is routed from the processor to the writer:
+
+| Mode | Behavior |
+|---------------|----------|
+| `NONE` | Records are distributed across writer subtasks in a round-robin fashion (or by equality fields if set). |
+| `HASH` | Records are distributed by partition key (partitioned tables) or equality fields (unpartitioned tables). Ensures that records for the same partition are handled by the same writer subtask. |
+| `null` | Forward mode: bypasses distribution entirely and sends records directly via a forward edge (see below). |
+
+#### Forward Mode
+
+Using the `DynamicRecord` constructor overload without a `distributionMode` parameter bypasses distribution entirely. This is designed for high-throughput pipelines where every partition already has a large volume of data and the serialization and network shuffle cost is prohibitive. Records are sent directly from the processor to the writer using a forward edge, enabling Flink operator chaining. Table metadata updates are always performed immediately inside the processor (regardless of the `immediateTableUpdate` setting), because a dedicated table-update operator was deliberately omitted to avoid introducing extra data shuffles.
+
+Forward and regular records can be mixed in the same pipeline. The processor routes records to two separate sink outputs:
+
+- **Shuffle sink**: receives records with an explicit `distributionMode`. These go through the normal distribution topology (hash/round-robin) before reaching the writer.
+- **Forward sink**: receives records without a `distributionMode`. These skip distribution entirely and flow via a forward edge from the processor, allowing Flink operator chaining. Suited for high-throughput tables where avoiding shuffle overhead is critical.
+
+!!! warning

Review Comment:
   Fixed

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use
the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
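For illustration, the routing rule the docs table describes can be sketched as a small, self-contained decision function. This is a hypothetical model, not the actual `DynamicIcebergSink` processor code: `RouterSketch`, its `route` method, and the return convention (`-1` meaning "forward edge") are all invented here; the real sink routes records through Flink sink outputs rather than returning subtask indices.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the distribution-mode routing described above.
// Only NONE and HASH are modeled; null models forward mode.
enum DistributionMode { NONE, HASH }

class RouterSketch {
  private final AtomicLong roundRobin = new AtomicLong();
  private final int writerParallelism;

  RouterSketch(int writerParallelism) {
    this.writerParallelism = writerParallelism;
  }

  /** Returns a writer subtask index, or -1 to model the forward edge (no shuffle). */
  int route(DistributionMode mode, int partitionKeyHash) {
    if (mode == null) {
      // Forward mode: record stays chained to the local writer, no redistribution.
      return -1;
    }
    switch (mode) {
      case HASH:
        // Same partition key -> same writer subtask.
        return Math.floorMod(partitionKeyHash, writerParallelism);
      case NONE:
      default:
        // Round-robin across writer subtasks.
        return (int) (roundRobin.getAndIncrement() % writerParallelism);
    }
  }
}

public class Main {
  public static void main(String[] args) {
    RouterSketch router = new RouterSketch(4);
    System.out.println(router.route(DistributionMode.HASH, 42)); // floorMod(42, 4) = 2
    System.out.println(router.route(DistributionMode.NONE, 0));  // first round-robin slot: 0
    System.out.println(router.route(null, 0));                   // -1: forward edge
  }
}
```

The sketch highlights why mixing is safe: the decision is per record, so one pipeline can shuffle some records by partition key while others bypass distribution entirely.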
