mxm commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r2852440369
##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##########
@@ -411,28 +447,38 @@ public DataStreamSink<DynamicRecordInternal> append() {
.name(operatorName("generator"))
.returns(type);
- DataStreamSink<DynamicRecordInternal> rowDataDataStreamSink =
- converted
- .getSideOutput(
- new OutputTag<>(
- DynamicRecordProcessor.DYNAMIC_TABLE_UPDATE_STREAM,
- new DynamicRecordInternalType(catalogLoader, true, cacheMaximumSize)))
- .keyBy((KeySelector<DynamicRecordInternal, String>) DynamicRecordInternal::tableName)
- .map(
- new DynamicTableUpdateOperator(
- catalogLoader,
- cacheMaximumSize,
- cacheRefreshMs,
- inputSchemasPerTableCacheMaximumSize,
- tableCreator,
- caseSensitive,
- dropUnusedColumns))
- .uid(prefixIfNotNull(uidPrefix, "-updater"))
- .name(operatorName("Updater"))
- .returns(type)
- .union(converted)
- .sinkTo(sink)
- .uid(prefixIfNotNull(uidPrefix, "-sink"));
+ DataStreamSink<DynamicRecordInternal> rowDataDataStreamSink;
+ if (passthroughRecords) {
+ if (!immediateUpdate) {
+ throw new UnsupportedOperationException(
+ "Immediate update must be enabled to pass through records");
+ }
+ rowDataDataStreamSink = converted.sinkTo(sink).uid(prefixIfNotNull(uidPrefix, "-sink"));
+ } else {
Review Comment:
In the regular sink, `DistributionMode.NONE` uses strict forward partitioning
(no redistribution), which is similar to what this PR does. As a result, Flink
chains the input with the writer. For DynamicSink, because there are many
tables, the idea was to spread the data across the available workers, which
is why we opted for round-robin across the workers chosen for each table.
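
To make the difference concrete, here is a minimal, self-contained sketch of the two routing strategies. It is a toy model only, not Flink or Iceberg code: the class `RoutingSketch`, its methods, and the writer subtask indices are all hypothetical and chosen purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of two routing strategies; not Flink or Iceberg code. */
public class RoutingSketch {

    /** Forward: a record stays on the subtask that produced it. */
    static int forward(int upstreamSubtask) {
        return upstreamSubtask;
    }

    /** Round-robin over the subset of writer subtasks assigned to one table. */
    static final class TableRoundRobin {
        private final int[] assignedWriters;
        private int next = 0;

        TableRoundRobin(int... assignedWriters) {
            this.assignedWriters = assignedWriters.clone();
        }

        /** Returns the next writer in the cycle and advances the cursor. */
        int route() {
            int target = assignedWriters[next];
            next = (next + 1) % assignedWriters.length;
            return target;
        }
    }

    /** Routes `count` records and returns the chosen writer for each. */
    static List<Integer> routeAll(TableRoundRobin rr, int count) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            out.add(rr.route());
        }
        return out;
    }

    public static void main(String[] args) {
        // Forward: everything produced by upstream subtask 2 lands on writer 2.
        System.out.println("forward: " + forward(2));

        // Round-robin: assume (hypothetically) one table was assigned writers 1, 3 and 4.
        TableRoundRobin rr = new TableRoundRobin(1, 3, 4);
        System.out.println("round-robin: " + routeAll(rr, 5));
    }
}
```

With forward routing the writer is fixed by the producing subtask, so a single hot table cannot be spread out. With the per-table round-robin, consecutive records of the same table cycle through that table's assigned writers.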
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]