rdblue commented on a change in pull request #3120:
URL: https://github.com/apache/iceberg/pull/3120#discussion_r717122703



##########
File path: api/src/main/java/org/apache/iceberg/io/CloseableIterable.java
##########
@@ -117,6 +118,16 @@ public O next() {
     };
   }
 
+  static <E> CloseableIterable<E> combine(Iterable<CloseableIterable<E>> iterable,
+                                          ExecutorService workerPool,
+                                          int workerPoolSize) {
+    if (workerPool == null) {
+      return concat(iterable);
+    }
+    Preconditions.checkArgument(workerPoolSize > 0, "Invalid workerPoolSize (not positive): " + workerPoolSize);

Review comment:
       Style: in this project, we use empty lines to separate control flow 
blocks from the following statements.
   
   I'd also add a blank line after the precondition.
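For illustration, here is the requested layout applied to a simplified, dependency-free analog of `combine`. `CombineLayoutSketch`, its `List`-based signature, and the local `checkArgument` (a stand-in for Guava's `Preconditions.checkArgument`) are hypothetical. The parallel branch is left as a placeholder because only the spacing matters here:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;

final class CombineLayoutSketch {
  private CombineLayoutSketch() {
  }

  // Hypothetical stand-in for Guava's Preconditions.checkArgument
  static void checkArgument(boolean expression, String message) {
    if (!expression) {
      throw new IllegalArgumentException(message);
    }
  }

  static <E> List<E> concat(List<List<E>> lists) {
    List<E> result = new ArrayList<>();
    for (List<E> list : lists) {
      result.addAll(list);
    }

    return result;
  }

  static <E> List<E> combine(List<List<E>> lists, ExecutorService workerPool, int workerPoolSize) {
    if (workerPool == null) {
      return concat(lists);
    }

    checkArgument(workerPoolSize > 0, "Invalid workerPoolSize (not positive): " + workerPoolSize);

    // Placeholder: the real method would hand the work to workerPool here
    return concat(lists);
  }
}
```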

##########
File path: api/src/main/java/org/apache/iceberg/io/ParallelIterable.java
##########
@@ -28,25 +28,25 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.io.CloseableGroup;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 
-public class ParallelIterable<T> extends CloseableGroup implements CloseableIterable<T> {
+class ParallelIterable<T> extends CloseableGroup implements CloseableIterable<T> {
   private final Iterable<? extends Iterable<T>> iterables;
   private final ExecutorService workerPool;
+  private final int workerPoolSize;
 
-  public ParallelIterable(Iterable<? extends Iterable<T>> iterables,
-                          ExecutorService workerPool) {
+  ParallelIterable(Iterable<? extends Iterable<T>> iterables,
+                          ExecutorService workerPool,
+                          int workerPoolSize) {

Review comment:
       We can't just remove the `public` modifier, because that would break
code outside the package that uses this class. Can you add `@Deprecated`
annotations and a `@deprecated` javadoc comment to the class and constructor
stating that callers should use `CloseableIterable.combine` instead?
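A minimal sketch of the requested annotations. To compile without Iceberg on the classpath, the class is a simplified nested stand-in that implements plain `Iterable` and iterates sequentially; `DeprecationSketch` and `DEFAULT_WORKER_POOL_SIZE` are hypothetical, the latter standing in for `ThreadPools.WORKER_THREAD_POOL_SIZE`, which the old code used:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;

final class DeprecationSketch {
  // Hypothetical stand-in for ThreadPools.WORKER_THREAD_POOL_SIZE
  static final int DEFAULT_WORKER_POOL_SIZE = Runtime.getRuntime().availableProcessors();

  private DeprecationSketch() {
  }

  /**
   * Simplified stand-in for ParallelIterable; the class keeps its public modifier.
   *
   * @deprecated use {@code CloseableIterable.combine(Iterable, ExecutorService, int)} instead
   */
  @Deprecated
  public static class ParallelIterable<T> implements Iterable<T> {
    private final Iterable<? extends Iterable<T>> iterables;
    private final ExecutorService workerPool;
    private final int workerPoolSize;

    /**
     * Existing public constructor, kept for compatibility.
     *
     * @deprecated use {@code CloseableIterable.combine(Iterable, ExecutorService, int)} instead
     */
    @Deprecated
    public ParallelIterable(Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool) {
      this(iterables, workerPool, DEFAULT_WORKER_POOL_SIZE);
    }

    // New package-private constructor, intended for use by combine
    ParallelIterable(Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool, int workerPoolSize) {
      this.iterables = iterables;
      this.workerPool = workerPool;
      this.workerPoolSize = workerPoolSize;
    }

    @Override
    public Iterator<T> iterator() {
      // Sequential stand-in; the real class reads the iterables on workerPool
      List<T> items = new ArrayList<>();
      for (Iterable<T> iterable : iterables) {
        for (T item : iterable) {
          items.add(item);
        }
      }

      return items.iterator();
    }
  }
}
```

Delegating the old constructor to the new one keeps a single code path while existing callers keep compiling, with a deprecation warning.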

##########
File path: api/src/main/java/org/apache/iceberg/io/ParallelIterable.java
##########
@@ -72,8 +73,8 @@ private ParallelIterator(Iterable<? extends Iterable<T>> iterables,
             }
           }).iterator();
       this.workerPool = workerPool;
-      // submit 2 tasks per worker at a time
-      this.taskFutures = new Future[2 * ThreadPools.WORKER_THREAD_POOL_SIZE];
+      // In default, we submit 2 tasks per worker at a time.

Review comment:
       Style: please remove personal pronouns. Using "we" or "I" makes 
documentation longer and less direct. Also, no need for punctuation in line 
comments.
   
   Actually, I don't think that this comment needs to change at all. It looks 
perfectly fine without the changes.

##########
File path: core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
##########
@@ -128,9 +127,9 @@ public long targetSplitSize() {
   }
 
   private static CloseableIterable<ManifestFile> allDataManifestFiles(List<Snapshot> snapshots) {
-    try (CloseableIterable<ManifestFile> iterable = new ParallelIterable<>(
-        Iterables.transform(snapshots, snapshot -> (Iterable<ManifestFile>) () -> snapshot.dataManifests().iterator()),
-        ThreadPools.getWorkerPool())) {
+    try (CloseableIterable<ManifestFile> iterable = CloseableIterable.combine(
+        Iterables.transform(snapshots, snapshot -> CloseableIterable.withNoopClose(snapshot.dataManifests())),

Review comment:
       I don't think that this should add a noop close here. Can you avoid 
adding this in all the updated calls to create a parallel iterable?
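One possible way to drop the wrapper, assuming `combine` may be widened to the parameter type that `ParallelIterable`'s constructor already accepts (`Iterable<? extends Iterable<T>>`). This dependency-free sketch shows only the signature, not parallelism or closing; `WidenedCombineSketch` and `FakeSnapshot` are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

final class WidenedCombineSketch {
  private WidenedCombineSketch() {
  }

  // Hypothetical stand-in for a snapshot exposing its data manifests as a plain List
  static final class FakeSnapshot {
    private final List<String> dataManifests;

    FakeSnapshot(List<String> dataManifests) {
      this.dataManifests = dataManifests;
    }

    List<String> dataManifests() {
      return dataManifests;
    }
  }

  // Accepts any Iterable of Iterables, so callers need no no-op close wrapper
  static <E> List<E> combine(Iterable<? extends Iterable<E>> iterables) {
    List<E> result = new ArrayList<>();
    for (Iterable<E> iterable : iterables) {
      for (E item : iterable) {
        result.add(item);
      }
    }

    return result;
  }

  // Call-site analog: each snapshot's manifest list is passed straight through
  static List<String> allDataManifests(List<FakeSnapshot> snapshots) {
    List<List<String>> manifests = new ArrayList<>();
    for (FakeSnapshot snapshot : snapshots) {
      manifests.add(snapshot.dataManifests());
    }

    return combine(manifests);
  }
}
```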

##########
File path: core/src/main/java/org/apache/iceberg/deletes/Deletes.java
##########
@@ -77,8 +77,9 @@ private Deletes() {
     return filter.filter(rows);
   }
 
-  public static StructLikeSet toEqualitySet(CloseableIterable<StructLike> eqDeletes, Types.StructType eqType) {
-    try (CloseableIterable<StructLike> deletes = eqDeletes) {
+  public static <T extends StructLike> StructLikeSet toEqualitySet(CloseableIterable<T> eqDeletes,
+                                                                   Types.StructType eqType) {
+    try (CloseableIterable<T> deletes = eqDeletes) {

Review comment:
       Why was this change needed?
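For context, one thing the new signature allows that the old one did not: Java generics are invariant, so a `CloseableIterable<StructLike>` parameter rejects an iterable of some `StructLike` subtype, while `<T extends StructLike>` accepts it. A dependency-free sketch, with hypothetical `Shape` and `Circle` types standing in for `StructLike` and a concrete row type:

```java
import java.util.ArrayList;
import java.util.List;

final class InvarianceSketch {
  private InvarianceSketch() {
  }

  interface Shape {
    String name();
  }

  static final class Circle implements Shape {
    @Override
    public String name() {
      return "circle";
    }
  }

  // Old-style signature: accepts Iterable<Shape> but not Iterable<Circle>
  static List<String> namesExact(Iterable<Shape> shapes) {
    List<String> names = new ArrayList<>();
    for (Shape shape : shapes) {
      names.add(shape.name());
    }

    return names;
  }

  // New-style signature: the bounded type parameter also accepts Iterable<Circle>
  static <T extends Shape> List<String> namesBounded(Iterable<T> shapes) {
    List<String> names = new ArrayList<>();
    for (T shape : shapes) {
      names.add(shape.name());
    }

    return names;
  }
}
```

A wildcard parameter (`Iterable<? extends Shape>`) would accept the same arguments without introducing a type parameter.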

##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -67,7 +74,11 @@
   private final Schema requiredSchema;
   private final Accessor<StructLike> posAccessor;
 
-  protected DeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
+  private int readDeletesWorkerPoolSize;
+  private ExecutorService readDeletesService;
+
+  protected DeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema,
+                         Map<String, String> tableProperties) {

Review comment:
       This class should not take table properties. I think it should take an 
executor service instead.
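A minimal, hypothetical sketch of that shape: the filter receives an `ExecutorService` from its caller and never creates or shuts one down, and `null` means "read serially". `DeleteFilterSketch`, `readAll`, and `readDeleteFile` are illustrative names, and delete rows are plain strings to keep it dependency-free:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

abstract class DeleteFilterSketch {
  // Owned by the caller (the engine); null means read delete files serially
  private final ExecutorService readDeletesPool;

  protected DeleteFilterSketch(ExecutorService readDeletesPool) {
    this.readDeletesPool = readDeletesPool;
  }

  // Reads every delete file, on the injected pool when one was provided
  List<String> readAll(List<String> deleteFiles) throws InterruptedException, ExecutionException {
    List<String> rows = new ArrayList<>();
    if (readDeletesPool == null) {
      for (String file : deleteFiles) {
        rows.addAll(readDeleteFile(file));
      }

      return rows;
    }

    List<Callable<List<String>>> tasks = new ArrayList<>();
    for (String file : deleteFiles) {
      tasks.add(() -> readDeleteFile(file));
    }

    // invokeAll returns futures in task order, so results keep the input order
    for (Future<List<String>> future : readDeletesPool.invokeAll(tasks)) {
      rows.addAll(future.get());
    }

    return rows;
  }

  protected abstract List<String> readDeleteFile(String path);
}
```

Because the caller owns the pool, it also owns the pool's shutdown, and one pool can be shared by many filters.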

##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -108,6 +123,15 @@ protected long pos(T record) {
     return (Long) posAccessor.get(asStructLike(record));
   }
 
+  private ExecutorService readDeletesService(int workerPoolSize) {
+    return MoreExecutors.getExitingExecutorService(
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(
+            workerPoolSize,
+            new ThreadFactoryBuilder()
+                .setNameFormat("Read-delete-Service-%d")
+                .build()));
+  }

Review comment:
       When is this executor service cleaned up?
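For reference, Guava's `MoreExecutors.getExitingExecutorService` uses daemon threads and registers a JVM shutdown hook; it does not shut the pool down any earlier. If a pool is created per `DeleteFilter`, each one would keep its threads until the process exits. A minimal JDK-only sketch of tying a pool's lifetime to an owner instead; `OwnedPool` and the 30-second grace period are hypothetical choices:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical owner that ties a pool's lifetime to a scope instead of to JVM exit
final class OwnedPool implements AutoCloseable {
  private final ExecutorService pool;

  OwnedPool(int numThreads) {
    this.pool = Executors.newFixedThreadPool(numThreads);
  }

  ExecutorService pool() {
    return pool;
  }

  // Stops accepting work, waits briefly for running tasks, then interrupts stragglers
  @Override
  public void close() throws InterruptedException {
    pool.shutdown();
    if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
      pool.shutdownNow();
    }
  }
}
```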

##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -123,6 +123,9 @@ private TableProperties() {
   public static final String SPLIT_OPEN_FILE_COST = "read.split.open-file-cost";
   public static final long SPLIT_OPEN_FILE_COST_DEFAULT = 4 * 1024 * 1024; // 4MB
 
+  public static final String READ_DELETE_FILES_WORKER_POOL_SIZE = "read.deletes.num-threads";
+  public static final int READ_DELETE_FILES_WORKER_POOL_SIZE_DEFAULT = 1;  // read delete files in serial.

Review comment:
       I don't think this makes much sense as a table property. Table
properties are for table configuration, but this is an engine concern. Many
engines handle parallelism internally, so this wouldn't be appropriate. I think
that Flink should expose a setting and manage the thread pool used for reading
delete files.
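One hypothetical shape for the engine side: a single pool per JVM, sized from an engine-level setting and shared by all readers, so it is created once rather than per filter. `SharedDeleteReadPool`, `forSize`, and the thread names are illustrative only and are not Flink or Iceberg APIs; how the size is read from Flink's configuration is left out:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class SharedDeleteReadPool {
  private static final AtomicInteger THREAD_ID = new AtomicInteger();
  private static ExecutorService pool;

  private SharedDeleteReadPool() {
  }

  // Returns null (read serially) for a single thread; otherwise one pool shared by
  // every reader in this JVM. The first caller's size wins; later sizes are ignored.
  static synchronized ExecutorService forSize(int numThreads) {
    if (numThreads <= 1) {
      return null;
    }

    if (pool == null) {
      pool = Executors.newFixedThreadPool(numThreads, runnable -> {
        Thread thread = new Thread(runnable, "read-deletes-" + THREAD_ID.getAndIncrement());
        thread.setDaemon(true); // a process-wide pool should not keep the JVM alive
        return thread;
      });
    }

    return pool;
  }
}
```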




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
