pvary commented on code in PR #10859:
URL: https://github.com/apache/iceberg/pull/10859#discussion_r1705286597
##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java:
##########
@@ -548,21 +599,46 @@ private DataStream<RowData> distributeDataStream(
}
case RANGE:
- if (equalityFieldIds.isEmpty()) {
+ // Ideally, exception should be thrown in the combination of range
distribution and
+ // equality fields. Primary key case should use hash distribution
mode.
+ // Keep the current behavior of falling back to keyBy for backward
compatibility.
+ if (!equalityFieldIds.isEmpty()) {
LOG.warn(
- "Fallback to use 'none' distribution mode, because there are
no equality fields set "
- + "and {}=range is not supported yet in flink",
- WRITE_DISTRIBUTION_MODE);
- return input;
- } else {
- LOG.info(
- "Distribute rows by equality fields, because there are
equality fields set "
- + "and{}=range is not supported yet in flink",
+ "Hash distribute rows by equality fields, even though {}=range
is set. "
+ + "Range distribution for primary keys are not always safe
in "
+ + "Flink streaming writer.",
WRITE_DISTRIBUTION_MODE);
return input.keyBy(
new EqualityFieldKeySelector(iSchema, flinkRowType,
equalityFieldIds));
}
+ // range distribute by partition key or sort key if table has an
SortOrder
+ Preconditions.checkState(
+ sortOrder.isSorted() || partitionSpec.isPartitioned(),
+ "Invalid write distribution mode: range. Need to define sort
order and partition spec.");
+ if (sortOrder.isUnsorted()) {
+ sortOrder = Partitioning.sortOrderFor(partitionSpec);
+ LOG.info("Construct sort order from partition spec");
+ }
+
+ LOG.info("Range distribute rows by sort order: {}", sortOrder);
+ StatisticsType statisticsType =
flinkWriteConf.rangeDistributionStatisticsType();
+ return input
+ .transform(
+ operatorName("range-shuffle"),
Review Comment:
Do we need to add a uid to the operator?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]