This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 3f158ff Spark: Remove unused FileRewriteCoordinator code (#2819)
3f158ff is described below
commit 3f158ff7096c95286a73037992a6b9a8319347c8
Author: Russell Spitzer <[email protected]>
AuthorDate: Thu Jul 15 12:45:24 2021 -0500
Spark: Remove unused FileRewriteCoordinator code (#2819)
Since we changed our implementation of Spark3BinPackStrategy, we no longer
need some of the functionality that was previously in FileRewriteCoordinator.
Here we remove those functions and related test code.
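The replacement pattern is exercised by the updated tests in the diff below:
callers pair the coordinator's staged output with the original scan tasks and
commit directly. A minimal sketch, assuming the usual singletons from
FileScanTaskSetManager.get() and FileRewriteCoordinator.get(); the wrapper
class and method names are illustrative, not part of this commit:

    import java.util.Set;
    import java.util.stream.Collectors;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.spark.FileRewriteCoordinator;
    import org.apache.iceberg.spark.FileScanTaskSetManager;

    public class RewriteCommitSketch {
      // Commit one rewritten file group now that commitRewrite is gone:
      // the caller assembles both file sets and owns the table commit.
      public static void commitFileSet(Table table, String fileSetID) {
        FileScanTaskSetManager taskSetManager = FileScanTaskSetManager.get();
        FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();

        // files that served as input to this rewrite group
        Set<DataFile> rewrittenFiles = taskSetManager.fetchTasks(table, fileSetID).stream()
            .map(FileScanTask::file)
            .collect(Collectors.toSet());

        // files the Spark write produced for this group
        Set<DataFile> addedFiles = rewriteCoordinator.fetchNewDataFiles(table, fileSetID);

        // replace old files with new files in a single table commit
        table.newRewrite()
            .rewriteFiles(rewrittenFiles, addedFiles)
            .commit();
      }
    }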
---
.../iceberg/spark/FileRewriteCoordinator.java | 100 ++++-----------------
.../spark/actions/Spark3BinPackStrategy.java | 3 +-
.../iceberg/spark/TestFileRewriteCoordinator.java | 24 ++++-
3 files changed, 38 insertions(+), 89 deletions(-)
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java b/spark3/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java
index 02a272c..1fb2922 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java
@@ -19,25 +19,17 @@
package org.apache.iceberg.spark;
-import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Pair;
-import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,88 +47,36 @@ public class FileRewriteCoordinator {
return INSTANCE;
}
+ /**
+ * Called to persist the output of a rewrite action for a specific group. Since the write is done via a
+ * Spark Datasource, we have to propagate the result through this side-effect call.
+ * @param table table where the rewrite is occurring
+ * @param fileSetID the id used to identify the source set of files being rewritten
+ * @param newDataFiles the new files which have been written
+ */
public void stageRewrite(Table table, String fileSetID, Set<DataFile> newDataFiles) {
Preconditions.checkArgument(newDataFiles != null && newDataFiles.size() > 0, "Cannot stage null or empty file set");
+ LOG.debug("Staging the output for {} - fileset {} with {} files", table.name(), fileSetID, newDataFiles.size());
Pair<String, String> id = toID(table, fileSetID);
resultMap.put(id, newDataFiles);
}
- public void commitRewrite(Table table, String fileSetID) {
- commitRewrite(table, ImmutableSet.of(fileSetID));
- }
-
- public void commitRewrite(Table table, Set<String> fileSetIDs) {
- Set<DataFile> rewrittenDataFiles = fetchRewrittenDataFiles(table, fileSetIDs);
- Set<DataFile> newDataFiles = fetchNewDataFiles(table, fileSetIDs);
-
- table.newRewrite()
- .rewriteFiles(rewrittenDataFiles, newDataFiles)
- .commit();
-
- fileSetIDs.stream().map(id -> toID(table, id)).forEach(resultMap::remove);
- }
-
- private Set<DataFile> fetchRewrittenDataFiles(Table table, Set<String> fileSetIDs) {
- FileScanTaskSetManager taskSetManager = FileScanTaskSetManager.get();
-
- Set<DataFile> rewrittenDataFiles = Sets.newHashSet();
-
- for (String fileSetID : fileSetIDs) {
- List<FileScanTask> tasks = taskSetManager.fetchTasks(table, fileSetID);
- ValidationException.check(tasks != null,
- "Task set manager has no tasks for table %s with id %s",
- table, fileSetID);
-
- for (FileScanTask task : tasks) {
- DataFile dataFile = task.file();
- rewrittenDataFiles.add(dataFile);
- }
- }
-
- return Collections.unmodifiableSet(rewrittenDataFiles);
- }
-
- public Set<DataFile> fetchNewDataFiles(Table table, Set<String> fileSetIDs) {
- List<Set<DataFile>> results = Lists.newArrayList();
-
- for (String fileSetID : fileSetIDs) {
- Pair<String, String> id = toID(table, fileSetID);
- Set<DataFile> result = resultMap.get(id);
- ValidationException.check(result != null,
- "No results for rewrite of file set %s in table %s",
- fileSetID, table);
-
- results.add(result);
- }
-
- Set<DataFile> newDataFiles = results.get(0);
- for (int index = 1; index < results.size(); index++) {
- newDataFiles = Sets.union(newDataFiles, results.get(index));
- }
+ public Set<DataFile> fetchNewDataFiles(Table table, String fileSetID) {
+ Pair<String, String> id = toID(table, fileSetID);
+ Set<DataFile> result = resultMap.get(id);
+ ValidationException.check(result != null,
+ "No results for rewrite of file set %s in table %s",
+ fileSetID, table);
- return newDataFiles;
+ return result;
}
public void clearRewrite(Table table, String fileSetID) {
+ LOG.debug("Removing entry from RewriteCoordinator for {} - id {}",
table.name(), fileSetID);
Pair<String, String> id = toID(table, fileSetID);
resultMap.remove(id);
}
- public void abortRewrite(Table table, String fileSetID) {
- Pair<String, String> id = toID(table, fileSetID);
- Set<DataFile> dataFiles = resultMap.remove(id);
- if (dataFiles != null) {
- LOG.info("Deleting {} uncommitted data files for rewriting file set {}",
dataFiles.size(), fileSetID);
- deleteFiles(table.io(), dataFiles);
- }
- }
-
- public void abortRewrite(Table table, Set<String> fileSetIDs) {
- for (String fileSetID : fileSetIDs) {
- abortRewrite(table, fileSetID);
- }
- }
-
public Set<String> fetchSetIDs(Table table) {
return resultMap.keySet().stream()
.filter(e -> e.first().equals(tableUUID(table)))
@@ -144,14 +84,6 @@ public class FileRewriteCoordinator {
.collect(Collectors.toSet());
}
- private void deleteFiles(FileIO io, Iterable<DataFile> dataFiles) {
- Tasks.foreach(dataFiles)
- .noRetry()
- .suppressFailureWhenFinished()
- .onFailure((dataFile, exc) -> LOG.warn("Failed to delete: {}", dataFile.path(), exc))
- .run(dataFile -> io.deleteFile(dataFile.path().toString()));
- }
-
private Pair<String, String> toID(Table table, String setID) {
String tableUUID = tableUUID(table);
return Pair.of(tableUUID, setID);
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3BinPackStrategy.java b/spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3BinPackStrategy.java
index b8e5079..b2995ce 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3BinPackStrategy.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3BinPackStrategy.java
@@ -26,7 +26,6 @@ import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.BinPackStrategy;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.FileScanTaskSetManager;
import org.apache.iceberg.spark.SparkReadOptions;
@@ -76,7 +75,7 @@ public class Spark3BinPackStrategy extends BinPackStrategy {
.mode("append")
.save(table.name());
- return rewriteCoordinator.fetchNewDataFiles(table, ImmutableSet.of(groupID));
+ return rewriteCoordinator.fetchNewDataFiles(table, groupID);
} finally {
manager.removeTasks(table, groupID);
rewriteCoordinator.clearRewrite(table, groupID);
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java b/spark3/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java
index 1f02396..8aa5cd6 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java
@@ -22,7 +22,10 @@ package org.apache.iceberg.spark;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
@@ -87,7 +90,11 @@ public class TestFileRewriteCoordinator extends SparkCatalogTestBase {
// commit the rewrite
FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
- rewriteCoordinator.commitRewrite(table, fileSetID);
+ Set<DataFile> rewrittenFiles = taskSetManager.fetchTasks(table, fileSetID).stream()
+ .map(FileScanTask::file)
+ .collect(Collectors.toSet());
+ Set<DataFile> addedFiles = rewriteCoordinator.fetchNewDataFiles(table, fileSetID);
+ table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
}
table.refresh();
@@ -145,7 +152,11 @@ public class TestFileRewriteCoordinator extends SparkCatalogTestBase {
// commit the rewrite
FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
- rewriteCoordinator.commitRewrite(table, fileSetID);
+ Set<DataFile> rewrittenFiles = taskSetManager.fetchTasks(table, fileSetID).stream()
+ .map(FileScanTask::file)
+ .collect(Collectors.toSet());
+ Set<DataFile> addedFiles = rewriteCoordinator.fetchNewDataFiles(table, fileSetID);
+ table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
}
table.refresh();
@@ -210,7 +221,14 @@ public class TestFileRewriteCoordinator extends SparkCatalogTestBase {
// commit both rewrites at the same time
FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
- rewriteCoordinator.commitRewrite(table, fileSetIDs);
+ Set<DataFile> rewrittenFiles = fileSetIDs.stream().flatMap(fileSetID -> taskSetManager.fetchTasks(table, fileSetID).stream())
+ .map(FileScanTask::file)
+ .collect(Collectors.toSet());
+ Set<DataFile> addedFiles = fileSetIDs.stream()
+ .flatMap(fileSetID -> rewriteCoordinator.fetchNewDataFiles(table, fileSetID).stream())
+ .collect(Collectors.toSet());
+ table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
table.refresh();
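
With the single-set fetchNewDataFiles in place, the surviving coordinator API
reduces to a stage/fetch/clear lifecycle, matching the try/finally in
Spark3BinPackStrategy above. A sketch under the assumption that one caller
drives all three steps; in the real flow, stageRewrite is invoked from the
Spark datasource write path, and the driver class here is illustrative:

    import java.util.Set;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.spark.FileRewriteCoordinator;

    public class CoordinatorLifecycleSketch {
      // Illustrative driver for the three methods that remain on the coordinator.
      public static Set<DataFile> runGroup(Table table, String fileSetID, Set<DataFile> writtenFiles) {
        FileRewriteCoordinator coordinator = FileRewriteCoordinator.get();
        // 1. stage the write output for this group (a side-effect call, per the new javadoc)
        coordinator.stageRewrite(table, fileSetID, writtenFiles);
        try {
          // 2. fetch the staged files back out for the commit
          return coordinator.fetchNewDataFiles(table, fileSetID);
        } finally {
          // 3. always drop the entry afterwards, mirroring Spark3BinPackStrategy
          coordinator.clearRewrite(table, fileSetID);
        }
      }
    }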