This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new ac8733d7b Core: Support FileIO prefix operations (#5096)
ac8733d7b is described below
commit ac8733d7b9cf182bcee62d67cec4e1342efb6387
Author: Daniel Weeks <[email protected]>
AuthorDate: Wed Jun 22 13:40:33 2022 -0700
Core: Support FileIO prefix operations (#5096)
---
.../main/java/org/apache/iceberg/io/FileInfo.java | 44 +++++++++
.../iceberg/io/SupportsPrefixOperations.java | 51 ++++++++++
.../java/org/apache/iceberg/aws/s3/S3FileIO.java | 33 ++++++-
.../main/java/org/apache/iceberg/aws/s3/S3URI.java | 11 +++
.../org/apache/iceberg/aws/s3/TestS3FileIO.java | 42 +++++++++
.../org/apache/iceberg/hadoop/HadoopFileIO.java | 68 +++++++++++++-
.../apache/iceberg/hadoop/HadoopFileIOTest.java | 104 +++++++++++++++++++++
7 files changed, 351 insertions(+), 2 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/io/FileInfo.java
b/api/src/main/java/org/apache/iceberg/io/FileInfo.java
new file mode 100644
index 000000000..63a72c283
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/io/FileInfo.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+public class FileInfo {
+ private final String location;
+ private final long size;
+ private final long createdAtMillis;
+
+ public FileInfo(String location, long size, long createdAtMillis) {
+ this.location = location;
+ this.size = size;
+ this.createdAtMillis = createdAtMillis;
+ }
+
+ public String location() {
+ return location;
+ }
+
+ public long size() {
+ return size;
+ }
+
+ public long createdAtMillis() {
+ return createdAtMillis;
+ }
+}
diff --git
a/api/src/main/java/org/apache/iceberg/io/SupportsPrefixOperations.java
b/api/src/main/java/org/apache/iceberg/io/SupportsPrefixOperations.java
new file mode 100644
index 000000000..fc65f38ab
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/io/SupportsPrefixOperations.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+/**
+ * This interface is intended as an extension for FileIO implementations
+ * to provide additional prefix based operations that may be useful in
+ * performing supporting operations.
+ */
+public interface SupportsPrefixOperations {
+
+ /**
+ * Return an iterable of all files under a prefix.
+ * <p>
+ * Hierarchical file systems (e.g. HDFS) may impose additional restrictions
+ * like the prefix must fully match a directory whereas key/value object
+ * stores may allow for arbitrary prefixes.
+ *
+ * @param prefix prefix to list
+ * @return iterable of file information
+ */
+ Iterable<FileInfo> listPrefix(String prefix);
+
+ /**
+ * Delete all files under a prefix.
+ * <p>
+ * Hierarchical file systems (e.g. HDFS) may impose additional restrictions
+ * like the prefix must fully match a directory whereas key/value object
+ * stores may allow for arbitrary prefixes.
+ *
+ * @param prefix prefix to delete
+ */
+ void deletePrefix(String prefix);
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
index 3f669a3b6..3cd557c37 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -33,15 +33,18 @@ import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.CredentialSupplier;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.relocated.com.google.common.collect.SetMultimap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableSupplier;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
@@ -54,6 +57,7 @@ import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.GetObjectTaggingRequest;
import software.amazon.awssdk.services.s3.model.GetObjectTaggingResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectTaggingRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
@@ -67,7 +71,7 @@ import software.amazon.awssdk.services.s3.model.Tagging;
* URIs with schemes s3a, s3n, https are also treated as s3 file paths.
 * Using this FileIO with other schemes will result in {@link org.apache.iceberg.exceptions.ValidationException}.
*/
-public class S3FileIO implements FileIO, SupportsBulkOperations, CredentialSupplier {
+public class S3FileIO implements FileIO, SupportsBulkOperations, SupportsPrefixOperations, CredentialSupplier {
private static final Logger LOG = LoggerFactory.getLogger(S3FileIO.class);
private static final String DEFAULT_METRICS_IMPL =
"org.apache.iceberg.hadoop.HadoopMetricsContext";
private static volatile ExecutorService executorService;
@@ -241,6 +245,33 @@ public class S3FileIO implements FileIO, SupportsBulkOperations, CredentialSuppl
return Lists.newArrayList();
}
+  @Override
+  public Iterable<FileInfo> listPrefix(String prefix) {
+    S3URI s3uri = new S3URI(prefix, awsProperties.s3BucketToAccessPointMapping());
+    ListObjectsV2Request request = ListObjectsV2Request.builder().bucket(s3uri.bucket()).prefix(s3uri.key()).build();
+
+    return () -> client().listObjectsV2Paginator(request).stream()
+        .flatMap(r -> r.contents().stream())
+        .map(o -> new FileInfo(
+            String.format("%s://%s/%s", s3uri.scheme(), s3uri.bucket(), o.key()),
+            o.size(), o.lastModified().toEpochMilli())).iterator();
+  }
+
+ /**
+ * This method provides a "best-effort" to delete all objects under the
+ * given prefix.
+ *
+   * Bulk delete operations are used and no reattempt is made for deletes if
+   * they fail, but will log any individual objects that are not deleted as part
+   * of the bulk operation.
+   *
+   * @param prefix prefix to delete
+   */
+  @Override
+  public void deletePrefix(String prefix) {
+    deleteFiles(() -> Streams.stream(listPrefix(prefix)).map(FileInfo::location).iterator());
+  }
+
private S3Client client() {
if (client == null) {
synchronized (this) {
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java
index 3398cbac9..6a798b183 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3URI.java
@@ -42,6 +42,7 @@ class S3URI {
private static final String FRAGMENT_DELIM = "#";
private final String location;
+ private final String scheme;
private final String bucket;
private final String key;
@@ -73,6 +74,7 @@ class S3URI {
this.location = location;
String [] schemeSplit = location.split(SCHEME_DELIM, -1);
     ValidationException.check(schemeSplit.length == 2, "Invalid S3 URI, cannot determine scheme: %s", location);
+    this.scheme = schemeSplit[0];
     String [] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2);
     ValidationException.check(authoritySplit.length == 2, "Invalid S3 URI, cannot determine bucket: %s", location);
@@ -108,6 +110,15 @@ class S3URI {
return location;
}
+ /**
+ * Returns the original scheme provided in the location.
+ *
+ * @return uri scheme
+ */
+ public String scheme() {
+ return scheme;
+ }
+
@Override
public String toString() {
return location;
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
index b9d7496c3..a34300aa6 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
@@ -35,13 +35,16 @@ import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableSupplier;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
+import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
@@ -190,4 +193,43 @@ public class TestS3FileIO {
assertEquals("s3", post.get().serviceName());
}
+
+ @Test
+ public void testPrefixList() {
+ String prefix = "s3://bucket/path/to/list";
+
+ List<Integer> scaleSizes = Lists.newArrayList(1, 1000, 2500);
+
+    scaleSizes.parallelStream().forEach(scale -> {
+      String scalePrefix = String.format("%s/%s/", prefix, scale);
+
+      createRandomObjects(scalePrefix, scale);
+      assertEquals((long) scale, Streams.stream(s3FileIO.listPrefix(scalePrefix)).count());
+    });
+
+    long totalFiles = scaleSizes.stream().mapToLong(Integer::longValue).sum();
+    Assertions.assertEquals(totalFiles, Streams.stream(s3FileIO.listPrefix(prefix)).count());
+  }
+
+ @Test
+ public void testPrefixDelete() {
+ String prefix = "s3://bucket/path/to/delete";
+ List<Integer> scaleSizes = Lists.newArrayList(0, 5, 1000, 2500);
+
+    scaleSizes.parallelStream().forEach(scale -> {
+      String scalePrefix = String.format("%s/%s/", prefix, scale);
+
+      createRandomObjects(scalePrefix, scale);
+      s3FileIO.deletePrefix(scalePrefix);
+      assertEquals(0L, Streams.stream(s3FileIO.listPrefix(scalePrefix)).count());
+    });
+  }
+
+  private void createRandomObjects(String prefix, int count) {
+    S3URI s3URI = new S3URI(prefix);
+
+    random.ints(count).parallel().forEach(i ->
+        s3mock.putObject(builder -> builder.bucket(s3URI.bucket()).key(s3URI.key() + i).build(), RequestBody.empty())
+    );
+  }
}
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
index 1c53240db..f3c1db18e 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -20,17 +20,23 @@
package org.apache.iceberg.hadoop;
import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Iterator;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsPrefixOperations;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableSupplier;
-public class HadoopFileIO implements FileIO, HadoopConfigurable {
+public class HadoopFileIO implements FileIO, HadoopConfigurable, SupportsPrefixOperations {
private SerializableSupplier<Configuration> hadoopConf;
@@ -89,4 +95,64 @@ public class HadoopFileIO implements FileIO, HadoopConfigurable {
   public void serializeConfWith(Function<Configuration, SerializableSupplier<Configuration>> confSerializer) {
     this.hadoopConf = confSerializer.apply(getConf());
   }
+
+  @Override
+  public Iterable<FileInfo> listPrefix(String prefix) {
+    Path prefixToList = new Path(prefix);
+    FileSystem fs = Util.getFs(prefixToList, hadoopConf.get());
+
+    return () -> {
+      try {
+        return Streams.stream(new AdaptingIterator<>(fs.listFiles(prefixToList, true /* recursive */)))
+            .map(fileStatus -> new FileInfo(fileStatus.getPath().toString(), fileStatus.getLen(),
+                fileStatus.getModificationTime())).iterator();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    };
+  }
+
+ @Override
+ public void deletePrefix(String prefix) {
+ Path prefixToDelete = new Path(prefix);
+ FileSystem fs = Util.getFs(prefixToDelete, hadoopConf.get());
+
+ try {
+ fs.delete(prefixToDelete, true /* recursive */);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /**
+ * This class is a simple adaptor to allow for using Hadoop's
+ * RemoteIterator as an Iterator.
+ *
+ * @param <E> element type
+ */
+  private static class AdaptingIterator<E> implements Iterator<E>, RemoteIterator<E> {
+ private final RemoteIterator<E> delegate;
+
+ AdaptingIterator(RemoteIterator<E> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ return delegate.hasNext();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Override
+ public E next() {
+ try {
+ return delegate.next();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java
b/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java
new file mode 100644
index 000000000..0721d6999
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hadoop;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class HadoopFileIOTest {
+ private final Random random = new Random(1);
+
+ private FileSystem fs;
+ private HadoopFileIO hadoopFileIO;
+
+ @TempDir
+ static File tempDir;
+
+ @BeforeEach
+ public void before() throws Exception {
+ Configuration conf = new Configuration();
+ fs = FileSystem.getLocal(conf);
+
+ hadoopFileIO = new HadoopFileIO(conf);
+ }
+
+ @Test
+ public void testListPrefix() {
+ Path parent = new Path(tempDir.toURI());
+
+ List<Integer> scaleSizes = Lists.newArrayList(1, 1000, 2500);
+
+    scaleSizes.parallelStream().forEach(scale -> {
+      Path scalePath = new Path(parent, Integer.toString(scale));
+
+      createRandomFiles(scalePath, scale);
+      assertEquals((long) scale, Streams.stream(hadoopFileIO.listPrefix(scalePath.toUri().toString())).count());
+    });
+
+    long totalFiles = scaleSizes.stream().mapToLong(Integer::longValue).sum();
+    assertEquals(totalFiles, Streams.stream(hadoopFileIO.listPrefix(parent.toUri().toString())).count());
+  }
+
+ @Test
+ public void testDeletePrefix() {
+ Path parent = new Path(tempDir.toURI());
+
+ List<Integer> scaleSizes = Lists.newArrayList(1, 1000, 2500);
+
+    scaleSizes.parallelStream().forEach(scale -> {
+      Path scalePath = new Path(parent, Integer.toString(scale));
+
+      createRandomFiles(scalePath, scale);
+      hadoopFileIO.deletePrefix(scalePath.toUri().toString());
+
+      // Hadoop filesystem will throw if the path does not exist
+      assertThrows(UncheckedIOException.class, () -> hadoopFileIO.listPrefix(scalePath.toUri().toString()).iterator());
+    });
+
+    hadoopFileIO.deletePrefix(parent.toUri().toString());
+    // Hadoop filesystem will throw if the path does not exist
+    assertThrows(UncheckedIOException.class, () -> hadoopFileIO.listPrefix(parent.toUri().toString()).iterator());
+  }
+
+ private void createRandomFiles(Path parent, int count) {
+ random.ints(count).parallel().forEach(i -> {
+ try {
+ fs.createNewFile(new Path(parent, "file-" + i));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ );
+ }
+}