rodmeneses commented on code in PR #12071:
URL: https://github.com/apache/iceberg/pull/12071#discussion_r2098985225


##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java:
##########
@@ -692,20 +729,51 @@ private DataStream<RowData> distributeDataStream(DataStream<RowData> input) {
         }
 
       case RANGE:
-        if (equalityFieldIds.isEmpty()) {
+        // Ideally, exception should be thrown in the combination of range distribution and
+        // equality fields. Primary key case should use hash distribution mode.
+        // Keep the current behavior of falling back to keyBy for backward compatibility.
+        if (!equalityFieldIds.isEmpty()) {
           LOG.warn(
-              "Fallback to use 'none' distribution mode, because there are no equality fields set "
-                  + "and {}=range is not supported yet in flink",
-              WRITE_DISTRIBUTION_MODE);
-          return input;
-        } else {
-          LOG.info(
-              "Distribute rows by equality fields, because there are equality fields set "
-                  + "and{}=range is not supported yet in flink",
+              "Hash distribute rows by equality fields, even though {}=range is set. "
+                  + "Range distribution for primary keys are not always safe in "
+                  + "Flink streaming writer.",
               WRITE_DISTRIBUTION_MODE);
           return input.keyBy(new EqualityFieldKeySelector(schema, flinkRowType, equalityFieldIds));
         }
 
+        // range distribute by partition key or sort key if table has an SortOrder
+        Preconditions.checkState(
+            sortOrder.isSorted() || spec.isPartitioned(),
+            "Invalid write distribution mode: range. Need to define sort order or partition spec.");
+        if (sortOrder.isUnsorted()) {
+          sortOrder = Partitioning.sortOrderFor(spec);
+          LOG.info("Construct sort order from partition spec");
+        }
+
+        LOG.info("Range distribute rows by sort order: {}", sortOrder);
+        StatisticsType statisticsType = flinkWriteConf.rangeDistributionStatisticsType();
+        SingleOutputStreamOperator<StatisticsOrRecord> shuffleStream =
+            input
+                .transform(
+                    operatorName("range-shuffle"),
+                    TypeInformation.of(StatisticsOrRecord.class),
+                    new DataStatisticsOperatorFactory(
+                        schema,
+                        sortOrder,
+                        writerParallelism,
+                        statisticsType,
+                        flinkWriteConf.rangeDistributionSortKeyBaseWeight()))
+                // Set the parallelism same as input operator to encourage chaining
+                .setParallelism(input.getParallelism());

Review Comment:
   I added slotSharingGroup to enforce
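
For anyone skimming the hunk, the order of checks in the RANGE branch can be sketched without Flink or Iceberg types. This is only an illustration of the control flow visible in the diff, not the PR's code: `RangeBranchSketch`, `Plan`, and `choose` are hypothetical names, and the booleans stand in for `sortOrder.isSorted()` and `spec.isPartitioned()`.

```java
import java.util.List;

public class RangeBranchSketch {
  // Hypothetical labels for the three outcomes visible in the diff.
  enum Plan { HASH_BY_EQUALITY_FIELDS, RANGE_BY_SORT_ORDER, RANGE_BY_PARTITION_SPEC }

  // Mirrors the order of checks in the RANGE case; inputs are simplified stand-ins.
  static Plan choose(List<Integer> equalityFieldIds, boolean sorted, boolean partitioned) {
    if (!equalityFieldIds.isEmpty()) {
      // Equality fields win: fall back to keyBy (hash) for backward compatibility.
      return Plan.HASH_BY_EQUALITY_FIELDS;
    }
    if (!sorted && !partitioned) {
      // Same condition as the Preconditions.checkState call in the diff.
      throw new IllegalStateException(
          "Invalid write distribution mode: range. Need to define sort order or partition spec.");
    }
    // An unsorted table derives its sort order from the partition spec.
    return sorted ? Plan.RANGE_BY_SORT_ORDER : Plan.RANGE_BY_PARTITION_SPEC;
  }

  public static void main(String[] args) {
    System.out.println(choose(List.of(1), true, true));
    System.out.println(choose(List.of(), true, false));
    System.out.println(choose(List.of(), false, true));
  }
}
```

Note that equality fields take precedence even when a sort order is defined, which is why the first check returns before the precondition is evaluated.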



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

