This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new e57b9f6954 Spark: Backport #10373 to Spark 3.3 and 3.4 (#10546)
e57b9f6954 is described below
commit e57b9f69543886236297db43923a08994f9e18c4
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Fri Jun 21 07:33:26 2024 -0700
Spark: Backport #10373 to Spark 3.3 and 3.4 (#10546)
---
.../iceberg/spark/actions/RewriteManifestsSparkAction.java | 8 ++++++--
.../apache/iceberg/spark/source/SparkPositionDeltaWrite.java | 10 +++++-----
.../main/java/org/apache/iceberg/spark/source/SparkWrite.java | 10 +++++-----
.../iceberg/spark/actions/RewriteManifestsSparkAction.java | 8 ++++++--
.../apache/iceberg/spark/source/SparkPositionDeltaWrite.java | 10 +++++-----
.../main/java/org/apache/iceberg/spark/source/SparkWrite.java | 10 +++++-----
6 files changed, 32 insertions(+), 24 deletions(-)
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 288b2d417f..78eb010b9b 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.actions.ImmutableRewriteManifests;
import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.OutputFile;
@@ -345,8 +346,11 @@ public class RewriteManifestsSparkAction
// don't clean up added manifest files, because they may have been
successfully committed.
throw commitStateUnknownException;
} catch (Exception e) {
- // delete all new manifests because the rewrite failed
- deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
+ if (e instanceof CleanableFailure) {
+ // delete all new manifests because the rewrite failed
+ deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
+ }
+
throw e;
}
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 2d38292439..8e5aeef6e1 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -43,7 +43,7 @@ import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.deletes.PositionDelete;
-import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.BasePositionDeltaWriter;
@@ -102,7 +102,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
private final Distribution requiredDistribution;
private final SortOrder[] requiredOrdering;
- private boolean cleanupOnAbort = true;
+ private boolean cleanupOnAbort = false;
SparkPositionDeltaWrite(
SparkSession spark,
@@ -284,9 +284,9 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
operation.commit(); // abort is automatically called if this fails
long duration = System.currentTimeMillis() - start;
LOG.info("Committed in {} ms", duration);
- } catch (CommitStateUnknownException commitStateUnknownException) {
- cleanupOnAbort = false;
- throw commitStateUnknownException;
+ } catch (Exception e) {
+ cleanupOnAbort = e instanceof CleanableFailure;
+ throw e;
}
}
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index c26b7f5f38..18db2ee2c7 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -40,7 +40,7 @@ import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
-import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.ClusteredDataWriter;
@@ -101,7 +101,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
private final Distribution requiredDistribution;
private final SortOrder[] requiredOrdering;
- private boolean cleanupOnAbort = true;
+ private boolean cleanupOnAbort = false;
SparkWrite(
SparkSession spark,
@@ -216,9 +216,9 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
operation.commit(); // abort is automatically called if this fails
long duration = System.currentTimeMillis() - start;
LOG.info("Committed in {} ms", duration);
- } catch (CommitStateUnknownException commitStateUnknownException) {
- cleanupOnAbort = false;
- throw commitStateUnknownException;
+ } catch (Exception e) {
+ cleanupOnAbort = e instanceof CleanableFailure;
+ throw e;
}
}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 8ec3b44f92..e9edfeb985 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -46,6 +46,7 @@ import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.actions.ImmutableRewriteManifests;
import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.OutputFile;
@@ -355,8 +356,11 @@ public class RewriteManifestsSparkAction
// don't clean up added manifest files, because they may have been
successfully committed.
throw commitStateUnknownException;
} catch (Exception e) {
- // delete all new manifests because the rewrite failed
- deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
+ if (e instanceof CleanableFailure) {
+ // delete all new manifests because the rewrite failed
+ deleteFiles(Iterables.transform(addedManifests, ManifestFile::path));
+ }
+
throw e;
}
}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index c5fc8e0b0f..808764b31f 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -44,7 +44,7 @@ import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
-import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.BasePositionDeltaWriter;
@@ -105,7 +105,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
private final Context context;
private final Map<String, String> writeProperties;
- private boolean cleanupOnAbort = true;
+ private boolean cleanupOnAbort = false;
SparkPositionDeltaWrite(
SparkSession spark,
@@ -293,9 +293,9 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
operation.commit(); // abort is automatically called if this fails
long duration = System.currentTimeMillis() - start;
LOG.info("Committed in {} ms", duration);
- } catch (CommitStateUnknownException commitStateUnknownException) {
- cleanupOnAbort = false;
- throw commitStateUnknownException;
+ } catch (Exception e) {
+ cleanupOnAbort = e instanceof CleanableFailure;
+ throw e;
}
}
}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index d879a1f961..4db434f65a 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -40,7 +40,7 @@ import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
-import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.CleanableFailure;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.ClusteredDataWriter;
@@ -102,7 +102,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
private final SparkWriteRequirements writeRequirements;
private final Map<String, String> writeProperties;
- private boolean cleanupOnAbort = true;
+ private boolean cleanupOnAbort = false;
SparkWrite(
SparkSession spark,
@@ -222,9 +222,9 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
operation.commit(); // abort is automatically called if this fails
long duration = System.currentTimeMillis() - start;
LOG.info("Committed in {} ms", duration);
- } catch (CommitStateUnknownException commitStateUnknownException) {
- cleanupOnAbort = false;
- throw commitStateUnknownException;
+ } catch (Exception e) {
+ cleanupOnAbort = e instanceof CleanableFailure;
+ throw e;
}
}