aokolnychyi commented on a change in pull request #3461:
URL: https://github.com/apache/iceberg/pull/3461#discussion_r745156361



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
##########
@@ -112,42 +136,74 @@ public BatchWrite buildForBatch() {
     // Get application id
     String appId = spark.sparkContext().applicationId();
 
-    SparkWrite write = new SparkWrite(spark, table, writeConf, writeInfo, 
appId, writeSchema, dsSchema);
-    if (overwriteByFilter) {
-      return write.asOverwriteByFilter(overwriteExpr);
-    } else if (overwriteDynamic) {
-      return write.asDynamicOverwrite();
-    } else if (overwriteFiles) {
-      return write.asCopyOnWriteMergeWrite(mergeScan, isolationLevel);
+    Distribution distribution;
+    SortOrder[] ordering;
+
+    if (requestDistributionAndOrdering) {
+      distribution = buildRequiredDistribution();
+      ordering = buildRequiredOrdering(distribution);
     } else {
-      return write.asBatchAppend();
+      LOG.warn("Can't request distribution/ordering as extensions are disabled 
and spec has non-identity transforms");
+      distribution = Distributions.unspecified();
+      ordering = NO_ORDERING;
     }
-  }
-
-  @Override
-  public StreamingWrite buildForStreaming() {
-    // Validate
-    Preconditions.checkArgument(handleTimestampWithoutZone || 
!SparkUtil.hasTimestampWithoutZone(table.schema()),
-        SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
-
-    Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema);
-    TypeUtil.validateWriteSchema(table.schema(), writeSchema, 
writeConf.checkNullability(), writeConf.checkOrdering());
-    SparkUtil.validatePartitionTransforms(table.spec());
 
-    // Change to streaming write if it is just append
-    Preconditions.checkState(!overwriteDynamic,
-        "Unsupported streaming operation: dynamic partition overwrite");
-    Preconditions.checkState(!overwriteByFilter || overwriteExpr == 
Expressions.alwaysTrue(),
-        "Unsupported streaming operation: overwrite by filter: %s", 
overwriteExpr);
+    return new SparkWrite(spark, table, writeConf, writeInfo, appId, 
writeSchema, dsSchema, distribution, ordering) {
+
+      @Override
+      public BatchWrite toBatch() {
+        if (rewrittenFileSetId != null) {
+          return asRewrite(rewrittenFileSetId);
+        } else if (overwriteByFilter) {
+          return asOverwriteByFilter(overwriteExpr);
+        } else if (overwriteDynamic) {
+          return asDynamicOverwrite();
+        } else if (overwriteFiles) {
+          return asCopyOnWriteMergeWrite(mergeScan, isolationLevel);
+        } else {
+          return asBatchAppend();
+        }
+      }
+
+      @Override
+      public StreamingWrite toStreaming() {
+        Preconditions.checkState(!overwriteDynamic,
+            "Unsupported streaming operation: dynamic partition overwrite");
+        Preconditions.checkState(!overwriteByFilter || overwriteExpr == 
Expressions.alwaysTrue(),
+            "Unsupported streaming operation: overwrite by filter: %s", 
overwriteExpr);
+        Preconditions.checkState(rewrittenFileSetId == null,
+            "Unsupported streaming operation: rewrite");
+
+        if (overwriteByFilter) {
+          return asStreamingOverwrite();
+        } else {
+          return asStreamingAppend();
+        }
+      }
+    };
+  }
 
-    // Get application id
-    String appId = spark.sparkContext().applicationId();
+  private Distribution buildRequiredDistribution() {
+    if (overwriteFiles) {
+      throw new UnsupportedOperationException("Copy-on-write operations are 
temporarily not supported");
+    } else {
+      DistributionMode distributionMode = writeConf.distributionMode();
+      return SparkDistributionAndOrderingUtil.buildRequiredDistribution(table, 
distributionMode);

Review comment:
       No longer applies after the rename.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to