This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 6a9c182b47 Spark 3.5: Set useCommitCoordinator to false in batch writes (#9017)
6a9c182b47 is described below
commit 6a9c182b47476025da1cfb741cc2f581e955330b
Author: Huaxin Gao <[email protected]>
AuthorDate: Fri Nov 10 11:36:01 2023 -0800
Spark 3.5: Set useCommitCoordinator to false in batch writes (#9017)
Co-authored-by: Huaxin Gao <[email protected]>
---
.../org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java | 5 +++++
.../org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java | 5 +++++
.../src/main/java/org/apache/iceberg/spark/source/SparkWrite.java | 5 +++++
3 files changed, 15 insertions(+)
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
index d0769eaa5f..a397a069ee 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
@@ -136,6 +136,11 @@ public class SparkPositionDeletesRewrite implements Write {
writeProperties);
}
+ @Override
+ public boolean useCommitCoordinator() {
+ return false;
+ }
+
@Override
public void commit(WriterCommitMessage[] messages) {
PositionDeletesRewriteCoordinator coordinator =
PositionDeletesRewriteCoordinator.get();
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 6c0fc591ff..dcc949b290 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -171,6 +171,11 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
return new PositionDeltaWriteFactory(tableBroadcast, command, context,
writeProperties);
}
+ @Override
+ public boolean useCommitCoordinator() {
+ return false;
+ }
+
@Override
public void commit(WriterCommitMessage[] messages) {
RowDelta rowDelta = table.newRowDelta();
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 97359d0a3a..a7c1234a79 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -271,6 +271,11 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
return createWriterFactory();
}
+ @Override
+ public boolean useCommitCoordinator() {
+ return false;
+ }
+
@Override
public void abort(WriterCommitMessage[] messages) {
SparkWrite.this.abort(messages);