This is an automated email from the ASF dual-hosted git repository.

findepi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new c7910bb401 Core: Fix possible deadlock in ParallelIterable (#11781)
c7910bb401 is described below

commit c7910bb401f7f7fd09010bede0d80f5d2164afd5
Author: Karol Sobczak <[email protected]>
AuthorDate: Sat Jan 11 17:13:18 2025 +0100

    Core: Fix possible deadlock in ParallelIterable (#11781)
    
    * Fix ParallelIterable deadlock
    
    It was observed that, in high-concurrency/high-workload scenarios, the
    cluster deadlocks because manifest readers are waiting for a connection
    from the S3 pool.
    
    Specifically, ManifestGroup#plan creates a ManifestReader for every
    ParallelIterable.Task.
    These readers effectively hold on to an S3 connection from the pool.
    When the ParallelIterable queue is full, the Task is tabled for later use.
    
    Consider scenario:
    S3 connection pool size=1
    approximateMaxQueueSize=1
    workerPoolSize=1
    
    ParallelIterable1: starts TaskP1
    ParallelIterable1: TaskP1 produces result, queue gets full, TaskP1 is put 
on hold (holds S3 connection)
    ParallelIterable2: starts TaskP2, TaskP2 is scheduled on workerPool but is 
blocked on S3 connection pool
    ParallelIterable1: result gets consumed, TaskP1 is scheduled again
    ParallelIterable1: TaskP1 waits for workerPool to be free, but TaskP2 is 
waiting for TaskP1 to release connection
    
    The fix makes sure a Task is finished once it has started. This way,
    limited resources like the connection pool are not put on hold. The queue
    size might exceed the strict limit, but it should still be bounded.
    
    Fixes https://github.com/apache/iceberg/issues/11768
    
    * Do not submit a task when there is no space in queue
---
 .../org/apache/iceberg/util/ParallelIterable.java  | 27 ++++++--
 .../apache/iceberg/util/TestParallelIterable.java  | 80 ++++++++++++++--------
 2 files changed, 71 insertions(+), 36 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java 
b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
index d40f648447..7acab8762f 100644
--- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
+++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
@@ -86,6 +86,7 @@ public class ParallelIterable<T> extends CloseableGroup 
implements CloseableIter
     private final CompletableFuture<Optional<Task<T>>>[] taskFutures;
     private final ConcurrentLinkedQueue<T> queue = new 
ConcurrentLinkedQueue<>();
     private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final int maxQueueSize;
 
     private ParallelIterator(
         Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool, 
int maxQueueSize) {
@@ -97,6 +98,7 @@ public class ParallelIterable<T> extends CloseableGroup 
implements CloseableIter
       this.workerPool = workerPool;
       // submit 2 tasks per worker at a time
       this.taskFutures = new CompletableFuture[2 * 
ThreadPools.WORKER_THREAD_POOL_SIZE];
+      this.maxQueueSize = maxQueueSize;
     }
 
     @Override
@@ -153,6 +155,7 @@ public class ParallelIterable<T> extends CloseableGroup 
implements CloseableIter
             try {
               Optional<Task<T>> continuation = taskFutures[i].get();
               continuation.ifPresent(yieldedTasks::addLast);
+              taskFutures[i] = null;
             } catch (ExecutionException e) {
               if (e.getCause() instanceof RuntimeException) {
                 // rethrow a runtime exception
@@ -165,7 +168,10 @@ public class ParallelIterable<T> extends CloseableGroup 
implements CloseableIter
             }
           }
 
-          taskFutures[i] = submitNextTask();
+          // submit a new task if there is space in the queue
+          if (queue.size() < maxQueueSize) {
+            taskFutures[i] = submitNextTask();
+          }
         }
 
         if (taskFutures[i] != null) {
@@ -257,17 +263,24 @@ public class ParallelIterable<T> extends CloseableGroup 
implements CloseableIter
     @Override
     public Optional<Task<T>> get() {
       try {
+        if (queue.size() >= approximateMaxQueueSize) {
+          // Yield when queue is over the size limit. Task will be resubmitted 
later and continue
+          // the work.
+          //
+          // Tasks might hold references (via iterator) to constrained 
resources
+          // (e.g. pooled connections). Hence, tasks should yield only when
+          // iterator is not instantiated. Otherwise, there could be
+          // a deadlock when yielded tasks are waiting to be executed while
+          // currently executed tasks are waiting for the resources that are 
held
+          // by the yielded tasks.
+          return Optional.of(this);
+        }
+
         if (iterator == null) {
           iterator = input.iterator();
         }
 
         while (iterator.hasNext()) {
-          if (queue.size() >= approximateMaxQueueSize) {
-            // Yield when queue is over the size limit. Task will be 
resubmitted later and continue
-            // the work.
-            return Optional.of(this);
-          }
-
           T next = iterator.next();
           if (closed.get()) {
             break;
diff --git 
a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java 
b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
index 410e33058d..a1e14a22a7 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -39,6 +40,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Multiset;
 import org.apache.iceberg.util.ParallelIterable.ParallelIterator;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 
 public class TestParallelIterable {
   @Test
@@ -143,7 +145,7 @@ public class TestParallelIterable {
 
   @Test
   public void limitQueueSize() {
-    ExecutorService executor = Executors.newCachedThreadPool();
+    ExecutorService executor = Executors.newSingleThreadExecutor();
     try {
       List<Iterable<Integer>> iterables =
           ImmutableList.of(
@@ -167,7 +169,7 @@ public class TestParallelIterable {
       while (iterator.hasNext()) {
         assertThat(iterator.queueSize())
             .as("iterator internal queue size")
-            .isLessThanOrEqualTo(maxQueueSize + iterables.size());
+            .isLessThanOrEqualTo(100);
         actualValues.add(iterator.next());
       }
 
@@ -182,41 +184,61 @@ public class TestParallelIterable {
   }
 
   @Test
-  public void queueSizeOne() {
-    ExecutorService executor = Executors.newCachedThreadPool();
+  @Timeout(10)
+  public void noDeadlock() {
+    // This test simulates a scenario where iterators use a constrained 
resource
+    // (e.g. an S3 connection pool that has a limit on the number of 
connections).
+    // In this case, the constrained resource shouldn't cause a deadlock when 
queue
+    // is full and the iterator is waiting for the queue to be drained.
+    ExecutorService executor = Executors.newFixedThreadPool(1);
     try {
-      List<Iterable<Integer>> iterables =
-          ImmutableList.of(
-              () -> IntStream.range(0, 100).iterator(),
-              () -> IntStream.range(0, 100).iterator(),
-              () -> IntStream.range(0, 100).iterator());
+      Semaphore semaphore = new Semaphore(1);
 
-      Multiset<Integer> expectedValues =
-          IntStream.range(0, 100)
-              .boxed()
-              .flatMap(i -> Stream.of(i, i, i))
-              .collect(ImmutableMultiset.toImmutableMultiset());
+      List<Iterable<Integer>> iterablesA =
+          ImmutableList.of(
+              testIterable(
+                  semaphore::acquire, semaphore::release, IntStream.range(0, 
100).iterator()));
+      List<Iterable<Integer>> iterablesB =
+          ImmutableList.of(
+              testIterable(
+                  semaphore::acquire, semaphore::release, IntStream.range(200, 
300).iterator()));
 
-      ParallelIterable<Integer> parallelIterable = new 
ParallelIterable<>(iterables, executor, 1);
-      ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) 
parallelIterable.iterator();
+      ParallelIterable<Integer> parallelIterableA = new 
ParallelIterable<>(iterablesA, executor, 1);
+      ParallelIterable<Integer> parallelIterableB = new 
ParallelIterable<>(iterablesB, executor, 1);
 
-      Multiset<Integer> actualValues = HashMultiset.create();
+      parallelIterableA.iterator().next();
+      parallelIterableB.iterator().next();
+    } finally {
+      executor.shutdownNow();
+    }
+  }
 
-      while (iterator.hasNext()) {
-        assertThat(iterator.queueSize())
-            .as("iterator internal queue size")
-            .isLessThanOrEqualTo(1 + iterables.size());
-        actualValues.add(iterator.next());
+  private <T> CloseableIterable<T> testIterable(
+      RunnableWithException open, RunnableWithException close, Iterator<T> 
iterator) {
+    return new CloseableIterable<T>() {
+      @Override
+      public void close() {
+        try {
+          close.run();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
       }
 
-      assertThat(actualValues)
-          .as("multiset of values returned by the iterator")
-          .isEqualTo(expectedValues);
+      @Override
+      public CloseableIterator<T> iterator() {
+        try {
+          open.run();
+          return CloseableIterator.withClose(iterator);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
 
-      iterator.close();
-    } finally {
-      executor.shutdown();
-    }
+  private interface RunnableWithException {
+    void run() throws Exception;
   }
 
   private void queueHasElements(ParallelIterator<Integer> iterator) {

Reply via email to