openinx commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560650931



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -225,32 +249,71 @@ public Builder equalityFieldColumns(List<String> columns) {
           .name(String.format("IcebergSink %s", table.name()))
           .setParallelism(1);
     }
-  }
 
-  static IcebergStreamWriter<RowData> createStreamWriter(Table table, TableSchema requestedSchema,
-                                                         List<Integer> equalityFieldIds) {
-    Preconditions.checkArgument(table != null, "Iceberg table should't be null");
+    private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
+                                                     Map<String, String> properties,
+                                                     PartitionSpec partitionSpec,
+                                                     Schema iSchema,
+                                                     RowType flinkRowType) {
+      DistributionMode writeMode;
+      if (distributionMode == null) {
+        // Fallback to use distribution mode parsed from table properties if don't specify in job level.
+        String modeName = PropertyUtil.propertyAsString(properties,
+            WRITE_DISTRIBUTION_MODE,
+            WRITE_DISTRIBUTION_MODE_DEFAULT);
+
+        writeMode = DistributionMode.fromName(modeName);
+      } else {
+        writeMode = distributionMode;
+      }
+
+      switch (writeMode) {
+        case NONE:
+          return input;
+
+        case HASH:
+          if (partitionSpec.isUnpartitioned()) {
+            return input;
+          } else {
+            return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+          }
 
-    RowType flinkSchema;
+        case RANGE:
+          throw new UnsupportedOperationException("The write.distribution-mode=range is not supported in flink now");

Review comment:
       There are two cases:
   Case 1: someone configures the distribution mode at the job level as
`RANGE`. Since we don't support it yet, we should throw an
`UnsupportedOperationException`.

   Case 2: someone changes an existing table's properties from `NONE` to
`RANGE`. Running Flink jobs won't be affected until they restart, but any
newly started Flink job would then be required to use `NONE` or `HASH`.
Breaking every existing job on restart isn't friendly, so let me add a warning
log and keep the default `NONE` behavior in that case.
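
To make the two cases concrete, here is a minimal, self-contained sketch of how the resolution could behave. It is only an illustration, not the code in this PR: `DistributionModeResolver`, its nested `DistributionMode` enum, and the `"none"` default are hypothetical stand-ins for the real Iceberg and Flink classes and constants, and the warning goes through `java.util.logging` instead of the project's logger.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical stand-in for the sink's mode resolution; not the actual FlinkSink code.
class DistributionModeResolver {
  private static final Logger LOG = Logger.getLogger(DistributionModeResolver.class.getName());

  // Simplified stand-in for the DistributionMode enum used in the diff.
  enum DistributionMode { NONE, HASH, RANGE }

  // Property key taken from the error message in the diff; the default value is an assumption.
  static final String WRITE_DISTRIBUTION_MODE = "write.distribution-mode";
  static final String WRITE_DISTRIBUTION_MODE_DEFAULT = "none";

  static DistributionMode resolve(DistributionMode jobLevelMode, Map<String, String> tableProperties) {
    if (jobLevelMode != null) {
      // Case 1: RANGE was requested explicitly at the job level, so fail fast.
      if (jobLevelMode == DistributionMode.RANGE) {
        throw new UnsupportedOperationException(
            "The write.distribution-mode=range is not supported in flink now");
      }
      return jobLevelMode;
    }

    // No job-level setting: fall back to the table property.
    String modeName = tableProperties.getOrDefault(WRITE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_DEFAULT);
    DistributionMode tableMode = DistributionMode.valueOf(modeName.toUpperCase(Locale.ROOT));

    // Case 2: the table property was switched to RANGE; warn and keep NONE
    // so that restarted jobs are not broken.
    if (tableMode == DistributionMode.RANGE) {
      LOG.warning("write.distribution-mode=range is not supported in flink now, falling back to none");
      return DistributionMode.NONE;
    }
    return tableMode;
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put(WRITE_DISTRIBUTION_MODE, "range");
    // Table-level RANGE with no job-level override: logs a warning and resolves to NONE.
    System.out.println(resolve(null, props));
  }
}
```

With this shape, a job that explicitly asks for `RANGE` fails at construction time, while a job that merely inherits `RANGE` from the table keeps running with the `NONE` behavior after a restart.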






