This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 0b5aacd942 Core: Use ParallelIterable in Deletes::toPositionIndex (6387) (#8805)
0b5aacd942 is described below
commit 0b5aacd94207f9d880b9ddd38d04c13f59075c10
Author: Wing Yew Poon <[email protected]>
AuthorDate: Thu Nov 2 00:50:42 2023 -0700
Core: Use ParallelIterable in Deletes::toPositionIndex (6387) (#8805)
Co-authored-by: Rajesh Balamohan <[email protected]>
---
.../java/org/apache/iceberg/SystemConfigs.java | 11 ++++++++++
.../iceberg/deletes/BitmapPositionDeleteIndex.java | 2 +-
.../java/org/apache/iceberg/deletes/Deletes.java | 16 ++++++++++++++-
.../java/org/apache/iceberg/util/ThreadPools.java | 21 +++++++++++++++++++
.../apache/iceberg/deletes/TestPositionFilter.java | 24 ++++++++++++++++++----
5 files changed, 68 insertions(+), 6 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/SystemConfigs.java b/core/src/main/java/org/apache/iceberg/SystemConfigs.java
index 74dff295d7..ce183ec5d4 100644
--- a/core/src/main/java/org/apache/iceberg/SystemConfigs.java
+++ b/core/src/main/java/org/apache/iceberg/SystemConfigs.java
@@ -42,6 +42,17 @@ public class SystemConfigs {
Math.max(2, Runtime.getRuntime().availableProcessors()),
Integer::parseUnsignedInt);
+ /**
+ * Sets the size of the delete worker pool. This limits the number of threads used to compute the
+ * PositionDeleteIndex from the position deletes for a data file.
+ */
+ public static final ConfigEntry<Integer> DELETE_WORKER_THREAD_POOL_SIZE =
+ new ConfigEntry<>(
+ "iceberg.worker.delete-num-threads",
+ "ICEBERG_WORKER_DELETE_NUM_THREADS",
+ Math.max(2, Runtime.getRuntime().availableProcessors()),
+ Integer::parseUnsignedInt);
+
/** Whether to use the shared worker pool when planning table scans. */
public static final ConfigEntry<Boolean> SCAN_THREAD_POOL_ENABLED =
new ConfigEntry<>(
diff --git a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
index 7690ab7e48..1aa6571a03 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
@@ -28,7 +28,7 @@ class BitmapPositionDeleteIndex implements PositionDeleteIndex {
}
@Override
- public void delete(long position) {
+ public synchronized void delete(long position) {
roaring64Bitmap.add(position);
}
diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
index 2dee8b8f58..118987e24b 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.deletes;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -37,8 +38,10 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Filter;
+import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.SortedMerge;
import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -127,6 +130,13 @@ public class Deletes {
public static <T extends StructLike> PositionDeleteIndex toPositionIndex(
CharSequence dataLocation, List<CloseableIterable<T>> deleteFiles) {
+ return toPositionIndex(dataLocation, deleteFiles, ThreadPools.getDeleteWorkerPool());
+ }
+
+ public static <T extends StructLike> PositionDeleteIndex toPositionIndex(
+ CharSequence dataLocation,
+ List<CloseableIterable<T>> deleteFiles,
+ ExecutorService deleteWorkerPool) {
DataFileFilter<T> locationFilter = new DataFileFilter<>(dataLocation);
List<CloseableIterable<Long>> positions =
Lists.transform(
@@ -134,7 +144,11 @@ public class Deletes {
deletes ->
CloseableIterable.transform(
locationFilter.filter(deletes), row -> (Long) POSITION_ACCESSOR.get(row)));
- return toPositionIndex(CloseableIterable.concat(positions));
+ if (positions.size() > 1 && deleteWorkerPool != null) {
+ return toPositionIndex(new ParallelIterable<>(positions, deleteWorkerPool));
+ } else {
+ return toPositionIndex(CloseableIterable.concat(positions));
+ }
}
public static PositionDeleteIndex toPositionIndex(CloseableIterable<Long> posDeletes) {
diff --git a/core/src/main/java/org/apache/iceberg/util/ThreadPools.java b/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
index a998e4262f..4f2314fec9 100644
--- a/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
+++ b/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
@@ -44,6 +44,12 @@ public class ThreadPools {
private static final ExecutorService WORKER_POOL = newWorkerPool("iceberg-worker-pool");
+ public static final int DELETE_WORKER_THREAD_POOL_SIZE =
+ SystemConfigs.DELETE_WORKER_THREAD_POOL_SIZE.value();
+
+ private static final ExecutorService DELETE_WORKER_POOL =
+ newWorkerPool("iceberg-delete-worker-pool", DELETE_WORKER_THREAD_POOL_SIZE);
+
/**
* Return an {@link ExecutorService} that uses the "worker" thread-pool.
*
@@ -59,6 +65,21 @@ public class ThreadPools {
return WORKER_POOL;
}
+ /**
+ * Return an {@link ExecutorService} that uses the "delete worker" thread-pool.
+ *
+ * <p>The size of the delete worker pool limits the number of threads used to compute the
+ * PositionDeleteIndex from the position deletes for a data file.
+ *
+ * <p>The size of this thread-pool is controlled by the Java system property {@code
+ * iceberg.worker.delete-num-threads}.
+ *
+ * @return an {@link ExecutorService} that uses the delete worker pool
+ */
+ public static ExecutorService getDeleteWorkerPool() {
+ return DELETE_WORKER_POOL;
+ }
+
public static ExecutorService newWorkerPool(String namePrefix) {
return newWorkerPool(namePrefix, WORKER_THREAD_POOL_SIZE);
}
diff --git a/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java b/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java
index 16a5695104..8e35c8c9fc 100644
--- a/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java
+++ b/core/src/test/java/org/apache/iceberg/deletes/TestPositionFilter.java
@@ -23,8 +23,12 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
+import java.util.stream.Stream;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.TestHelpers.Row;
@@ -33,7 +37,10 @@ import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
public class TestPositionFilter {
@Test
@@ -280,8 +287,16 @@ public class TestPositionFilter {
.containsExactlyElementsOf(Lists.newArrayList(1L, 2L, 5L, 6L, 8L));
}
- @Test
- public void testCombinedPositionSetRowFilter() {
+ static Stream<ExecutorService> executorServiceProvider() {
+ return Stream.of(
+ null,
+ MoreExecutors.getExitingExecutorService(
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(4)));
+ }
+
+ @ParameterizedTest
+ @MethodSource("executorServiceProvider")
+ public void testCombinedPositionSetRowFilter(ExecutorService executorService) {
CloseableIterable<StructLike> positionDeletes1 =
CloseableIterable.withNoopClose(
Lists.newArrayList(
@@ -316,9 +331,10 @@ public class TestPositionFilter {
Predicate<StructLike> isDeleted =
row ->
Deletes.toPositionIndex(
- "file_a.avro", ImmutableList.of(positionDeletes1, positionDeletes2))
+ "file_a.avro",
+ ImmutableList.of(positionDeletes1, positionDeletes2),
+ executorService)
.isDeleted(row.get(0, Long.class));
-
CloseableIterable<StructLike> actual = CloseableIterable.filter(rows, isDeleted.negate());
assertThat(Iterables.transform(actual, row -> row.get(0, Long.class)))