This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f158ff  Spark: Remove unused FileRewriteCoordinator code (#2819)
3f158ff is described below

commit 3f158ff7096c95286a73037992a6b9a8319347c8
Author: Russell Spitzer <[email protected]>
AuthorDate: Thu Jul 15 12:45:24 2021 -0500

    Spark: Remove unused FileRewriteCoordinator code (#2819)
    
    Since we changed our implementation of Spark3BinPackStrategy, we no longer need some
    of the functionality that was previously in FileRewriteCoordinator. Here we remove
    those functions and related test code.
---
 .../iceberg/spark/FileRewriteCoordinator.java      | 100 ++++-----------------
 .../spark/actions/Spark3BinPackStrategy.java       |   3 +-
 .../iceberg/spark/TestFileRewriteCoordinator.java  |  24 ++++-
 3 files changed, 38 insertions(+), 89 deletions(-)

diff --git 
a/spark3/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java 
b/spark3/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java
index 02a272c..1fb2922 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java
@@ -19,25 +19,17 @@
 
 package org.apache.iceberg.spark;
 
-import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.Pair;
-import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,88 +47,36 @@ public class FileRewriteCoordinator {
     return INSTANCE;
   }
 
+  /**
+   * Called to persist the output of a rewrite action for a specific group. Since the write is done via a
+   * Spark Datasource, we have to propagate the result through this side-effect call.
+   * @param table table where the rewrite is occurring
+   * @param fileSetID the id used to identify the source set of files being rewritten
+   * @param newDataFiles the new files which have been written
+   */
   public void stageRewrite(Table table, String fileSetID, Set<DataFile> 
newDataFiles) {
     Preconditions.checkArgument(newDataFiles != null && newDataFiles.size() > 
0, "Cannot stage null or empty file set");
+    LOG.debug("Staging the output for {} - fileset {} with {} files", 
table.name(), fileSetID, newDataFiles.size());
     Pair<String, String> id = toID(table, fileSetID);
     resultMap.put(id, newDataFiles);
   }
 
-  public void commitRewrite(Table table, String fileSetID) {
-    commitRewrite(table, ImmutableSet.of(fileSetID));
-  }
-
-  public void commitRewrite(Table table, Set<String> fileSetIDs) {
-    Set<DataFile> rewrittenDataFiles = fetchRewrittenDataFiles(table, 
fileSetIDs);
-    Set<DataFile> newDataFiles = fetchNewDataFiles(table, fileSetIDs);
-
-    table.newRewrite()
-        .rewriteFiles(rewrittenDataFiles, newDataFiles)
-        .commit();
-
-    fileSetIDs.stream().map(id -> toID(table, id)).forEach(resultMap::remove);
-  }
-
-  private Set<DataFile> fetchRewrittenDataFiles(Table table, Set<String> 
fileSetIDs) {
-    FileScanTaskSetManager taskSetManager = FileScanTaskSetManager.get();
-
-    Set<DataFile> rewrittenDataFiles = Sets.newHashSet();
-
-    for (String fileSetID : fileSetIDs) {
-      List<FileScanTask> tasks = taskSetManager.fetchTasks(table, fileSetID);
-      ValidationException.check(tasks != null,
-          "Task set manager has no tasks for table %s with id %s",
-          table, fileSetID);
-
-      for (FileScanTask task : tasks) {
-        DataFile dataFile = task.file();
-        rewrittenDataFiles.add(dataFile);
-      }
-    }
-
-    return Collections.unmodifiableSet(rewrittenDataFiles);
-  }
-
-  public Set<DataFile> fetchNewDataFiles(Table table, Set<String> fileSetIDs) {
-    List<Set<DataFile>> results = Lists.newArrayList();
-
-    for (String fileSetID : fileSetIDs) {
-      Pair<String, String> id = toID(table, fileSetID);
-      Set<DataFile> result = resultMap.get(id);
-      ValidationException.check(result != null,
-          "No results for rewrite of file set %s in table %s",
-          fileSetID, table);
-
-      results.add(result);
-    }
-
-    Set<DataFile> newDataFiles = results.get(0);
-    for (int index = 1; index < results.size(); index++) {
-      newDataFiles = Sets.union(newDataFiles, results.get(index));
-    }
+  public Set<DataFile> fetchNewDataFiles(Table table, String fileSetID) {
+    Pair<String, String> id = toID(table, fileSetID);
+    Set<DataFile> result = resultMap.get(id);
+    ValidationException.check(result != null,
+        "No results for rewrite of file set %s in table %s",
+        fileSetID, table);
 
-    return newDataFiles;
+    return result;
   }
 
   public void clearRewrite(Table table, String fileSetID) {
+    LOG.debug("Removing entry from RewriteCoordinator for {} - id {}", 
table.name(), fileSetID);
     Pair<String, String> id = toID(table, fileSetID);
     resultMap.remove(id);
   }
 
-  public void abortRewrite(Table table, String fileSetID) {
-    Pair<String, String> id = toID(table, fileSetID);
-    Set<DataFile> dataFiles = resultMap.remove(id);
-    if (dataFiles != null) {
-      LOG.info("Deleting {} uncommitted data files for rewriting file set {}", 
dataFiles.size(), fileSetID);
-      deleteFiles(table.io(), dataFiles);
-    }
-  }
-
-  public void abortRewrite(Table table, Set<String> fileSetIDs) {
-    for (String fileSetID : fileSetIDs) {
-      abortRewrite(table, fileSetID);
-    }
-  }
-
   public Set<String> fetchSetIDs(Table table) {
     return resultMap.keySet().stream()
         .filter(e -> e.first().equals(tableUUID(table)))
@@ -144,14 +84,6 @@ public class FileRewriteCoordinator {
         .collect(Collectors.toSet());
   }
 
-  private void deleteFiles(FileIO io, Iterable<DataFile> dataFiles) {
-    Tasks.foreach(dataFiles)
-        .noRetry()
-        .suppressFailureWhenFinished()
-        .onFailure((dataFile, exc) -> LOG.warn("Failed to delete: {}", 
dataFile.path(), exc))
-        .run(dataFile -> io.deleteFile(dataFile.path().toString()));
-  }
-
   private Pair<String, String> toID(Table table, String setID) {
     String tableUUID = tableUUID(table);
     return Pair.of(tableUUID, setID);
diff --git 
a/spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3BinPackStrategy.java
 
b/spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3BinPackStrategy.java
index b8e5079..b2995ce 100644
--- 
a/spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3BinPackStrategy.java
+++ 
b/spark3/src/main/java/org/apache/iceberg/spark/actions/Spark3BinPackStrategy.java
@@ -26,7 +26,6 @@ import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.BinPackStrategy;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.spark.FileRewriteCoordinator;
 import org.apache.iceberg.spark.FileScanTaskSetManager;
 import org.apache.iceberg.spark.SparkReadOptions;
@@ -76,7 +75,7 @@ public class Spark3BinPackStrategy extends BinPackStrategy {
           .mode("append")
           .save(table.name());
 
-      return rewriteCoordinator.fetchNewDataFiles(table, 
ImmutableSet.of(groupID));
+      return rewriteCoordinator.fetchNewDataFiles(table, groupID);
     } finally {
       manager.removeTasks(table, groupID);
       rewriteCoordinator.clearRewrite(table, groupID);
diff --git 
a/spark3/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java 
b/spark3/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java
index 1f02396..8aa5cd6 100644
--- 
a/spark3/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java
+++ 
b/spark3/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java
@@ -22,7 +22,10 @@ package org.apache.iceberg.spark;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterable;
@@ -87,7 +90,11 @@ public class TestFileRewriteCoordinator extends 
SparkCatalogTestBase {
 
       // commit the rewrite
       FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
-      rewriteCoordinator.commitRewrite(table, fileSetID);
+      Set<DataFile> rewrittenFiles = taskSetManager.fetchTasks(table, 
fileSetID).stream()
+          .map(FileScanTask::file)
+          .collect(Collectors.toSet());
+      Set<DataFile> addedFiles = rewriteCoordinator.fetchNewDataFiles(table, 
fileSetID);
+      table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
     }
 
     table.refresh();
@@ -145,7 +152,11 @@ public class TestFileRewriteCoordinator extends 
SparkCatalogTestBase {
 
       // commit the rewrite
       FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
-      rewriteCoordinator.commitRewrite(table, fileSetID);
+      Set<DataFile> rewrittenFiles = taskSetManager.fetchTasks(table, 
fileSetID).stream()
+          .map(FileScanTask::file)
+          .collect(Collectors.toSet());
+      Set<DataFile> addedFiles = rewriteCoordinator.fetchNewDataFiles(table, 
fileSetID);
+      table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
     }
 
     table.refresh();
@@ -210,7 +221,14 @@ public class TestFileRewriteCoordinator extends 
SparkCatalogTestBase {
 
     // commit both rewrites at the same time
     FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
-    rewriteCoordinator.commitRewrite(table, fileSetIDs);
+    Set<DataFile> rewrittenFiles = fileSetIDs.stream().flatMap(fileSetID ->
+        taskSetManager.fetchTasks(table, fileSetID).stream())
+        .map(FileScanTask::file)
+        .collect(Collectors.toSet());
+    Set<DataFile> addedFiles = fileSetIDs.stream()
+        .flatMap(fileSetID -> rewriteCoordinator.fetchNewDataFiles(table, 
fileSetID).stream())
+        .collect(Collectors.toSet());
+    table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
 
     table.refresh();
 

Reply via email to