This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f4d830b0fa Core: Extend ResolvingFileIO to support BulkOperations 
(#7976)
f4d830b0fa is described below

commit f4d830b0faba6e602932cf81ec03206b19e371bd
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Mon Aug 14 00:09:41 2023 -0700

    Core: Extend ResolvingFileIO to support BulkOperations (#7976)
---
 .../org/apache/iceberg/io/ResolvingFileIO.java     | 38 ++++++++++++-
 .../org/apache/iceberg/io/TestResolvingIO.java     | 66 +++++++++++++++++++++-
 2 files changed, 100 insertions(+), 4 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java 
b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
index 22f17ec4db..58c0586b7e 100644
--- a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
@@ -23,23 +23,28 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.HadoopConfigurable;
 import org.apache.iceberg.hadoop.SerializableConfiguration;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.SerializableMap;
 import org.apache.iceberg.util.SerializableSupplier;
+import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** FileIO implementation that uses location scheme to choose the correct 
FileIO implementation. */
-public class ResolvingFileIO implements FileIO, HadoopConfigurable {
+public class ResolvingFileIO implements FileIO, HadoopConfigurable, 
SupportsBulkOperations {
   private static final Logger LOG = 
LoggerFactory.getLogger(ResolvingFileIO.class);
+  private static final int BATCH_SIZE = 100_000;
   private static final String FALLBACK_IMPL = 
"org.apache.iceberg.hadoop.HadoopFileIO";
   private static final String S3_FILE_IO_IMPL = 
"org.apache.iceberg.aws.s3.S3FileIO";
   private static final String GCS_FILE_IO_IMPL = 
"org.apache.iceberg.gcp.gcs.GCSFileIO";
@@ -85,6 +90,34 @@ public class ResolvingFileIO implements FileIO, 
HadoopConfigurable {
     io(location).deleteFile(location);
   }
 
+  @Override
+  public void deleteFiles(Iterable<String> pathsToDelete) throws 
BulkDeletionFailureException {
+    Iterators.partition(pathsToDelete.iterator(), BATCH_SIZE)
+        .forEachRemaining(
+            partitioned -> {
+              Map<FileIO, List<String>> pathByFileIO =
+                  
partitioned.stream().collect(Collectors.groupingBy(this::io));
+              for (Map.Entry<FileIO, List<String>> entries : 
pathByFileIO.entrySet()) {
+                FileIO io = entries.getKey();
+                List<String> filePaths = entries.getValue();
+                if (io instanceof SupportsBulkOperations) {
+                  ((SupportsBulkOperations) io).deleteFiles(filePaths);
+                } else {
+                  LOG.warn(
+                      "IO {} does not support bulk operations. Using non-bulk 
deletes.",
+                      io.getClass().getName());
+                  Tasks.Builder<String> deleteTasks =
+                      Tasks.foreach(filePaths)
+                          .noRetry()
+                          .suppressFailureWhenFinished()
+                          .onFailure(
+                              (file, exc) -> LOG.warn("Failed to delete file: 
{}", file, exc));
+                  deleteTasks.run(io::deleteFile);
+                }
+              }
+            });
+  }
+
   @Override
   public Map<String, String> properties() {
     return properties.immutableMap();
@@ -127,7 +160,8 @@ public class ResolvingFileIO implements FileIO, 
HadoopConfigurable {
     return hadoopConf.get();
   }
 
-  private FileIO io(String location) {
+  @VisibleForTesting
+  FileIO io(String location) {
     String impl = implFromLocation(location);
     FileIO io = ioInstances.get(impl);
     if (io != null) {
diff --git a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java 
b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
index 9ba00d6b3e..ff1499bff7 100644
--- a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
+++ b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
@@ -19,19 +19,28 @@
 package org.apache.iceberg.io;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
-import java.nio.file.Path;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.inmemory.InMemoryFileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 public class TestResolvingIO {
 
-  @TempDir private Path temp;
+  @TempDir private java.nio.file.Path temp;
 
   @Test
   public void testResolvingFileIOKryoSerialization() throws IOException {
@@ -91,4 +100,57 @@ public class TestResolvingIO {
     
assertThat(roundTripSerializedFileIO.ioClass(temp.toString())).isEqualTo(HadoopFileIO.class);
     
assertThat(roundTripSerializedFileIO.newInputFile(temp.toString())).isNotNull();
   }
+
+  @Test
+  public void resolveFileIOBulkDeletion() throws IOException {
+    ResolvingFileIO resolvingFileIO = spy(new ResolvingFileIO());
+    Configuration hadoopConf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(hadoopConf);
+    Path parent = new Path(temp.toUri());
+    // configure delegation IO
+    HadoopFileIO delegation = new HadoopFileIO(hadoopConf);
+    doReturn(delegation).when(resolvingFileIO).io(anyString());
+    // write
+    List<Path> randomFilePaths =
+        IntStream.range(1, 10)
+            .mapToObj(i -> new Path(parent, "random-" + i + "-" + 
UUID.randomUUID()))
+            .collect(Collectors.toList());
+    for (Path randomFilePath : randomFilePaths) {
+      fs.createNewFile(randomFilePath);
+      
assertThat(delegation.newInputFile(randomFilePath.toUri().toString()).exists()).isTrue();
+    }
+    // bulk deletion
+    List<String> randomFilePathString =
+        randomFilePaths.stream().map(p -> 
p.toUri().toString()).collect(Collectors.toList());
+    resolvingFileIO.deleteFiles(randomFilePathString);
+
+    for (String path : randomFilePathString) {
+      assertThat(delegation.newInputFile(path).exists()).isFalse();
+    }
+  }
+
+  @Test
+  public void resolveFileIONonBulkDeletion() {
+    ResolvingFileIO resolvingFileIO = spy(new ResolvingFileIO());
+    String parentPath = "inmemory://foo.db/bar";
+    // configure delegation IO
+    InMemoryFileIO delegation = new InMemoryFileIO();
+    doReturn(delegation).when(resolvingFileIO).io(anyString());
+    // write
+    byte[] someData = "some data".getBytes();
+    List<String> randomFilePaths =
+        IntStream.range(1, 10)
+            .mapToObj(i -> parentPath + "-" + i + "-" + UUID.randomUUID())
+            .collect(Collectors.toList());
+    for (String randomFilePath : randomFilePaths) {
+      delegation.addFile(randomFilePath, someData);
+      assertThat(delegation.fileExists(randomFilePath)).isTrue();
+    }
+    // non-bulk deletion
+    resolvingFileIO.deleteFiles(randomFilePaths);
+
+    for (String path : randomFilePaths) {
+      assertThat(delegation.fileExists(path)).isFalse();
+    }
+  }
 }

Reply via email to