This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new b5a31a14d5 Spark 3.2: Improve task and job abort handling (#6926)
b5a31a14d5 is described below
commit b5a31a14d56c1ee24bad87e1ac7f119d638ee320
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Feb 24 13:37:40 2023 -0800
Spark 3.2: Improve task and job abort handling (#6926)
---
.../iceberg/spark/extensions/TestWriteAborts.java | 193 +++++++++++++++++++++
.../iceberg/spark/source/SparkCleanupUtil.java | 138 +++++++++++++++
.../spark/source/SparkPositionDeltaWrite.java | 40 +++--
.../apache/iceberg/spark/source/SparkWrite.java | 86 +++------
4 files changed, 374 insertions(+), 83 deletions(-)
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java
new file mode 100644
index 0000000000..24a9b3f232
--- /dev/null
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.SparkException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runners.Parameterized;
+
+public class TestWriteAborts extends SparkExtensionsTestBase {
+
+ @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+ public static Object[][] parameters() {
+ return new Object[][] {
+ {
+ "testhive",
+ SparkCatalog.class.getName(),
+ ImmutableMap.of(
+ "type",
+ "hive",
+ CatalogProperties.FILE_IO_IMPL,
+ CustomFileIO.class.getName(),
+ "default-namespace",
+ "default")
+ },
+ {
+ "testhivebulk",
+ SparkCatalog.class.getName(),
+ ImmutableMap.of(
+ "type",
+ "hive",
+ CatalogProperties.FILE_IO_IMPL,
+ CustomBulkFileIO.class.getName(),
+ "default-namespace",
+ "default")
+ }
+ };
+ }
+
+ @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+ public TestWriteAborts(String catalogName, String implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @After
+ public void removeTables() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @Test
+ public void testBatchAppend() throws Exception {
+ String dataLocation = temp.newFolder().toString();
+
+ sql(
+ "CREATE TABLE %s (id INT, data STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (data)"
+ + "TBLPROPERTIES ('%s' '%s')",
+ tableName, TableProperties.WRITE_DATA_LOCATION, dataLocation);
+
+ List<SimpleRecord> records =
+ ImmutableList.of(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "a"),
+ new SimpleRecord(4, "b"));
+ Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
+
+ AssertHelpers.assertThrows(
+ "Write must fail",
+ SparkException.class,
+ "Writing job aborted",
+ () -> {
+ try {
+ // incoming records are not ordered by partitions so the job must fail
+ inputDF
+ .coalesce(1)
+ .sortWithinPartitions("id")
+ .writeTo(tableName)
+ .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+ .append();
+ } catch (NoSuchTableException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ assertEquals("Should be no records", sql("SELECT * FROM %s", tableName),
ImmutableList.of());
+
+ assertEquals(
+ "Should be no orphan data files",
+ ImmutableList.of(),
+ sql(
+ "CALL %s.system.remove_orphan_files(table => '%s', older_than =>
%dL, location => '%s')",
+ catalogName, tableName, System.currentTimeMillis() + 5000,
dataLocation));
+ }
+
+ public static class CustomFileIO implements FileIO {
+
+ private final FileIO delegate = new HadoopFileIO(new Configuration());
+
+ public CustomFileIO() {}
+
+ protected FileIO delegate() {
+ return delegate;
+ }
+
+ @Override
+ public InputFile newInputFile(String path) {
+ return delegate.newInputFile(path);
+ }
+
+ @Override
+ public OutputFile newOutputFile(String path) {
+ return delegate.newOutputFile(path);
+ }
+
+ @Override
+ public void deleteFile(String path) {
+ delegate.deleteFile(path);
+ }
+
+ @Override
+ public Map<String, String> properties() {
+ return delegate.properties();
+ }
+
+ @Override
+ public void initialize(Map<String, String> properties) {
+ delegate.initialize(properties);
+ }
+
+ @Override
+ public void close() {
+ delegate.close();
+ }
+ }
+
+ public static class CustomBulkFileIO extends CustomFileIO implements SupportsBulkOperations {
+
+ public CustomBulkFileIO() {}
+
+ @Override
+ public void deleteFile(String path) {
+ throw new UnsupportedOperationException("Only bulk deletes are supported");
+ }
+
+ @Override
+ public void deleteFiles(Iterable<String> paths) throws BulkDeletionFailureException {
+ for (String path : paths) {
+ delegate().deleteFile(path);
+ }
+ }
+ }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java
new file mode 100644
index 0000000000..a103a50032
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.spark.TaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A utility for cleaning up written but not committed files. */
+class SparkCleanupUtil {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SparkCleanupUtil.class);
+
+ private static final int DELETE_NUM_RETRIES = 3;
+ private static final int DELETE_MIN_RETRY_WAIT_MS = 100; // 100 ms
+ private static final int DELETE_MAX_RETRY_WAIT_MS = 30 * 1000; // 30 seconds
+ private static final int DELETE_TOTAL_RETRY_TIME_MS = 2 * 60 * 1000; // 2 minutes
+
+ private SparkCleanupUtil() {}
+
+ /**
+ * Attempts to delete as many files produced by a task as possible.
+ *
+ * <p>Note this method will log Spark task info and is supposed to be called
only on executors.
+ * Use {@link #deleteFiles(String, FileIO, List)} to delete files on the
driver.
+ *
+ * @param io a {@link FileIO} instance used for deleting files
+ * @param files a list of files to delete
+ */
+ public static void deleteTaskFiles(FileIO io, List<? extends ContentFile<?>> files) {
+ deleteFiles(taskInfo(), io, files);
+ }
+
+ // the format matches what Spark uses for internal logging
+ private static String taskInfo() {
+ TaskContext taskContext = TaskContext.get();
+ if (taskContext == null) {
+ return "unknown task";
+ } else {
+ return String.format(
+ "partition %d (task %d, attempt %d, stage %d.%d)",
+ taskContext.partitionId(),
+ taskContext.taskAttemptId(),
+ taskContext.attemptNumber(),
+ taskContext.stageId(),
+ taskContext.stageAttemptNumber());
+ }
+ }
+
+ /**
+ * Attempts to delete as many given files as possible.
+ *
+ * @param context a helpful description of the operation invoking this method
+ * @param io a {@link FileIO} instance used for deleting files
+ * @param files a list of files to delete
+ */
+ public static void deleteFiles(String context, FileIO io, List<? extends ContentFile<?>> files) {
+ List<String> paths = Lists.transform(files, file -> file.path().toString());
+ deletePaths(context, io, paths);
+ }
+
+ private static void deletePaths(String context, FileIO io, List<String> paths) {
+ if (io instanceof SupportsBulkOperations) {
+ SupportsBulkOperations bulkIO = (SupportsBulkOperations) io;
+ bulkDelete(context, bulkIO, paths);
+ } else {
+ delete(context, io, paths);
+ }
+ }
+
+ private static void bulkDelete(String context, SupportsBulkOperations io, List<String> paths) {
+ try {
+ io.deleteFiles(paths);
+ LOG.info("Deleted {} file(s) using bulk deletes ({})", paths.size(),
context);
+
+ } catch (BulkDeletionFailureException e) {
+ int deletedFilesCount = paths.size() - e.numberFailedObjects();
+ LOG.warn(
+ "Deleted only {} of {} file(s) using bulk deletes ({})",
+ deletedFilesCount,
+ paths.size(),
+ context);
+ }
+ }
+
+ private static void delete(String context, FileIO io, List<String> paths) {
+ AtomicInteger deletedFilesCount = new AtomicInteger(0);
+
+ Tasks.foreach(paths)
+ .executeWith(ThreadPools.getWorkerPool())
+ .stopRetryOn(NotFoundException.class)
+ .suppressFailureWhenFinished()
+ .onFailure((path, exc) -> LOG.warn("Failed to delete {} ({})", path, context, exc))
+ .retry(DELETE_NUM_RETRIES)
+ .exponentialBackoff(
+ DELETE_MIN_RETRY_WAIT_MS,
+ DELETE_MAX_RETRY_WAIT_MS,
+ DELETE_TOTAL_RETRY_TIME_MS,
+ 2 /* exponential */)
+ .run(
+ path -> {
+ io.deleteFile(path);
+ deletedFilesCount.incrementAndGet();
+ });
+
+ if (deletedFilesCount.get() < paths.size()) {
+ LOG.warn("Deleted only {} of {} file(s) ({})", deletedFilesCount,
paths.size(), context);
+ } else {
+ LOG.info("Deleted {} file(s) ({})", paths.size(), context);
+ }
+ }
+}
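For illustration only (not part of this patch): a minimal sketch of how the task- and job-abort paths in the Spark writers changed below call the new utility. The wrapper class and method names here are hypothetical; SparkCleanupUtil is package-private, so real callers, like the writers in this commit, live in org.apache.iceberg.spark.source.

package org.apache.iceberg.spark.source;

import java.util.Arrays;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.io.FileIO;

// Hypothetical fragment; mirrors how SparkWrite and SparkPositionDeltaWrite use the utility.
class CleanupUsageSketch {

  // Executor side: a failed task deletes only the files it has written;
  // the utility logs Spark task info (partition, attempt, stage) with each message.
  static void onTaskAbort(FileIO io, List<DataFile> writtenFiles) {
    SparkCleanupUtil.deleteTaskFiles(io, writtenFiles);
  }

  // Driver side: a failed job deletes the files reported by all task commits,
  // using bulk deletes when the FileIO implements SupportsBulkOperations.
  static void onJobAbort(FileIO io, DataFile[] filesFromTaskCommits) {
    SparkCleanupUtil.deleteFiles("job abort", io, Arrays.asList(filesFromTaskCommits));
  }
}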
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 1f65184925..00048c470b 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -26,6 +26,7 @@ import static org.apache.spark.sql.connector.iceberg.write.RowLevelOperation.Com
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
@@ -56,6 +57,7 @@ import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitioningWriter;
import org.apache.iceberg.io.PositionDeltaWriter;
import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.SparkSchemaUtil;
@@ -63,8 +65,6 @@ import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.StructProjection;
-import org.apache.iceberg.util.Tasks;
-import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
@@ -143,14 +143,6 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
return new PositionDeltaBatchWrite();
}
- private static <T extends ContentFile<T>> void cleanFiles(FileIO io,
Iterable<T> files) {
- Tasks.foreach(files)
- .executeWith(ThreadPools.getWorkerPool())
- .throwFailureWhenFinished()
- .noRetry()
- .run(file -> io.deleteFile(file.path().toString()));
- }
-
private class PositionDeltaBatchWrite implements DeltaBatchWrite {
@Override
@@ -243,17 +235,25 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
@Override
public void abort(WriterCommitMessage[] messages) {
- if (!cleanupOnAbort) {
- return;
+ if (cleanupOnAbort) {
+ SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages));
+ } else {
+ LOG.warn("Skipping cleanup of written files");
}
+ }
+
+ private List<ContentFile<?>> files(WriterCommitMessage[] messages) {
+ List<ContentFile<?>> files = Lists.newArrayList();
for (WriterCommitMessage message : messages) {
if (message != null) {
DeltaTaskCommit taskCommit = (DeltaTaskCommit) message;
- cleanFiles(table.io(), Arrays.asList(taskCommit.dataFiles()));
- cleanFiles(table.io(), Arrays.asList(taskCommit.deleteFiles()));
+ files.addAll(Arrays.asList(taskCommit.dataFiles()));
+ files.addAll(Arrays.asList(taskCommit.deleteFiles()));
}
}
+
+ return files;
}
private void commitOperation(SnapshotUpdate<?> operation, String description) {
@@ -464,7 +464,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
close();
DeleteWriteResult result = delegate.result();
- cleanFiles(io, result.deleteFiles());
+ SparkCleanupUtil.deleteTaskFiles(io, result.deleteFiles());
}
@Override
@@ -543,8 +543,14 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
close();
WriteResult result = delegate.result();
- cleanFiles(io, Arrays.asList(result.dataFiles()));
- cleanFiles(io, Arrays.asList(result.deleteFiles()));
+ SparkCleanupUtil.deleteTaskFiles(io, files(result));
+ }
+
+ private List<ContentFile<?>> files(WriteResult result) {
+ List<ContentFile<?>> files = Lists.newArrayList();
+ files.addAll(Arrays.asList(result.dataFiles()));
+ files.addAll(Arrays.asList(result.deleteFiles()));
+ return files;
}
@Override
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index e84442a00a..f63db416cc 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -20,24 +20,13 @@ package org.apache.iceberg.spark.source;
import static org.apache.iceberg.IsolationLevel.SERIALIZABLE;
import static org.apache.iceberg.IsolationLevel.SNAPSHOT;
-import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
-import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
-import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
-import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
-import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
-import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
-import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
-import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
@@ -62,15 +51,11 @@ import org.apache.iceberg.io.FileWriter;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitioningWriter;
import org.apache.iceberg.io.RollingDataWriter;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.SparkWriteConf;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
-import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.apache.spark.api.java.JavaSparkContext;
@@ -229,40 +214,23 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
private void abort(WriterCommitMessage[] messages) {
if (cleanupOnAbort) {
- Map<String, String> props = table.properties();
- Tasks.foreach(files(messages))
- .executeWith(ThreadPools.getWorkerPool())
- .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
- .exponentialBackoff(
- PropertyUtil.propertyAsInt(
- props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
- PropertyUtil.propertyAsInt(
- props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
- PropertyUtil.propertyAsInt(
- props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
- 2.0 /* exponential */)
- .throwFailureWhenFinished()
- .run(
- file -> {
- table.io().deleteFile(file.path().toString());
- });
+ SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages));
} else {
- LOG.warn(
- "Skipping cleaning up of data files because Iceberg was unable to
determine the final commit state");
+ LOG.warn("Skipping cleanup of written files");
}
}
- private Iterable<DataFile> files(WriterCommitMessage[] messages) {
- if (messages.length > 0) {
- return Iterables.concat(
- Iterables.transform(
- Arrays.asList(messages),
- message ->
- message != null
- ? ImmutableList.copyOf(((TaskCommit) message).files())
- : ImmutableList.of()));
- }
- return ImmutableList.of();
+ private List<DataFile> files(WriterCommitMessage[] messages) {
+ List<DataFile> files = Lists.newArrayList();
+
+ for (WriterCommitMessage message : messages) {
+ if (message != null) {
+ TaskCommit taskCommit = (TaskCommit) message;
+ files.addAll(Arrays.asList(taskCommit.files()));
+ }
+ }
+
+ return files;
}
@Override
@@ -305,9 +273,9 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
private class DynamicOverwrite extends BaseBatchWrite {
@Override
public void commit(WriterCommitMessage[] messages) {
- Iterable<DataFile> files = files(messages);
+ List<DataFile> files = files(messages);
- if (!files.iterator().hasNext()) {
+ if (files.isEmpty()) {
LOG.info("Dynamic overwrite is empty, skipping commit");
return;
}
@@ -478,13 +446,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
@Override
public void commit(WriterCommitMessage[] messages) {
FileRewriteCoordinator coordinator = FileRewriteCoordinator.get();
-
- Set<DataFile> newDataFiles =
Sets.newHashSetWithExpectedSize(messages.length);
- for (DataFile file : files(messages)) {
- newDataFiles.add(file);
- }
-
- coordinator.stageRewrite(table, fileSetID, Collections.unmodifiableSet(newDataFiles));
+ coordinator.stageRewrite(table, fileSetID, ImmutableSet.copyOf(files(messages)));
}
}
@@ -685,14 +647,6 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
}
}
- private static <T extends ContentFile<T>> void deleteFiles(FileIO io,
List<T> files) {
- Tasks.foreach(files)
- .executeWith(ThreadPools.getWorkerPool())
- .throwFailureWhenFinished()
- .noRetry()
- .run(file -> io.deleteFile(file.path().toString()));
- }
-
private static class UnpartitionedDataWriter implements DataWriter<InternalRow> {
private final FileWriter<InternalRow, DataWriteResult> delegate;
private final FileIO io;
@@ -728,7 +682,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
close();
DataWriteResult result = delegate.result();
- deleteFiles(io, result.dataFiles());
+ SparkCleanupUtil.deleteTaskFiles(io, result.dataFiles());
}
@Override
@@ -785,7 +739,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
close();
DataWriteResult result = delegate.result();
- deleteFiles(io, result.dataFiles());
+ SparkCleanupUtil.deleteTaskFiles(io, result.dataFiles());
}
@Override