This is an automated email from the ASF dual-hosted git repository.
findepi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new c7910bb401 Core: Fix possible deadlock in ParallelIterable (#11781)
c7910bb401 is described below
commit c7910bb401f7f7fd09010bede0d80f5d2164afd5
Author: Karol Sobczak <[email protected]>
AuthorDate: Sat Jan 11 17:13:18 2025 +0100
Core: Fix possible deadlock in ParallelIterable (#11781)
* Fix ParallelIterable deadlock
It was observed that, in high-concurrency/high-workload scenarios, the
cluster deadlocks because manifest readers wait for connections from the S3
connection pool.
Specifically, ManifestGroup#plan creates a ManifestReader for every
ParallelIterable.Task.
These readers effectively hold onto an S3 connection from the pool.
When the ParallelIterable queue is full, the Task yields and is set aside to be
resubmitted later.
Consider the following scenario:
S3 connection pool size=1
approximateMaxQueueSize=1
workerPoolSize=1
ParallelIterable1: starts TaskP1
ParallelIterable1: TaskP1 produces a result, the queue becomes full, and TaskP1
is put on hold (while still holding the S3 connection)
ParallelIterable2: starts TaskP2; TaskP2 is scheduled on the workerPool but
blocks waiting for a connection from the S3 connection pool
ParallelIterable1: the result is consumed and TaskP1 is scheduled again
ParallelIterable1: TaskP1 waits for a free workerPool thread, but TaskP2
(which occupies that thread) is waiting for TaskP1 to release the connection,
so neither can make progress (deadlock)
The fix makes sure that a Task runs to completion once it has started. This
way, limited resources such as the connection pool are not held by tasks that
are on hold. The queue size might exceed strict limits, but it should
still be bounded.
Fixes https://github.com/apache/iceberg/issues/11768
* Do not submit a task when there is no space in the queue
---
.../org/apache/iceberg/util/ParallelIterable.java | 27 ++++++--
.../apache/iceberg/util/TestParallelIterable.java | 80 ++++++++++++++--------
2 files changed, 71 insertions(+), 36 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
index d40f648447..7acab8762f 100644
--- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
+++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
@@ -86,6 +86,7 @@ public class ParallelIterable<T> extends CloseableGroup
implements CloseableIter
private final CompletableFuture<Optional<Task<T>>>[] taskFutures;
private final ConcurrentLinkedQueue<T> queue = new
ConcurrentLinkedQueue<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final int maxQueueSize;
private ParallelIterator(
Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool,
int maxQueueSize) {
@@ -97,6 +98,7 @@ public class ParallelIterable<T> extends CloseableGroup
implements CloseableIter
this.workerPool = workerPool;
// submit 2 tasks per worker at a time
this.taskFutures = new CompletableFuture[2 *
ThreadPools.WORKER_THREAD_POOL_SIZE];
+ this.maxQueueSize = maxQueueSize;
}
@Override
@@ -153,6 +155,7 @@ public class ParallelIterable<T> extends CloseableGroup
implements CloseableIter
try {
Optional<Task<T>> continuation = taskFutures[i].get();
continuation.ifPresent(yieldedTasks::addLast);
+ taskFutures[i] = null;
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
// rethrow a runtime exception
@@ -165,7 +168,10 @@ public class ParallelIterable<T> extends CloseableGroup
implements CloseableIter
}
}
- taskFutures[i] = submitNextTask();
+ // submit a new task if there is space in the queue
+ if (queue.size() < maxQueueSize) {
+ taskFutures[i] = submitNextTask();
+ }
}
if (taskFutures[i] != null) {
@@ -257,17 +263,24 @@ public class ParallelIterable<T> extends CloseableGroup
implements CloseableIter
@Override
public Optional<Task<T>> get() {
try {
+ if (queue.size() >= approximateMaxQueueSize) {
+ // Yield when queue is over the size limit. Task will be resubmitted
later and continue
+ // the work.
+ //
+ // Tasks might hold references (via iterator) to constrained
resources
+ // (e.g. pooled connections). Hence, tasks should yield only when
+ // iterator is not instantiated. Otherwise, there could be
+ // a deadlock when yielded tasks are waiting to be executed while
+ // currently executed tasks are waiting for the resources that are
held
+ // by the yielded tasks.
+ return Optional.of(this);
+ }
+
if (iterator == null) {
iterator = input.iterator();
}
while (iterator.hasNext()) {
- if (queue.size() >= approximateMaxQueueSize) {
- // Yield when queue is over the size limit. Task will be
resubmitted later and continue
- // the work.
- return Optional.of(this);
- }
-
T next = iterator.next();
if (closed.get()) {
break;
diff --git
a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
index 410e33058d..a1e14a22a7 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -39,6 +40,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Multiset;
import org.apache.iceberg.util.ParallelIterable.ParallelIterator;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
public class TestParallelIterable {
@Test
@@ -143,7 +145,7 @@ public class TestParallelIterable {
@Test
public void limitQueueSize() {
- ExecutorService executor = Executors.newCachedThreadPool();
+ ExecutorService executor = Executors.newSingleThreadExecutor();
try {
List<Iterable<Integer>> iterables =
ImmutableList.of(
@@ -167,7 +169,7 @@ public class TestParallelIterable {
while (iterator.hasNext()) {
assertThat(iterator.queueSize())
.as("iterator internal queue size")
- .isLessThanOrEqualTo(maxQueueSize + iterables.size());
+ .isLessThanOrEqualTo(100);
actualValues.add(iterator.next());
}
@@ -182,41 +184,61 @@ public class TestParallelIterable {
}
@Test
- public void queueSizeOne() {
- ExecutorService executor = Executors.newCachedThreadPool();
+ @Timeout(10)
+ public void noDeadlock() {
+ // This test simulates a scenario where iterators use a constrained
resource
+ // (e.g. an S3 connection pool that has a limit on the number of
connections).
+ // In this case, the constrained resource shouldn't cause a deadlock when
queue
+ // is full and the iterator is waiting for the queue to be drained.
+ ExecutorService executor = Executors.newFixedThreadPool(1);
try {
- List<Iterable<Integer>> iterables =
- ImmutableList.of(
- () -> IntStream.range(0, 100).iterator(),
- () -> IntStream.range(0, 100).iterator(),
- () -> IntStream.range(0, 100).iterator());
+ Semaphore semaphore = new Semaphore(1);
- Multiset<Integer> expectedValues =
- IntStream.range(0, 100)
- .boxed()
- .flatMap(i -> Stream.of(i, i, i))
- .collect(ImmutableMultiset.toImmutableMultiset());
+ List<Iterable<Integer>> iterablesA =
+ ImmutableList.of(
+ testIterable(
+ semaphore::acquire, semaphore::release, IntStream.range(0,
100).iterator()));
+ List<Iterable<Integer>> iterablesB =
+ ImmutableList.of(
+ testIterable(
+ semaphore::acquire, semaphore::release, IntStream.range(200,
300).iterator()));
- ParallelIterable<Integer> parallelIterable = new
ParallelIterable<>(iterables, executor, 1);
- ParallelIterator<Integer> iterator = (ParallelIterator<Integer>)
parallelIterable.iterator();
+ ParallelIterable<Integer> parallelIterableA = new
ParallelIterable<>(iterablesA, executor, 1);
+ ParallelIterable<Integer> parallelIterableB = new
ParallelIterable<>(iterablesB, executor, 1);
- Multiset<Integer> actualValues = HashMultiset.create();
+ parallelIterableA.iterator().next();
+ parallelIterableB.iterator().next();
+ } finally {
+ executor.shutdownNow();
+ }
+ }
- while (iterator.hasNext()) {
- assertThat(iterator.queueSize())
- .as("iterator internal queue size")
- .isLessThanOrEqualTo(1 + iterables.size());
- actualValues.add(iterator.next());
+ private <T> CloseableIterable<T> testIterable(
+ RunnableWithException open, RunnableWithException close, Iterator<T>
iterator) {
+ return new CloseableIterable<T>() {
+ @Override
+ public void close() {
+ try {
+ close.run();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
- assertThat(actualValues)
- .as("multiset of values returned by the iterator")
- .isEqualTo(expectedValues);
+ @Override
+ public CloseableIterator<T> iterator() {
+ try {
+ open.run();
+ return CloseableIterator.withClose(iterator);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
- iterator.close();
- } finally {
- executor.shutdown();
- }
+ private interface RunnableWithException {
+ void run() throws Exception;
}
private void queueHasElements(ParallelIterator<Integer> iterator) {