This is an automated email from the ASF dual-hosted git repository. fokko pushed a commit to branch 1.7.x in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 5fbf369593dae04584439179388f3d4ade90e03b Author: Karol Sobczak <[email protected]> AuthorDate: Sat Jan 11 17:13:18 2025 +0100 Core: Fix possible deadlock in ParallelIterable (#11781) * Fix ParallelIterable deadlock It was observed that in high-concurrency/high-workload scenarios the cluster deadlocks due to manifest readers waiting for a connection from the S3 pool. Specifically, ManifestGroup#plan will create a ManifestReader for every ParallelIterable.Task. These readers will effectively hold onto an S3 connection from the pool. When the ParallelIterable queue is full, the Task will be tabled for later use. Consider the following scenario: S3 connection pool size=1, approximateMaxQueueSize=1, workerPoolSize=1. ParallelIterable1: starts TaskP1; ParallelIterable1: TaskP1 produces a result, the queue gets full, TaskP1 is put on hold (holds the S3 connection); ParallelIterable2: starts TaskP2, TaskP2 is scheduled on workerPool but is blocked on the S3 connection pool; ParallelIterable1: the result gets consumed, TaskP1 is scheduled again; ParallelIterable1: TaskP1 waits for workerPool to be free, but TaskP2 is waiting for TaskP1 to release the connection. The fix makes sure a Task is finished once it's started. This way limited resources like a connection pool are not put on hold. The queue size might exceed strict limits, but it should still be bounded. 
Fixes https://github.com/apache/iceberg/issues/11768 * Do not submit a task when there is no space in queue --- .../org/apache/iceberg/util/ParallelIterable.java | 27 +- .../apache/iceberg/util/TestParallelIterable.java | 316 ++++++++++++--------- 2 files changed, 196 insertions(+), 147 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index d40f648447..7acab8762f 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -86,6 +86,7 @@ public class ParallelIterable<T> extends CloseableGroup implements CloseableIter private final CompletableFuture<Optional<Task<T>>>[] taskFutures; private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>(); private final AtomicBoolean closed = new AtomicBoolean(false); + private final int maxQueueSize; private ParallelIterator( Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool, int maxQueueSize) { @@ -97,6 +98,7 @@ public class ParallelIterable<T> extends CloseableGroup implements CloseableIter this.workerPool = workerPool; // submit 2 tasks per worker at a time this.taskFutures = new CompletableFuture[2 * ThreadPools.WORKER_THREAD_POOL_SIZE]; + this.maxQueueSize = maxQueueSize; } @Override @@ -153,6 +155,7 @@ public class ParallelIterable<T> extends CloseableGroup implements CloseableIter try { Optional<Task<T>> continuation = taskFutures[i].get(); continuation.ifPresent(yieldedTasks::addLast); + taskFutures[i] = null; } catch (ExecutionException e) { if (e.getCause() instanceof RuntimeException) { // rethrow a runtime exception @@ -165,7 +168,10 @@ public class ParallelIterable<T> extends CloseableGroup implements CloseableIter } } - taskFutures[i] = submitNextTask(); + // submit a new task if there is space in the queue + if (queue.size() < maxQueueSize) { + taskFutures[i] = submitNextTask(); + 
} } if (taskFutures[i] != null) { @@ -257,17 +263,24 @@ public class ParallelIterable<T> extends CloseableGroup implements CloseableIter @Override public Optional<Task<T>> get() { try { + if (queue.size() >= approximateMaxQueueSize) { + // Yield when queue is over the size limit. Task will be resubmitted later and continue + // the work. + // + // Tasks might hold references (via iterator) to constrained resources + // (e.g. pooled connections). Hence, tasks should yield only when + // iterator is not instantiated. Otherwise, there could be + // a deadlock when yielded tasks are waiting to be executed while + // currently executed tasks are waiting for the resources that are held + // by the yielded tasks. + return Optional.of(this); + } + if (iterator == null) { iterator = input.iterator(); } while (iterator.hasNext()) { - if (queue.size() >= approximateMaxQueueSize) { - // Yield when queue is over the size limit. Task will be resubmitted later and continue - // the work. - return Optional.of(this); - } - T next = iterator.next(); if (closed.get()) { break; diff --git a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java index 5e37e0390d..a1e14a22a7 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java +++ b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -39,170 +40,205 @@ import org.apache.iceberg.relocated.com.google.common.collect.Multiset; import org.apache.iceberg.util.ParallelIterable.ParallelIterator; import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; public class 
TestParallelIterable { @Test public void closeParallelIteratorWithoutCompleteIteration() { ExecutorService executor = Executors.newFixedThreadPool(1); - - Iterable<CloseableIterable<Integer>> transform = - Iterables.transform( - Lists.newArrayList(1, 2, 3, 4, 5), - item -> - new CloseableIterable<Integer>() { - @Override - public void close() {} - - @Override - public CloseableIterator<Integer> iterator() { - return CloseableIterator.withClose(Collections.singletonList(item).iterator()); - } - }); - - ParallelIterable<Integer> parallelIterable = new ParallelIterable<>(transform, executor); - ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator(); - - assertThat(iterator.hasNext()).isTrue(); - assertThat(iterator.next()).isNotNull(); - Awaitility.await("Queue is populated") - .atMost(5, TimeUnit.SECONDS) - .untilAsserted(() -> queueHasElements(iterator)); - iterator.close(); - Awaitility.await("Queue is cleared") - .atMost(5, TimeUnit.SECONDS) - .untilAsserted(() -> assertThat(iterator.queueSize()).isEqualTo(0)); + try { + Iterable<CloseableIterable<Integer>> transform = + Iterables.transform( + Lists.newArrayList(1, 2, 3, 4, 5), + item -> + new CloseableIterable<Integer>() { + @Override + public void close() {} + + @Override + public CloseableIterator<Integer> iterator() { + return CloseableIterator.withClose( + Collections.singletonList(item).iterator()); + } + }); + + ParallelIterable<Integer> parallelIterable = new ParallelIterable<>(transform, executor); + ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator(); + + assertThat(iterator.hasNext()).isTrue(); + assertThat(iterator.next()).isNotNull(); + Awaitility.await("Queue is populated") + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> queueHasElements(iterator)); + iterator.close(); + Awaitility.await("Queue is cleared") + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(iterator.queueSize()).isEqualTo(0)); + } 
finally { + executor.shutdown(); + } } @Test public void closeMoreDataParallelIteratorWithoutCompleteIteration() { ExecutorService executor = Executors.newFixedThreadPool(1); - Iterator<Integer> integerIterator = - new Iterator<Integer>() { - private int number = 1; - - @Override - public boolean hasNext() { - if (number > 1000) { - return false; + try { + Iterator<Integer> integerIterator = + new Iterator<Integer>() { + private int number = 1; + + @Override + public boolean hasNext() { + if (number > 1000) { + return false; + } + + number++; + return true; } - number++; - return true; - } - - @Override - public Integer next() { - try { - // sleep to control number generate rate - Thread.sleep(10); - } catch (InterruptedException e) { - // Sleep interrupted, we ignore it! + @Override + public Integer next() { + try { + // sleep to control number generate rate + Thread.sleep(10); + } catch (InterruptedException e) { + // Sleep interrupted, we ignore it! + } + return number; } - return number; - } - }; - Iterable<CloseableIterable<Integer>> transform = - Iterables.transform( - Lists.newArrayList(1), - item -> - new CloseableIterable<Integer>() { - @Override - public void close() {} - - @Override - public CloseableIterator<Integer> iterator() { - return CloseableIterator.withClose(integerIterator); - } - }); - - ParallelIterable<Integer> parallelIterable = new ParallelIterable<>(transform, executor); - ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator(); - - assertThat(iterator.hasNext()).isTrue(); - assertThat(iterator.next()).isNotNull(); - Awaitility.await("Queue is populated") - .atMost(5, TimeUnit.SECONDS) - .untilAsserted(() -> queueHasElements(iterator)); - iterator.close(); - Awaitility.await("Queue is cleared") - .atMost(5, TimeUnit.SECONDS) - .untilAsserted( - () -> - assertThat(iterator.queueSize()) - .as("Queue is not empty after cleaning") - .isEqualTo(0)); + }; + Iterable<CloseableIterable<Integer>> transform = + 
Iterables.transform( + Lists.newArrayList(1), + item -> + new CloseableIterable<Integer>() { + @Override + public void close() {} + + @Override + public CloseableIterator<Integer> iterator() { + return CloseableIterator.withClose(integerIterator); + } + }); + + ParallelIterable<Integer> parallelIterable = new ParallelIterable<>(transform, executor); + ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator(); + + assertThat(iterator.hasNext()).isTrue(); + assertThat(iterator.next()).isNotNull(); + Awaitility.await("Queue is populated") + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> queueHasElements(iterator)); + iterator.close(); + Awaitility.await("Queue is cleared") + .atMost(5, TimeUnit.SECONDS) + .untilAsserted( + () -> + assertThat(iterator.queueSize()) + .as("Queue is not empty after cleaning") + .isEqualTo(0)); + } finally { + executor.shutdown(); + } } @Test public void limitQueueSize() { - List<Iterable<Integer>> iterables = - ImmutableList.of( - () -> IntStream.range(0, 100).iterator(), - () -> IntStream.range(0, 100).iterator(), - () -> IntStream.range(0, 100).iterator()); - - Multiset<Integer> expectedValues = - IntStream.range(0, 100) - .boxed() - .flatMap(i -> Stream.of(i, i, i)) - .collect(ImmutableMultiset.toImmutableMultiset()); - - int maxQueueSize = 20; - ExecutorService executor = Executors.newCachedThreadPool(); - ParallelIterable<Integer> parallelIterable = - new ParallelIterable<>(iterables, executor, maxQueueSize); - ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator(); - - Multiset<Integer> actualValues = HashMultiset.create(); - - while (iterator.hasNext()) { - assertThat(iterator.queueSize()) - .as("iterator internal queue size") - .isLessThanOrEqualTo(maxQueueSize + iterables.size()); - actualValues.add(iterator.next()); + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + List<Iterable<Integer>> iterables = + ImmutableList.of( + () 
-> IntStream.range(0, 100).iterator(), + () -> IntStream.range(0, 100).iterator(), + () -> IntStream.range(0, 100).iterator()); + + Multiset<Integer> expectedValues = + IntStream.range(0, 100) + .boxed() + .flatMap(i -> Stream.of(i, i, i)) + .collect(ImmutableMultiset.toImmutableMultiset()); + + int maxQueueSize = 20; + ParallelIterable<Integer> parallelIterable = + new ParallelIterable<>(iterables, executor, maxQueueSize); + ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator(); + + Multiset<Integer> actualValues = HashMultiset.create(); + + while (iterator.hasNext()) { + assertThat(iterator.queueSize()) + .as("iterator internal queue size") + .isLessThanOrEqualTo(100); + actualValues.add(iterator.next()); + } + + assertThat(actualValues) + .as("multiset of values returned by the iterator") + .isEqualTo(expectedValues); + + iterator.close(); + } finally { + executor.shutdown(); } - - assertThat(actualValues) - .as("multiset of values returned by the iterator") - .isEqualTo(expectedValues); - - iterator.close(); - executor.shutdownNow(); } @Test - public void queueSizeOne() { - List<Iterable<Integer>> iterables = - ImmutableList.of( - () -> IntStream.range(0, 100).iterator(), - () -> IntStream.range(0, 100).iterator(), - () -> IntStream.range(0, 100).iterator()); - - Multiset<Integer> expectedValues = - IntStream.range(0, 100) - .boxed() - .flatMap(i -> Stream.of(i, i, i)) - .collect(ImmutableMultiset.toImmutableMultiset()); - - ExecutorService executor = Executors.newCachedThreadPool(); - ParallelIterable<Integer> parallelIterable = new ParallelIterable<>(iterables, executor, 1); - ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator(); - - Multiset<Integer> actualValues = HashMultiset.create(); - - while (iterator.hasNext()) { - assertThat(iterator.queueSize()) - .as("iterator internal queue size") - .isLessThanOrEqualTo(1 + iterables.size()); - actualValues.add(iterator.next()); + 
@Timeout(10) + public void noDeadlock() { + // This test simulates a scenario where iterators use a constrained resource + // (e.g. an S3 connection pool that has a limit on the number of connections). + // In this case, the constrained resource shouldn't cause a deadlock when queue + // is full and the iterator is waiting for the queue to be drained. + ExecutorService executor = Executors.newFixedThreadPool(1); + try { + Semaphore semaphore = new Semaphore(1); + + List<Iterable<Integer>> iterablesA = + ImmutableList.of( + testIterable( + semaphore::acquire, semaphore::release, IntStream.range(0, 100).iterator())); + List<Iterable<Integer>> iterablesB = + ImmutableList.of( + testIterable( + semaphore::acquire, semaphore::release, IntStream.range(200, 300).iterator())); + + ParallelIterable<Integer> parallelIterableA = new ParallelIterable<>(iterablesA, executor, 1); + ParallelIterable<Integer> parallelIterableB = new ParallelIterable<>(iterablesB, executor, 1); + + parallelIterableA.iterator().next(); + parallelIterableB.iterator().next(); + } finally { + executor.shutdownNow(); } + } - assertThat(actualValues) - .as("multiset of values returned by the iterator") - .isEqualTo(expectedValues); + private <T> CloseableIterable<T> testIterable( + RunnableWithException open, RunnableWithException close, Iterator<T> iterator) { + return new CloseableIterable<T>() { + @Override + public void close() { + try { + close.run(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public CloseableIterator<T> iterator() { + try { + open.run(); + return CloseableIterator.withClose(iterator); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + } - iterator.close(); - executor.shutdownNow(); + private interface RunnableWithException { + void run() throws Exception; } private void queueHasElements(ParallelIterator<Integer> iterator) {
