This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 3efaee1790 Spark 3.3: Improve task and job abort handling (#6876)
3efaee1790 is described below

commit 3efaee1790fe541d0969e12fa24afa4e1a3041c3
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Feb 23 13:55:06 2023 -0800

    Spark 3.3: Improve task and job abort handling (#6876)
---
 .../iceberg/spark/extensions/TestWriteAborts.java  | 193 +++++++++++++++++++++
 .../iceberg/spark/source/SparkCleanupUtil.java     | 138 +++++++++++++++
 .../spark/source/SparkPositionDeltaWrite.java      |  40 +++--
 .../apache/iceberg/spark/source/SparkWrite.java    |  85 +++------
 4 files changed, 374 insertions(+), 82 deletions(-)

diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java
new file mode 100644
index 0000000000..24a9b3f232
--- /dev/null
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.SparkException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runners.Parameterized;
+
+public class TestWriteAborts extends SparkExtensionsTestBase {
+
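+  // run every test against both a standard FileIO and a FileIO that supports bulk deletes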
+  @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        "testhive",
+        SparkCatalog.class.getName(),
+        ImmutableMap.of(
+            "type",
+            "hive",
+            CatalogProperties.FILE_IO_IMPL,
+            CustomFileIO.class.getName(),
+            "default-namespace",
+            "default")
+      },
+      {
+        "testhivebulk",
+        SparkCatalog.class.getName(),
+        ImmutableMap.of(
+            "type",
+            "hive",
+            CatalogProperties.FILE_IO_IMPL,
+            CustomBulkFileIO.class.getName(),
+            "default-namespace",
+            "default")
+      }
+    };
+  }
+
+  @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+  public TestWriteAborts(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testBatchAppend() throws Exception {
+    String dataLocation = temp.newFolder().toString();
+
+    sql(
+        "CREATE TABLE %s (id INT, data STRING) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data)"
+            + "TBLPROPERTIES ('%s' '%s')",
+        tableName, TableProperties.WRITE_DATA_LOCATION, dataLocation);
+
+    List<SimpleRecord> records =
+        ImmutableList.of(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "a"),
+            new SimpleRecord(4, "b"));
+    Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
+
+    AssertHelpers.assertThrows(
+        "Write must fail",
+        SparkException.class,
+        "Writing job aborted",
+        () -> {
+          try {
+            // incoming records are not ordered by partitions so the job must fail
+            inputDF
+                .coalesce(1)
+                .sortWithinPartitions("id")
+                .writeTo(tableName)
+                .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+                .append();
+          } catch (NoSuchTableException e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+    assertEquals("Should be no records", sql("SELECT * FROM %s", tableName), 
ImmutableList.of());
+
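+    // a clean abort leaves no orphan files for remove_orphan_files to report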
+    assertEquals(
+        "Should be no orphan data files",
+        ImmutableList.of(),
+        sql(
+            "CALL %s.system.remove_orphan_files(table => '%s', older_than => 
%dL, location => '%s')",
+            catalogName, tableName, System.currentTimeMillis() + 5000, 
dataLocation));
+  }
+
+  public static class CustomFileIO implements FileIO {
+
+    private final FileIO delegate = new HadoopFileIO(new Configuration());
+
+    public CustomFileIO() {}
+
+    protected FileIO delegate() {
+      return delegate;
+    }
+
+    @Override
+    public InputFile newInputFile(String path) {
+      return delegate.newInputFile(path);
+    }
+
+    @Override
+    public OutputFile newOutputFile(String path) {
+      return delegate.newOutputFile(path);
+    }
+
+    @Override
+    public void deleteFile(String path) {
+      delegate.deleteFile(path);
+    }
+
+    @Override
+    public Map<String, String> properties() {
+      return delegate.properties();
+    }
+
+    @Override
+    public void initialize(Map<String, String> properties) {
+      delegate.initialize(properties);
+    }
+
+    @Override
+    public void close() {
+      delegate.close();
+    }
+  }
+
+  public static class CustomBulkFileIO extends CustomFileIO implements SupportsBulkOperations {
+
+    public CustomBulkFileIO() {}
+
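+    // reject single-file deletes to guarantee the bulk code path is exercised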
+    @Override
+    public void deleteFile(String path) {
+      throw new UnsupportedOperationException("Only bulk deletes are supported");
+    }
+
+    @Override
+    public void deleteFiles(Iterable<String> paths) throws BulkDeletionFailureException {
+      for (String path : paths) {
+        delegate().deleteFile(path);
+      }
+    }
+  }
+}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java
new file mode 100644
index 0000000000..a103a50032
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.spark.TaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A utility for cleaning up written but not committed files. */
+class SparkCleanupUtil {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkCleanupUtil.class);
+
+  private static final int DELETE_NUM_RETRIES = 3;
+  private static final int DELETE_MIN_RETRY_WAIT_MS = 100; // 100 ms
+  private static final int DELETE_MAX_RETRY_WAIT_MS = 30 * 1000; // 30 seconds
+  private static final int DELETE_TOTAL_RETRY_TIME_MS = 2 * 60 * 1000; // 2 minutes
+
+  private SparkCleanupUtil() {}
+
+  /**
+   * Attempts to delete as many files produced by a task as possible.
+   *
+   * <p>Note: this method logs Spark task info and must be called only on executors.
+   * Use {@link #deleteFiles(String, FileIO, List)} to delete files on the driver.
+   *
+   * @param io a {@link FileIO} instance used for deleting files
+   * @param files a list of files to delete
+   */
+  public static void deleteTaskFiles(FileIO io, List<? extends ContentFile<?>> files) {
+    deleteFiles(taskInfo(), io, files);
+  }
+
+  // the format matches what Spark uses for internal logging
+  private static String taskInfo() {
+    TaskContext taskContext = TaskContext.get();
+    if (taskContext == null) {
+      return "unknown task";
+    } else {
+      return String.format(
+          "partition %d (task %d, attempt %d, stage %d.%d)",
+          taskContext.partitionId(),
+          taskContext.taskAttemptId(),
+          taskContext.attemptNumber(),
+          taskContext.stageId(),
+          taskContext.stageAttemptNumber());
+    }
+  }
+
+  /**
+   * Attempts to delete as many given files as possible.
+   *
+   * @param context a helpful description of the operation invoking this method
+   * @param io a {@link FileIO} instance used for deleting files
+   * @param files a list of files to delete
+   */
+  public static void deleteFiles(String context, FileIO io, List<? extends ContentFile<?>> files) {
+    List<String> paths = Lists.transform(files, file -> file.path().toString());
+    deletePaths(context, io, paths);
+  }
+
+  private static void deletePaths(String context, FileIO io, List<String> paths) {
+    if (io instanceof SupportsBulkOperations) {
+      SupportsBulkOperations bulkIO = (SupportsBulkOperations) io;
+      bulkDelete(context, bulkIO, paths);
+    } else {
+      delete(context, io, paths);
+    }
+  }
+
+  private static void bulkDelete(String context, SupportsBulkOperations io, List<String> paths) {
+    try {
+      io.deleteFiles(paths);
+      LOG.info("Deleted {} file(s) using bulk deletes ({})", paths.size(), 
context);
+
+    } catch (BulkDeletionFailureException e) {
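+      // a bulk delete can partially fail; log how many files were actually removed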
+      int deletedFilesCount = paths.size() - e.numberFailedObjects();
+      LOG.warn(
+          "Deleted only {} of {} file(s) using bulk deletes ({})",
+          deletedFilesCount,
+          paths.size(),
+          context);
+    }
+  }
+
+  private static void delete(String context, FileIO io, List<String> paths) {
+    AtomicInteger deletedFilesCount = new AtomicInteger(0);
+
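+    // retry each delete and suppress failures so the remaining files are still attempted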
+    Tasks.foreach(paths)
+        .executeWith(ThreadPools.getWorkerPool())
+        .stopRetryOn(NotFoundException.class)
+        .suppressFailureWhenFinished()
+        .onFailure((path, exc) -> LOG.warn("Failed to delete {} ({})", path, context, exc))
+        .retry(DELETE_NUM_RETRIES)
+        .exponentialBackoff(
+            DELETE_MIN_RETRY_WAIT_MS,
+            DELETE_MAX_RETRY_WAIT_MS,
+            DELETE_TOTAL_RETRY_TIME_MS,
+            2 /* exponential */)
+        .run(
+            path -> {
+              io.deleteFile(path);
+              deletedFilesCount.incrementAndGet();
+            });
+
+    if (deletedFilesCount.get() < paths.size()) {
+      LOG.warn("Deleted only {} of {} file(s) ({})", deletedFilesCount, 
paths.size(), context);
+    } else {
+      LOG.info("Deleted {} file(s) ({})", paths.size(), context);
+    }
+  }
+}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index c3b5ff7353..5eba7166c9 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -26,6 +26,7 @@ import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPD
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
@@ -56,6 +57,7 @@ import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.PartitioningWriter;
 import org.apache.iceberg.io.PositionDeltaWriter;
 import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.SparkSchemaUtil;
@@ -63,8 +65,6 @@ import org.apache.iceberg.spark.SparkWriteConf;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.iceberg.util.StructProjection;
-import org.apache.iceberg.util.Tasks;
-import org.apache.iceberg.util.ThreadPools;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.SparkSession;
@@ -143,14 +143,6 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
     return new PositionDeltaBatchWrite();
   }
 
-  private static <T extends ContentFile<T>> void cleanFiles(FileIO io, Iterable<T> files) {
-    Tasks.foreach(files)
-        .executeWith(ThreadPools.getWorkerPool())
-        .throwFailureWhenFinished()
-        .noRetry()
-        .run(file -> io.deleteFile(file.path().toString()));
-  }
-
   private class PositionDeltaBatchWrite implements DeltaBatchWrite {
 
     @Override
@@ -241,17 +233,25 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
 
     @Override
     public void abort(WriterCommitMessage[] messages) {
-      if (!cleanupOnAbort) {
-        return;
+      if (cleanupOnAbort) {
+        SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages));
+      } else {
+        LOG.warn("Skipping cleanup of written files");
       }
+    }
+
+    private List<ContentFile<?>> files(WriterCommitMessage[] messages) {
+      List<ContentFile<?>> files = Lists.newArrayList();
 
       for (WriterCommitMessage message : messages) {
         if (message != null) {
           DeltaTaskCommit taskCommit = (DeltaTaskCommit) message;
-          cleanFiles(table.io(), Arrays.asList(taskCommit.dataFiles()));
-          cleanFiles(table.io(), Arrays.asList(taskCommit.deleteFiles()));
+          files.addAll(Arrays.asList(taskCommit.dataFiles()));
+          files.addAll(Arrays.asList(taskCommit.deleteFiles()));
         }
       }
+
+      return files;
     }
 
     private void commitOperation(SnapshotUpdate<?> operation, String description) {
@@ -462,7 +462,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
       close();
 
       DeleteWriteResult result = delegate.result();
-      cleanFiles(io, result.deleteFiles());
+      SparkCleanupUtil.deleteTaskFiles(io, result.deleteFiles());
     }
 
     @Override
@@ -541,8 +541,14 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
       close();
 
       WriteResult result = delegate.result();
-      cleanFiles(io, Arrays.asList(result.dataFiles()));
-      cleanFiles(io, Arrays.asList(result.deleteFiles()));
+      SparkCleanupUtil.deleteTaskFiles(io, files(result));
+    }
+
+    private List<ContentFile<?>> files(WriteResult result) {
+      List<ContentFile<?>> files = Lists.newArrayList();
+      files.addAll(Arrays.asList(result.dataFiles()));
+      files.addAll(Arrays.asList(result.deleteFiles()));
+      return files;
     }
 
     @Override
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 9ae7280ea8..f68898e27b 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -20,24 +20,13 @@ package org.apache.iceberg.spark.source;
 
 import static org.apache.iceberg.IsolationLevel.SERIALIZABLE;
 import static org.apache.iceberg.IsolationLevel.SNAPSHOT;
-import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
-import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
-import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
-import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
-import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
-import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
-import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
-import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
@@ -63,14 +52,11 @@ import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.PartitioningWriter;
 import org.apache.iceberg.io.RollingDataWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.FileRewriteCoordinator;
 import org.apache.iceberg.spark.SparkWriteConf;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
-import org.apache.iceberg.util.ThreadPools;
 import org.apache.spark.TaskContext;
 import org.apache.spark.TaskContext$;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -229,40 +215,23 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
 
   private void abort(WriterCommitMessage[] messages) {
     if (cleanupOnAbort) {
-      Map<String, String> props = table.properties();
-      Tasks.foreach(files(messages))
-          .executeWith(ThreadPools.getWorkerPool())
-          .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
-          .exponentialBackoff(
-              PropertyUtil.propertyAsInt(
-                  props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
-              PropertyUtil.propertyAsInt(
-                  props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
-              PropertyUtil.propertyAsInt(
-                  props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
-              2.0 /* exponential */)
-          .throwFailureWhenFinished()
-          .run(
-              file -> {
-                table.io().deleteFile(file.path().toString());
-              });
+      SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages));
     } else {
-      LOG.warn(
-          "Skipping cleaning up of data files because Iceberg was unable to 
determine the final commit state");
+      LOG.warn("Skipping cleanup of written files");
     }
   }
 
-  private Iterable<DataFile> files(WriterCommitMessage[] messages) {
-    if (messages.length > 0) {
-      return Iterables.concat(
-          Iterables.transform(
-              Arrays.asList(messages),
-              message ->
-                  message != null
-                      ? ImmutableList.copyOf(((TaskCommit) message).files())
-                      : ImmutableList.of()));
-    }
-    return ImmutableList.of();
+  private List<DataFile> files(WriterCommitMessage[] messages) {
+    List<DataFile> files = Lists.newArrayList();
+
+    for (WriterCommitMessage message : messages) {
+      if (message != null) {
+        TaskCommit taskCommit = (TaskCommit) message;
+        files.addAll(Arrays.asList(taskCommit.files()));
+      }
+    }
+
+    return files;
   }
 
   @Override
@@ -305,9 +274,9 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
   private class DynamicOverwrite extends BaseBatchWrite {
     @Override
     public void commit(WriterCommitMessage[] messages) {
-      Iterable<DataFile> files = files(messages);
+      List<DataFile> files = files(messages);
 
-      if (!files.iterator().hasNext()) {
+      if (files.isEmpty()) {
         LOG.info("Dynamic overwrite is empty, skipping commit");
         return;
       }
@@ -491,13 +460,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
     @Override
     public void commit(WriterCommitMessage[] messages) {
       FileRewriteCoordinator coordinator = FileRewriteCoordinator.get();
-
-      Set<DataFile> newDataFiles = Sets.newHashSetWithExpectedSize(messages.length);
-      for (DataFile file : files(messages)) {
-        newDataFiles.add(file);
-      }
-
-      coordinator.stageRewrite(table, fileSetID, Collections.unmodifiableSet(newDataFiles));
+      coordinator.stageRewrite(table, fileSetID, ImmutableSet.copyOf(files(messages)));
     }
   }
 
@@ -698,14 +661,6 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
     }
   }
 
-  private static <T extends ContentFile<T>> void deleteFiles(FileIO io, List<T> files) {
-    Tasks.foreach(files)
-        .executeWith(ThreadPools.getWorkerPool())
-        .throwFailureWhenFinished()
-        .noRetry()
-        .run(file -> io.deleteFile(file.path().toString()));
-  }
-
   private static class UnpartitionedDataWriter implements DataWriter<InternalRow> {
     private final FileWriter<InternalRow, DataWriteResult> delegate;
     private final FileIO io;
@@ -741,7 +696,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
       close();
 
       DataWriteResult result = delegate.result();
-      deleteFiles(io, result.dataFiles());
+      SparkCleanupUtil.deleteTaskFiles(io, result.dataFiles());
     }
 
     @Override
@@ -798,7 +753,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
       close();
 
       DataWriteResult result = delegate.result();
-      deleteFiles(io, result.dataFiles());
+      SparkCleanupUtil.deleteTaskFiles(io, result.dataFiles());
     }
 
     @Override
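
For reference, the cleanup logic added above reduces to two entry points on
SparkCleanupUtil. A minimal sketch of how the abort paths in this commit use
them (the names delegate, io, table, and files(messages) come from the
surrounding diff; this is an illustration, not additional committed code):

    // executor side: a DataWriter's abort() closes the writer and then
    // deletes the files it produced; Spark task info is logged automatically
    DataWriteResult result = delegate.result();
    SparkCleanupUtil.deleteTaskFiles(io, result.dataFiles());

    // driver side: a BatchWrite's abort() supplies an explicit context string
    SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages));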
