This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new f4d830b0fa Core: Extend ResolvingFileIO to support BulkOperations (#7976)
f4d830b0fa is described below
commit f4d830b0faba6e602932cf81ec03206b19e371bd
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Mon Aug 14 00:09:41 2023 -0700
Core: Extend ResolvingFileIO to support BulkOperations (#7976)
---
.../org/apache/iceberg/io/ResolvingFileIO.java | 38 ++++++++++++-
.../org/apache/iceberg/io/TestResolvingIO.java | 66 +++++++++++++++++++++-
2 files changed, 100 insertions(+), 4 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
index 22f17ec4db..58c0586b7e 100644
--- a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
@@ -23,23 +23,28 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopConfigurable;
import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
+import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** FileIO implementation that uses location scheme to choose the correct FileIO implementation. */
-public class ResolvingFileIO implements FileIO, HadoopConfigurable {
+public class ResolvingFileIO implements FileIO, HadoopConfigurable, SupportsBulkOperations {
private static final Logger LOG = LoggerFactory.getLogger(ResolvingFileIO.class);
+ private static final int BATCH_SIZE = 100_000;
private static final String FALLBACK_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO";
private static final String S3_FILE_IO_IMPL = "org.apache.iceberg.aws.s3.S3FileIO";
private static final String GCS_FILE_IO_IMPL = "org.apache.iceberg.gcp.gcs.GCSFileIO";
@@ -85,6 +90,34 @@ public class ResolvingFileIO implements FileIO, HadoopConfigurable {
io(location).deleteFile(location);
}
+ @Override
+ public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
+ Iterators.partition(pathsToDelete.iterator(), BATCH_SIZE)
+ .forEachRemaining(
+ partitioned -> {
+ Map<FileIO, List<String>> pathByFileIO =
+ partitioned.stream().collect(Collectors.groupingBy(this::io));
+ for (Map.Entry<FileIO, List<String>> entries : pathByFileIO.entrySet()) {
+ FileIO io = entries.getKey();
+ List<String> filePaths = entries.getValue();
+ if (io instanceof SupportsBulkOperations) {
+ ((SupportsBulkOperations) io).deleteFiles(filePaths);
+ } else {
+ LOG.warn(
+ "IO {} does not support bulk operations. Using non-bulk deletes.",
+ io.getClass().getName());
+ Tasks.Builder<String> deleteTasks =
+ Tasks.foreach(filePaths)
+ .noRetry()
+ .suppressFailureWhenFinished()
+ .onFailure(
+ (file, exc) -> LOG.warn("Failed to delete file: {}", file, exc));
+ deleteTasks.run(io::deleteFile);
+ }
+ }
+ });
+ }
+
@Override
public Map<String, String> properties() {
return properties.immutableMap();
@@ -127,7 +160,8 @@ public class ResolvingFileIO implements FileIO, HadoopConfigurable {
return hadoopConf.get();
}
- private FileIO io(String location) {
+ @VisibleForTesting
+ FileIO io(String location) {
String impl = implFromLocation(location);
FileIO io = ioInstances.get(impl);
if (io != null) {
diff --git a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
index 9ba00d6b3e..ff1499bff7 100644
--- a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
+++ b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
@@ -19,19 +19,28 @@
package org.apache.iceberg.io;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
import java.io.IOException;
-import java.nio.file.Path;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.inmemory.InMemoryFileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
public class TestResolvingIO {
- @TempDir private Path temp;
+ @TempDir private java.nio.file.Path temp;
@Test
public void testResolvingFileIOKryoSerialization() throws IOException {
@@ -91,4 +100,57 @@ public class TestResolvingIO {
assertThat(roundTripSerializedFileIO.ioClass(temp.toString())).isEqualTo(HadoopFileIO.class);
assertThat(roundTripSerializedFileIO.newInputFile(temp.toString())).isNotNull();
}
+
+ @Test
+ public void resolveFileIOBulkDeletion() throws IOException {
+ ResolvingFileIO resolvingFileIO = spy(new ResolvingFileIO());
+ Configuration hadoopConf = new Configuration();
+ FileSystem fs = FileSystem.getLocal(hadoopConf);
+ Path parent = new Path(temp.toUri());
+ // configure delegation IO
+ HadoopFileIO delegation = new HadoopFileIO(hadoopConf);
+ doReturn(delegation).when(resolvingFileIO).io(anyString());
+ // write
+ List<Path> randomFilePaths =
+ IntStream.range(1, 10)
+ .mapToObj(i -> new Path(parent, "random-" + i + "-" + UUID.randomUUID()))
+ .collect(Collectors.toList());
+ for (Path randomFilePath : randomFilePaths) {
+ fs.createNewFile(randomFilePath);
+ assertThat(delegation.newInputFile(randomFilePath.toUri().toString()).exists()).isTrue();
+ }
+ // bulk deletion
+ List<String> randomFilePathString =
+ randomFilePaths.stream().map(p -> p.toUri().toString()).collect(Collectors.toList());
+ resolvingFileIO.deleteFiles(randomFilePathString);
+
+ for (String path : randomFilePathString) {
+ assertThat(delegation.newInputFile(path).exists()).isFalse();
+ }
+ }
+
+ @Test
+ public void resolveFileIONonBulkDeletion() {
+ ResolvingFileIO resolvingFileIO = spy(new ResolvingFileIO());
+ String parentPath = "inmemory://foo.db/bar";
+ // configure delegation IO
+ InMemoryFileIO delegation = new InMemoryFileIO();
+ doReturn(delegation).when(resolvingFileIO).io(anyString());
+ // write
+ byte[] someData = "some data".getBytes();
+ List<String> randomFilePaths =
+ IntStream.range(1, 10)
+ .mapToObj(i -> parentPath + "-" + i + "-" + UUID.randomUUID())
+ .collect(Collectors.toList());
+ for (String randomFilePath : randomFilePaths) {
+ delegation.addFile(randomFilePath, someData);
+ assertThat(delegation.fileExists(randomFilePath)).isTrue();
+ }
+ // non-bulk deletion
+ resolvingFileIO.deleteFiles(randomFilePaths);
+
+ for (String path : randomFilePaths) {
+ assertThat(delegation.fileExists(path)).isFalse();
+ }
+ }
}