rdblue commented on a change in pull request #3120:
URL: https://github.com/apache/iceberg/pull/3120#discussion_r742113377



##########
File path: api/src/main/java/org/apache/iceberg/io/ParallelIterable.java
##########
@@ -28,25 +28,39 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.io.CloseableGroup;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 
+/**
+ * Run iterables in parallel.
+ * @deprecated please use {@link CloseableIterable#combine(Iterable, ExecutorService, int)} instead.
+ */
+@Deprecated
 public class ParallelIterable<T> extends CloseableGroup implements CloseableIterable<T> {
   private final Iterable<? extends Iterable<T>> iterables;
   private final ExecutorService workerPool;
+  private final int workerPoolSize;
 
+  /**
+   * @deprecated please use {@link CloseableIterable#combine(Iterable, ExecutorService, int)} instead.

Review comment:
       You can omit "please" from documentation so that docs are direct and as 
short as possible.
   
   Also, can you add "will be removed in 0.14.0"? We like to keep track of when 
things can be removed.

##########
File path: api/src/main/java/org/apache/iceberg/io/ParallelIterable.java
##########
@@ -72,8 +87,8 @@ private ParallelIterator(Iterable<? extends Iterable<T>> iterables,
             }
           }).iterator();
       this.workerPool = workerPool;
-      // submit 2 tasks per worker at a time
-      this.taskFutures = new Future[2 * ThreadPools.WORKER_THREAD_POOL_SIZE];
+      // submit 2 tasks per worker at a time.

Review comment:
       Can you remove the non-functional change on this line? We don't want 
unnecessary changes to cause commit conflicts.

##########
File path: api/src/main/java/org/apache/iceberg/io/ParallelIterable.java
##########
@@ -28,25 +28,39 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.io.CloseableGroup;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 
+/**
+ * Run iterables in parallel.
+ * @deprecated please use {@link CloseableIterable#combine(Iterable, ExecutorService, int)} instead.
+ */
+@Deprecated
 public class ParallelIterable<T> extends CloseableGroup implements CloseableIterable<T> {
   private final Iterable<? extends Iterable<T>> iterables;
   private final ExecutorService workerPool;
+  private final int workerPoolSize;
 
+  /**
+   * @deprecated please use {@link CloseableIterable#combine(Iterable, ExecutorService, int)} instead.
+   */
+  @Deprecated
   public ParallelIterable(Iterable<? extends Iterable<T>> iterables,
                           ExecutorService workerPool) {
+    this(iterables, workerPool, Runtime.getRuntime().availableProcessors());
+  }
+
+  ParallelIterable(Iterable<? extends Iterable<T>> iterables,
+                          ExecutorService workerPool,
+                          int workerPoolSize) {

Review comment:
       Can you fix the indentation here?

##########
File path: api/src/main/java/org/apache/iceberg/io/ParallelIterable.java
##########
@@ -28,25 +28,39 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.io.CloseableGroup;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 
+/**
+ * Run iterables in parallel.
+ * @deprecated please use {@link CloseableIterable#combine(Iterable, ExecutorService, int)} instead.
+ */
+@Deprecated

Review comment:
       This class is not deprecated. It is still public and will not be 
removed. This should be created using `CloseableIterable#combine` rather than 
directly using the constructor. Can you remove this deprecation?

##########
File path: core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
##########
@@ -128,9 +127,9 @@ public long targetSplitSize() {
   }
 
   private static CloseableIterable<ManifestFile> allDataManifestFiles(List<Snapshot> snapshots) {
-    try (CloseableIterable<ManifestFile> iterable = new ParallelIterable<>(
-        Iterables.transform(snapshots, snapshot -> (Iterable<ManifestFile>) () -> snapshot.dataManifests().iterator()),
-        ThreadPools.getWorkerPool())) {
+    try (CloseableIterable<ManifestFile> iterable = CloseableIterable.combine(
+        Iterables.transform(snapshots, snapshot -> CloseableIterable.withNoopClose(snapshot.dataManifests())),

Review comment:
       This does need to be fixed. `ParallelIterable` was constructed using 
`Iterable<? extends Iterable<T>>`. There should be a version of 
`CloseableIterable.combine` that accepts the same type. That may mean updating 
`CloseableIterable.concat` to accept the same `? extends Iterable<T>` in 
addition to strictly a `CloseableIterable<T>`. But that should be okay, since 
you can update to close the iterable if it is closeable.
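
A minimal sketch of the "close the iterable if it is closeable" idea, under stated assumptions: `ConcatSketch` and `TrackedInput` are hypothetical names for illustration and are not Iceberg's `CloseableIterable.concat`. It accepts `Iterable<? extends Iterable<T>>` and closes any exhausted input that also implements `Closeable`. It does not close an input when the consumer stops early; a real implementation would likely return a closeable wrapper to cover that case.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for illustration; not Iceberg's CloseableIterable.concat.
final class ConcatSketch {
  private ConcatSketch() {
  }

  // Lazily concatenates the inputs. Each input is closed once it is exhausted,
  // but only if it implements Closeable; plain Iterables are left alone.
  static <T> Iterator<T> concat(Iterable<? extends Iterable<T>> inputs) {
    Iterator<? extends Iterable<T>> outer = inputs.iterator();
    return new Iterator<T>() {
      private Iterable<T> current = null;
      private Iterator<T> currentIterator = null;

      @Override
      public boolean hasNext() {
        while (currentIterator == null || !currentIterator.hasNext()) {
          closeCurrent();
          if (!outer.hasNext()) {
            return false;
          }
          current = outer.next();
          currentIterator = current.iterator();
        }
        return true;
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return currentIterator.next();
      }

      private void closeCurrent() {
        if (current instanceof Closeable) {
          try {
            ((Closeable) current).close();
          } catch (IOException e) {
            throw new UncheckedIOException(e);
          }
        }
        current = null;
        currentIterator = null;
      }
    };
  }
}

// Hypothetical helper: an input that records whether it was closed.
final class TrackedInput<T> implements Iterable<T>, Closeable {
  private final List<T> items;
  boolean closed = false;

  TrackedInput(List<T> items) {
    this.items = items;
  }

  @Override
  public Iterator<T> iterator() {
    return items.iterator();
  }

  @Override
  public void close() {
    closed = true;
  }
}
```

With this shape, a caller can mix plain collections and closeable inputs in one list without wrapping the plain ones in a no-op close.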

##########
File path: core/src/main/java/org/apache/iceberg/SystemProperties.java
##########
@@ -39,11 +39,24 @@ private SystemProperties() {
    */
   public static final String SCAN_THREAD_POOL_ENABLED = "iceberg.scan.plan-in-worker-pool";
 
-  static boolean getBoolean(String systemProperty, boolean defaultValue) {
+  /**
+   * Whether to use the shared worker pool when planning table scans.
+   */
+  public static final String READ_DELETE_FILES_WORKER_POOL_SIZE = "iceberg.worker.read-deletes-num-threads";
+
+  public static boolean getBoolean(String systemProperty, boolean defaultValue) {

Review comment:
       As I said elsewhere, I don't think that this feature should be 
controlled through a system property. This should be a Flink-specific property 
for now and we can introduce a similar config for Spark later. But since this 
violates Spark's threading model on executors, we don't want to make this 
global.

##########
File path: core/src/main/java/org/apache/iceberg/deletes/Deletes.java
##########
@@ -77,8 +77,9 @@ private Deletes() {
     return filter.filter(rows);
   }
 
-  public static StructLikeSet toEqualitySet(CloseableIterable<StructLike> eqDeletes, Types.StructType eqType) {
-    try (CloseableIterable<StructLike> deletes = eqDeletes) {
+  public static <T extends StructLike> StructLikeSet toEqualitySet(CloseableIterable<T> eqDeletes,
+                                                                   Types.StructType eqType) {
+    try (CloseableIterable<T> deletes = eqDeletes) {

Review comment:
       Seems reasonable.

##########
File path: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
##########
@@ -62,6 +68,13 @@
       MetadataColumns.DELETE_FILE_PATH,
       MetadataColumns.DELETE_FILE_POS);
 
+  private static final int READ_DELETES_WORKER_POOL_SIZE_DEFAULT = 0; // read delete files in serial.
+  private static final int READ_DELETES_WORKER_POOL_SIZE = SystemProperties.getInteger(
+      SystemProperties.READ_DELETE_FILES_WORKER_POOL_SIZE, READ_DELETES_WORKER_POOL_SIZE_DEFAULT);
+  private static final ExecutorService READ_DELETES_SERVICE = READ_DELETES_WORKER_POOL_SIZE <= 1 ? null :
+      MoreExecutors.getExitingExecutorService((ThreadPoolExecutor) Executors.newFixedThreadPool(
+          READ_DELETES_WORKER_POOL_SIZE, new ThreadFactoryBuilder().setNameFormat("Read-delete-Service-%d").build()));

Review comment:
       It looks like the changes other than the ones in this file are to make 
it easier to use `ParallelIterable`. Can we separate those changes from these 
for Flink and `DeleteFilter`? A separate PR for the `CloseableIterable` changes 
would make it easier to review this when there are more Flink changes to handle 
the executor service and pass it into `DeleteFilter`.
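
One possible shape for passing the executor in, rather than building a static pool from a system property, is sketched below. `DeleteReaderSketch` and `readAll` are hypothetical names and this is not the actual `DeleteFilter` API; the sketch only assumes the caller (for example, engine-specific code) owns the pool, and that a `null` pool means "read serially".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical illustration; not the actual DeleteFilter API.
final class DeleteReaderSketch {
  // Supplied by the caller; null means tasks run serially on the calling thread.
  private final ExecutorService pool;

  DeleteReaderSketch(ExecutorService pool) {
    this.pool = pool;
  }

  // Runs every task and returns the results in task order.
  <T> List<T> readAll(List<Callable<T>> tasks) {
    List<T> results = new ArrayList<>();
    try {
      if (pool == null) {
        for (Callable<T> task : tasks) {
          results.add(task.call());
        }
      } else {
        // invokeAll blocks until all tasks finish and preserves task order.
        for (Future<T> future : pool.invokeAll(tasks)) {
          results.add(future.get());
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while reading", e);
    } catch (ExecutionException e) {
      throw new RuntimeException("Read task failed", e.getCause());
    } catch (Exception e) {
      throw new RuntimeException("Read task failed", e);
    }
    return results;
  }
}
```

Because the pool is a constructor argument, an engine that must not spawn extra threads on its executors can simply pass `null`, and no global setting is involved.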

##########
File path: api/src/main/java/org/apache/iceberg/io/ParallelIterable.java
##########
@@ -28,25 +28,39 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.io.CloseableGroup;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 
+/**
+ * Run iterables in parallel.
+ * @deprecated please use {@link CloseableIterable#combine(Iterable, ExecutorService, int)} instead.
+ */
+@Deprecated

Review comment:
       Good catch, @jackye1995!



