nastra commented on code in PR #5096:
URL: https://github.com/apache/iceberg/pull/5096#discussion_r902558622


##########
api/src/main/java/org/apache/iceberg/io/SupportsPrefixOperations.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.Iterator;
+
+/**
+ * This interface is intended as an extension for FileIO implementations
+ * to provide additional prefix based operations that may be useful in
+ * performing supporting operations.
+ */
+public interface SupportsPrefixOperations {
+
+  /**
+   * Return a stream of all files under a prefix.

Review Comment:
   nit: stream -> iterator



##########
aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java:
##########
@@ -241,6 +246,54 @@ private List<String> deleteObjectsInBucket(String bucket, 
Collection<String> obj
     return Lists.newArrayList();
   }
 
+  @Override
+  public Iterator<FileInfo> listPrefix(String prefix) {
+    S3URI s3uri = new S3URI(prefix, 
awsProperties.s3BucketToAccessPointMapping());
+
+    return internalListPrefix(s3uri.bucket(), s3uri.key()).stream()
+        .flatMap(r -> r.contents().stream())
+        .map(o -> new FileInfo(
+            String.format("%s://%s/%s", s3uri.scheme(), s3uri.bucket(), 
o.key()),
+            o.size(), o.lastModified().toEpochMilli())).iterator();
+  }
+
+  /**
+   * This method provides a "best-effort" to delete all objects under the
+   * given prefix.
+   *
+   * Bulk delete operations are used and no reattempt is made for deletes if
+   * they fail, but will log any individual objects that are not deleted as 
part
+   * of the bulk operation.
+   *
+   * @param prefix prefix to delete
+   */
+  @Override
+  public void deletePrefix(String prefix) {
+    S3URI s3uri = new S3URI(prefix, 
awsProperties.s3BucketToAccessPointMapping());
+
+    internalListPrefix(s3uri.bucket(), 
s3uri.key()).stream().parallel().forEach(listing -> {
+      List<ObjectIdentifier> objectIdentifiers = listing.contents().stream()
+          .map(o -> ObjectIdentifier.builder().key(o.key()).build())
+          .collect(Collectors.toList());
+
+      DeleteObjectsRequest request = DeleteObjectsRequest.builder()
+          .bucket(s3uri.bucket())
+          .delete(Delete.builder().objects(objectIdentifiers).build())
+          .build();
+
+      client().deleteObjects(request).errors().forEach((s3Error -> {
+        LOG.warn("Error occurred during delete operation. {}: {}. {}", 
s3Error.code(), s3Error.message(),
+            s3Error.key());
+      }));
+    });
+  }
+
+  private ListObjectsV2Iterable internalListPrefix(String bucket, String 
keyPrefix) {
+    ListObjectsV2Request request = 
ListObjectsV2Request.builder().bucket(bucket).prefix(keyPrefix).build();

Review Comment:
   nit: maybe `request` could be inlined?



##########
api/src/main/java/org/apache/iceberg/io/SupportsPrefixOperations.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.Iterator;
+
+/**
+ * This interface is intended as an extension for FileIO implementations
+ * to provide additional prefix based operations that may be useful in
+ * performing supporting operations.
+ */
+public interface SupportsPrefixOperations {
+
+  /**
+   * Return a stream of all files under a prefix.
+   * <p>
+   * Hierarchical file systems (e.g. HDFS) may impose additional restrictions
+   * like the prefix mush fully match a directory whereas key/value object

Review Comment:
   nit: mush -> must



##########
api/src/main/java/org/apache/iceberg/io/SupportsPrefixOperations.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.Iterator;
+
+/**
+ * This interface is intended as an extension for FileIO implementations
+ * to provide additional prefix based operations that may be useful in
+ * performing supporting operations.
+ */
+public interface SupportsPrefixOperations {
+
+  /**
+   * Return a stream of all files under a prefix.
+   * <p>
+   * Hierarchical file systems (e.g. HDFS) may impose additional restrictions
+   * like the prefix mush fully match a directory whereas key/value object
+   * stores may allow for arbitrary prefixes.
+   *
+   * @param prefix prefix to list
+   * @return iterator of file information
+   */
+  Iterator<FileInfo> listPrefix(String prefix);
+
+  /**
+   * Delete all files under a prefix.
+   * <p>
+   * Hierarchical file systems (e.g. HDFS) may impose additional restrictions
+   * like the prefix mush fully match a directory whereas key/value object

Review Comment:
   nit: same here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to