This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b5aacd942 Core: Use ParallelIterable in Deletes::toPositionIndex (6387) (#8805)
0b5aacd942 is described below

commit 0b5aacd94207f9d880b9ddd38d04c13f59075c10
Author: Wing Yew Poon <[email protected]>
AuthorDate: Thu Nov 2 00:50:42 2023 -0700

    Core: Use ParallelIterable in Deletes::toPositionIndex (6387) (#8805)
    
    Co-authored-by: Rajesh Balamohan <[email protected]>
---
 .../java/org/apache/iceberg/SystemConfigs.java     | 11 ++++++++++
 .../iceberg/deletes/BitmapPositionDeleteIndex.java |  2 +-
 .../java/org/apache/iceberg/deletes/Deletes.java   | 16 ++++++++++++++-
 .../java/org/apache/iceberg/util/ThreadPools.java  | 21 +++++++++++++++++++
 .../apache/iceberg/deletes/TestPositionFilter.java | 24 ++++++++++++++++++----
 5 files changed, 68 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/SystemConfigs.java b/core/src/main/java/org/apache/iceberg/SystemConfigs.java
index 74dff295d7..ce183ec5d4 100644
--- a/core/src/main/java/org/apache/iceberg/SystemConfigs.java
+++ b/core/src/main/java/org/apache/iceberg/SystemConfigs.java
@@ -42,6 +42,17 @@ public class SystemConfigs {
           Math.max(2, Runtime.getRuntime().availableProcessors()),
           Integer::parseUnsignedInt);
 
+  /**
+   * Sets the size of the delete worker pool. This limits the number of threads used to compute the
+   * PositionDeleteIndex from the position deletes for a data file.
+   */
+  public static final ConfigEntry<Integer> DELETE_WORKER_THREAD_POOL_SIZE =
+      new ConfigEntry<>(
+          "iceberg.worker.delete-num-threads",
+          "ICEBERG_WORKER_DELETE_NUM_THREADS",
+          Math.max(2, Runtime.getRuntime().availableProcessors()),
+          Integer::parseUnsignedInt);
+
   /** Whether to use the shared worker pool when planning table scans. */
   public static final ConfigEntry<Boolean> SCAN_THREAD_POOL_ENABLED =
       new ConfigEntry<>(
diff --git a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
index 7690ab7e48..1aa6571a03 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
@@ -28,7 +28,7 @@ class BitmapPositionDeleteIndex implements PositionDeleteIndex {
   }
 
   @Override
-  public void delete(long position) {
+  public synchronized void delete(long position) {
     roaring64Bitmap.add(position);
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
index 2dee8b8f58..118987e24b 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.deletes;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -37,8 +38,10 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Filter;
+import org.apache.iceberg.util.ParallelIterable;
 import org.apache.iceberg.util.SortedMerge;
 import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -127,6 +130,13 @@ public class Deletes {
 
   public static <T extends StructLike> PositionDeleteIndex toPositionIndex(
       CharSequence dataLocation, List<CloseableIterable<T>> deleteFiles) {
+    return toPositionIndex(dataLocation, deleteFiles, ThreadPools.getDeleteWorkerPool());
+  }
+
+  public static <T extends StructLike> PositionDeleteIndex toPositionIndex(
+      CharSequence dataLocation,
+      List<CloseableIterable<T>> deleteFiles,
+      ExecutorService deleteWorkerPool) {
     DataFileFilter<T> locationFilter = new DataFileFilter<>(dataLocation);
     List<CloseableIterable<Long>> positions =
         Lists.transform(
@@ -134,7 +144,11 @@ public class Deletes {
             deletes ->
                 CloseableIterable.transform(
                     locationFilter.filter(deletes), row -> (Long) POSITION_ACCESSOR.get(row)));
-    return toPositionIndex(CloseableIterable.concat(positions));
+    if (positions.size() > 1 && deleteWorkerPool != null) {
+      return toPositionIndex(new ParallelIterable<>(positions, deleteWorkerPool));
+    } else {
+      return toPositionIndex(CloseableIterable.concat(positions));
+    }
   }
 
   public static PositionDeleteIndex toPositionIndex(CloseableIterable<Long> posDeletes) {
diff --git a/core/src/main/java/org/apache/iceberg/util/ThreadPools.java b/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
index a998e4262f..4f2314fec9 100644
--- a/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
+++ b/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
@@ -44,6 +44,12 @@ public class ThreadPools {
 
   private static final ExecutorService WORKER_POOL = newWorkerPool("iceberg-worker-pool");
 
+  public static final int DELETE_WORKER_THREAD_POOL_SIZE =
+      SystemConfigs.DELETE_WORKER_THREAD_POOL_SIZE.value();
+
+  private static final ExecutorService DELETE_WORKER_POOL =
+      newWorkerPool("iceberg-delete-worker-pool", DELETE_WORKER_THREAD_POOL_SIZE);
+
   /**
    * Return an {@link ExecutorService} that uses the "worker" thread-pool.
    *
@@ -59,6 +65,21 @@ public class ThreadPools {
     return WORKER_POOL;
   }
 
+  /**
+   * Return an {@link ExecutorService} that uses the "delete worker" thread-pool.
+   *
+   * <p>The size of the delete worker pool limits the number of threads used to compute the
+   * PositionDeleteIndex from the position deletes for a data file.
+   *
+   * <p>The size of this thread-pool is controlled by the Java system property {@code
+   * iceberg.worker.delete-num-threads}.
+   *
+   * @return an {@link ExecutorService} that uses the delete worker pool
+   */
+  public static ExecutorService getDeleteWorkerPool() {
+    return DELETE_WORKER_POOL;
+  }
+
   public static ExecutorService newWorkerPool(String namePrefix) {
     return newWorkerPool(namePrefix, WORKER_THREAD_POOL_SIZE);
   }
diff --git a/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java b/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java
index 16a5695104..8e35c8c9fc 100644
--- a/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java
+++ b/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java
@@ -23,8 +23,12 @@ import static org.assertj.core.api.Assertions.assertThat;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Predicate;
+import java.util.stream.Stream;
 import org.apache.avro.util.Utf8;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.TestHelpers.Row;
@@ -33,7 +37,10 @@ import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 public class TestPositionFilter {
   @Test
@@ -280,8 +287,16 @@ public class TestPositionFilter {
         .containsExactlyElementsOf(Lists.newArrayList(1L, 2L, 5L, 6L, 8L));
   }
 
-  @Test
-  public void testCombinedPositionSetRowFilter() {
+  static Stream<ExecutorService> executorServiceProvider() {
+    return Stream.of(
+        null,
+        MoreExecutors.getExitingExecutorService(
+            (ThreadPoolExecutor) Executors.newFixedThreadPool(4)));
+  }
+
+  @ParameterizedTest
+  @MethodSource("executorServiceProvider")
+  public void testCombinedPositionSetRowFilter(ExecutorService executorService) {
     CloseableIterable<StructLike> positionDeletes1 =
         CloseableIterable.withNoopClose(
             Lists.newArrayList(
@@ -316,9 +331,10 @@ public class TestPositionFilter {
     Predicate<StructLike> isDeleted =
         row ->
             Deletes.toPositionIndex(
-                    "file_a.avro", ImmutableList.of(positionDeletes1, positionDeletes2))
+                    "file_a.avro",
+                    ImmutableList.of(positionDeletes1, positionDeletes2),
+                    executorService)
                 .isDeleted(row.get(0, Long.class));
-
     CloseableIterable<StructLike> actual = CloseableIterable.filter(rows, isDeleted.negate());
 
     assertThat(Iterables.transform(actual, row -> row.get(0, Long.class)))

Reply via email to