This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new e57b9f6954 Spark: Backport #10373 to Spark 3.3 and 3.4 (#10546)
e57b9f6954 is described below

commit e57b9f69543886236297db43923a08994f9e18c4
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Fri Jun 21 07:33:26 2024 -0700

    Spark: Backport #10373 to Spark 3.3 and 3.4 (#10546)
---
 .../iceberg/spark/actions/RewriteManifestsSparkAction.java     |  8 ++++++--
 .../apache/iceberg/spark/source/SparkPositionDeltaWrite.java   | 10 +++++-----
 .../main/java/org/apache/iceberg/spark/source/SparkWrite.java  | 10 +++++-----
 .../iceberg/spark/actions/RewriteManifestsSparkAction.java     |  8 ++++++--
 .../apache/iceberg/spark/source/SparkPositionDeltaWrite.java   | 10 +++++-----
 .../main/java/org/apache/iceberg/spark/source/SparkWrite.java  | 10 +++++-----
 6 files changed, 32 insertions(+), 24 deletions(-)

diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 288b2d417f..78eb010b9b 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.ImmutableRewriteManifests;
 import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.OutputFile;
@@ -345,8 +346,11 @@ public class RewriteManifestsSparkAction
       // don't clean up added manifest files, because they may have been successfully committed.
       throw commitStateUnknownException;
     } catch (Exception e) {
-      // delete all new manifests because the rewrite failed
-      deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
+      if (e instanceof CleanableFailure) {
+        // delete all new manifests because the rewrite failed
+        deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
+      }
+
       throw e;
     }
   }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 2d38292439..8e5aeef6e1 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -43,7 +43,7 @@ import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.deletes.PositionDelete;
-import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.BasePositionDeltaWriter;
@@ -102,7 +102,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
   private final Distribution requiredDistribution;
   private final SortOrder[] requiredOrdering;
 
-  private boolean cleanupOnAbort = true;
+  private boolean cleanupOnAbort = false;
 
   SparkPositionDeltaWrite(
       SparkSession spark,
@@ -284,9 +284,9 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
         operation.commit(); // abort is automatically called if this fails
         long duration = System.currentTimeMillis() - start;
         LOG.info("Committed in {} ms", duration);
-      } catch (CommitStateUnknownException commitStateUnknownException) {
-        cleanupOnAbort = false;
-        throw commitStateUnknownException;
+      } catch (Exception e) {
+        cleanupOnAbort = e instanceof CleanableFailure;
+        throw e;
       }
     }
   }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index c26b7f5f38..18db2ee2c7 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -40,7 +40,7 @@ import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.ClusteredDataWriter;
@@ -101,7 +101,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
   private final Distribution requiredDistribution;
   private final SortOrder[] requiredOrdering;
 
-  private boolean cleanupOnAbort = true;
+  private boolean cleanupOnAbort = false;
 
   SparkWrite(
       SparkSession spark,
@@ -216,9 +216,9 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
       operation.commit(); // abort is automatically called if this fails
       long duration = System.currentTimeMillis() - start;
       LOG.info("Committed in {} ms", duration);
-    } catch (CommitStateUnknownException commitStateUnknownException) {
-      cleanupOnAbort = false;
-      throw commitStateUnknownException;
+    } catch (Exception e) {
+      cleanupOnAbort = e instanceof CleanableFailure;
+      throw e;
     }
   }
 
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 8ec3b44f92..e9edfeb985 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -46,6 +46,7 @@ import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.ImmutableRewriteManifests;
 import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.OutputFile;
@@ -355,8 +356,11 @@ public class RewriteManifestsSparkAction
       // don't clean up added manifest files, because they may have been successfully committed.
       throw commitStateUnknownException;
     } catch (Exception e) {
-      // delete all new manifests because the rewrite failed
-      deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
+      if (e instanceof CleanableFailure) {
+        // delete all new manifests because the rewrite failed
+        deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
+      }
+
       throw e;
     }
   }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index c5fc8e0b0f..808764b31f 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -44,7 +44,7 @@ import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.deletes.PositionDelete;
-import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.BasePositionDeltaWriter;
@@ -105,7 +105,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
   private final Context context;
   private final Map<String, String> writeProperties;
 
-  private boolean cleanupOnAbort = true;
+  private boolean cleanupOnAbort = false;
 
   SparkPositionDeltaWrite(
       SparkSession spark,
@@ -293,9 +293,9 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
         operation.commit(); // abort is automatically called if this fails
         long duration = System.currentTimeMillis() - start;
         LOG.info("Committed in {} ms", duration);
-      } catch (CommitStateUnknownException commitStateUnknownException) {
-        cleanupOnAbort = false;
-        throw commitStateUnknownException;
+      } catch (Exception e) {
+        cleanupOnAbort = e instanceof CleanableFailure;
+        throw e;
       }
     }
   }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index d879a1f961..4db434f65a 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -40,7 +40,7 @@ import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.ClusteredDataWriter;
@@ -102,7 +102,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
   private final SparkWriteRequirements writeRequirements;
   private final Map<String, String> writeProperties;
 
-  private boolean cleanupOnAbort = true;
+  private boolean cleanupOnAbort = false;
 
   SparkWrite(
       SparkSession spark,
@@ -222,9 +222,9 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
       operation.commit(); // abort is automatically called if this fails
       long duration = System.currentTimeMillis() - start;
       LOG.info("Committed in {} ms", duration);
-    } catch (CommitStateUnknownException commitStateUnknownException) {
-      cleanupOnAbort = false;
-      throw commitStateUnknownException;
+    } catch (Exception e) {
+      cleanupOnAbort = e instanceof CleanableFailure;
+      throw e;
     }
   }
 

Reply via email to