aokolnychyi commented on code in PR #6682:
URL: https://github.com/apache/iceberg/pull/6682#discussion_r1094075265
##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -67,7 +67,11 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
*
* @param deleteFunc a function that will be called to delete files
* @return this for method chaining
+ * @deprecated Deletes are now performed in bulk see {@link #deleteBulkWith(Consumer)}. This
Review Comment:
This will break the public API if a table is configured with a non-bulk IO
and the user passes a custom delete function. In a lot of cases, a custom
delete simply adds elements to a set. After this change, the custom delete
function may be ignored and we will actually delete files.
Do we actually need to attempt bulk deletes if a custom function is provided?
I'd consider just going through the non-bulk API in that case. That can
simplify the implementation.
```
private Consumer<String> deleteFunc = null;
...
@Override
public DeleteOrphanFilesSparkAction deleteWith(Consumer<String> newDeleteFunc) {
this.deleteFunc = newDeleteFunc;
return this;
}
...
private void deleteFiles(Iterable<String> paths) {
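  // bulk deletes apply only when no custom delete function is set and the IO supports them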
if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
SupportsBulkOperations bulkIO = (SupportsBulkOperations) table.io();
if (deleteExecutorService != null) {
bulkIO.deleteFiles(deleteExecutorService, paths);
} else {
bulkIO.deleteFiles(paths);
}
} else {
Tasks.foreach(paths)
.noRetry()
.executeWith(deleteExecutorService)
.suppressFailureWhenFinished()
      .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
      .run(deleteFunc != null ? deleteFunc::accept : table.io()::deleteFile);
}
}
```
##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,9 +84,16 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
*
* @param executorService the service to use
* @return this for method chaining
+ * @deprecated All deletes should be performed using the bulk delete api if available. Use FileIO
+ *     specific parallelism controls to adjust bulk delete concurrency within that api.
*/
+ @Deprecated
DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
+ default DeleteOrphanFiles deleteBulkWith(Consumer<Iterable<String>> deleteFunc) {
Review Comment:
Do we have a use case for this? If not, I'd skip it and just support the old
`deleteWith` method.
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -253,6 +257,39 @@ protected DeleteSummary deleteFiles(
return summary;
}
+ protected DeleteSummary deleteFiles(
+ Consumer<Iterable<String>> bulkDeleteFunc, Iterator<FileInfo> files) {
+ DeleteSummary summary = new DeleteSummary();
+ Iterator<List<FileInfo>> groupedIterator = Iterators.partition(files, DELETE_GROUP_SIZE);
+
+ Tasks.foreach(groupedIterator)
+ .suppressFailureWhenFinished()
+ .run(
+ fileList -> {
+ Map<String, List<FileInfo>> filesByType =
+ fileList.stream().collect(Collectors.groupingBy(FileInfo::getType));
+ filesByType.entrySet().stream()
+ .forEach(
+ entry -> {
+ List<String> pathsToDelete =
Review Comment:
What about using `ListMultimap`?
```
ListMultimap<String, String> pathsByType =
Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
for (FileInfo fileInfo : fileGroup) {
pathsByType.put(fileInfo.getType(), fileInfo.getPath());
}
for (Map.Entry<String, Collection<String>> entry : pathsByType.asMap().entrySet()) {
String type = entry.getKey();
Collection<String> pathsToDelete = entry.getValue();
...
}
```
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -85,6 +88,7 @@
private static final Logger LOG = LoggerFactory.getLogger(BaseSparkAction.class);
private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
private static final int DELETE_NUM_RETRIES = 3;
+ private static final int DELETE_GROUP_SIZE = 100000;
Review Comment:
I don't think we would want to make it configurable, but could 100K be too
many? This will be used in the path that collects files chunk by chunk to the
driver to avoid OOM, which is supposed to help small drivers or scenarios with
lots of files to delete. Do we think the records are tiny enough that 100K
won't cause any issues?
I think S3FileIO would send 250 files in a single bulk delete by default.
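For a rough sense of scale, a back-of-envelope sketch; the per-record size is
an assumption, the other constants come from this discussion, and the class
name is made up for illustration:
```
// Rough estimate of driver-side footprint per delete group; approxBytesPerRecord is assumed.
public class DeleteGroupSizeMath {
  public static void main(String[] args) {
    int deleteGroupSize = 100_000; // DELETE_GROUP_SIZE from the hunk above
    int s3DeleteBatchSize = 250; // S3FileIO default bulk delete batch size, per the comment
    int approxBytesPerRecord = 200; // assumption: average path + type footprint per record

    long mbPerGroup = (long) deleteGroupSize * approxBytesPerRecord / (1024 * 1024);
    long batchesPerGroup = deleteGroupSize / s3DeleteBatchSize;
    // prints: one group is ~19 MB on the driver and 400 S3 batches
    System.out.printf("One group is ~%d MB on the driver and %d S3 batches%n",
        mbPerGroup, batchesPerGroup);
  }
}
```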
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -261,6 +298,24 @@ static class DeleteSummary {
private final AtomicLong manifestListsCount = new AtomicLong(0L);
private final AtomicLong otherFilesCount = new AtomicLong(0L);
+ public void deletedFiles(String type, int numFiles) {
+ if (FileContent.DATA.name().equalsIgnoreCase(type)) {
+ dataFilesCount.addAndGet(numFiles);
Review Comment:
nit: What about empty lines after each if clause, like in the method below? We
do that quite frequently in large if-else statements.
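For illustration, the suggested layout would look something like this; only
`dataFilesCount` and `otherFilesCount` appear in the hunks here, so the middle
branch and its counter are hypothetical:
```
public void deletedFiles(String type, int numFiles) {
  if (FileContent.DATA.name().equalsIgnoreCase(type)) {
    dataFilesCount.addAndGet(numFiles);

  } else if (FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) {
    posDeleteFilesCount.addAndGet(numFiles); // hypothetical counter, not shown in the hunk

  } else {
    otherFilesCount.addAndGet(numFiles);
  }
}
```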
##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -18,27 +18,42 @@
*/
package org.apache.iceberg.hadoop;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class HadoopFileIO implements FileIO, HadoopConfigurable, SupportsPrefixOperations {
+public class HadoopFileIO
+    implements FileIO, HadoopConfigurable, SupportsPrefixOperations, SupportsBulkOperations {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HadoopFileIO.class);
+  private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete_file_parallelism";
Review Comment:
Do we have any other Hadoop configs? Which naming style did we use there? I
thought we were using `-` to separate words.
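If the dash-separated style is the convention, the rename would presumably look
like this (hypothetical, pending the answer to the naming question above):
```
private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete-file-parallelism";
```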
##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,9 +84,16 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
*
* @param executorService the service to use
* @return this for method chaining
+ * @deprecated All deletes should be performed using the bulk delete api if available. Use FileIO
+ *     specific parallelism controls to adjust bulk delete concurrency within that api.
*/
+ @Deprecated
Review Comment:
I am not sure the deprecation is worth it. Instead of deprecating a bunch of
widely used methods in actions and arguments in procedures, can we consider
supporting external executor services with bulk deletes so that these arguments
can still be supported?
I like the idea of bulk deletes in `HadoopFileIO`, but I don't see anything
fundamentally bad in the ability to override the delete parallelism in actions
or procedures, especially because it can make the implementation simpler. We
already discussed passing executor services in the original PR that added the
bulk API.
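A minimal sketch of that idea; `deleteFiles(Iterable)` matches the existing
`SupportsBulkOperations` method, while the `ExecutorService` overload is
hypothetical:
```
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.io.BulkDeletionFailureException;

public interface SupportsBulkOperations {
  // existing bulk API
  void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException;

  // hypothetical overload so action/procedure executor-service arguments can keep working
  default void deleteFiles(ExecutorService executorService, Iterable<String> pathsToDelete)
      throws BulkDeletionFailureException {
    deleteFiles(pathsToDelete); // by default, fall back to the IO's own parallelism
  }
}
```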
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -253,6 +257,39 @@ protected DeleteSummary deleteFiles(
return summary;
}
+ protected DeleteSummary deleteFiles(
+ Consumer<Iterable<String>> bulkDeleteFunc, Iterator<FileInfo> files) {
+ DeleteSummary summary = new DeleteSummary();
+ Iterator<List<FileInfo>> groupedIterator = Iterators.partition(files, DELETE_GROUP_SIZE);
+
+ Tasks.foreach(groupedIterator)
+ .suppressFailureWhenFinished()
+ .run(
+ fileList -> {
+ Map<String, List<FileInfo>> filesByType =
Review Comment:
What about adding a private method to handle this? I feel it would help with
formatting (if the call fits on a single line). Spotless formats these closures
in a weird way.
```
Iterator<List<FileInfo>> fileGroups = Iterators.partition(files, DELETE_GROUP_SIZE);
Tasks.foreach(fileGroups)
.noRetry()
.suppressFailureWhenFinished()
.run(fileGroup -> deleteFileGroup(...));
LOG.info("Deleted {} total files with bulk deletes", summary.totalFilesCount());
return summary;
```
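One possible shape for that helper, reusing the `ListMultimap` idea from the
earlier comment; the parameter list and the `deletedFiles(type, count)` call
are assumptions pieced together from the hunks above, not the PR's actual code:
```
private void deleteFileGroup(
    List<FileInfo> fileGroup, Consumer<Iterable<String>> bulkDeleteFunc, DeleteSummary summary) {
  // bucket paths by content type so the summary can be updated per type
  ListMultimap<String, String> pathsByType =
      Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
  for (FileInfo fileInfo : fileGroup) {
    pathsByType.put(fileInfo.getType(), fileInfo.getPath());
  }

  for (Map.Entry<String, Collection<String>> entry : pathsByType.asMap().entrySet()) {
    String type = entry.getKey();
    Collection<String> pathsToDelete = entry.getValue();
    bulkDeleteFunc.accept(pathsToDelete); // failure handling elided in this sketch
    summary.deletedFiles(type, pathsToDelete.size());
  }
}
```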
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -253,6 +257,39 @@ protected DeleteSummary deleteFiles(
return summary;
}
+ protected DeleteSummary deleteFiles(
+ Consumer<Iterable<String>> bulkDeleteFunc, Iterator<FileInfo> files) {
+ DeleteSummary summary = new DeleteSummary();
+ Iterator<List<FileInfo>> groupedIterator = Iterators.partition(files, DELETE_GROUP_SIZE);
Review Comment:
nit: Is there a more descriptive name? fileGroups, fileLists?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]