This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 5fbf369593dae04584439179388f3d4ade90e03b
Author: Karol Sobczak <[email protected]>
AuthorDate: Sat Jan 11 17:13:18 2025 +0100

    Core: Fix possible deadlock in ParallelIterable (#11781)
    
    * Fix ParallelIterable deadlock
    
    It was observed that, in high-concurrency/high-workload scenarios, the
    cluster deadlocks because manifest readers wait for a connection from the
    S3 connection pool.
    
    Specifically, ManifestGroup#plan creates a ManifestReader for every
    ParallelIterable.Task. These readers effectively hold onto an S3 connection
    from the pool. When the ParallelIterable queue is full, the Task yields and
    is resubmitted later to continue its work.
    
    Consider the following scenario:
    S3 connection pool size=1
    approximateMaxQueueSize=1
    workerPoolSize=1
    
    ParallelIterable1: starts TaskP1
    ParallelIterable1: TaskP1 produces a result, the queue becomes full, and
      TaskP1 is put on hold (while still holding the S3 connection)
    ParallelIterable2: starts TaskP2; TaskP2 is scheduled on workerPool but is
      blocked waiting on the S3 connection pool
    ParallelIterable1: the result is consumed and TaskP1 is scheduled again
    ParallelIterable1: TaskP1 waits for a free workerPool thread, but the only
      thread is occupied by TaskP2, which is waiting for TaskP1 to release the
      connection
    
    The fix makes sure that a Task runs to completion once it has started
    (i.e. once its iterator has been opened). This way, limited resources such
    as the connection pool are never held by a task that is on hold. The queue
    size might exceed the strict limit, but it should still be bounded.
    
    Fixes https://github.com/apache/iceberg/issues/11768
    
    * Do not submit a task when there is no space in the queue
---
 .../org/apache/iceberg/util/ParallelIterable.java  |  27 +-
 .../apache/iceberg/util/TestParallelIterable.java  | 316 ++++++++++++---------
 2 files changed, 196 insertions(+), 147 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java 
b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
index d40f648447..7acab8762f 100644
--- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
+++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
@@ -86,6 +86,7 @@ public class ParallelIterable<T> extends CloseableGroup 
implements CloseableIter
     private final CompletableFuture<Optional<Task<T>>>[] taskFutures;
     private final ConcurrentLinkedQueue<T> queue = new 
ConcurrentLinkedQueue<>();
     private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final int maxQueueSize;
 
     private ParallelIterator(
         Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool, 
int maxQueueSize) {
@@ -97,6 +98,7 @@ public class ParallelIterable<T> extends CloseableGroup 
implements CloseableIter
       this.workerPool = workerPool;
       // submit 2 tasks per worker at a time
       this.taskFutures = new CompletableFuture[2 * 
ThreadPools.WORKER_THREAD_POOL_SIZE];
+      this.maxQueueSize = maxQueueSize;
     }
 
     @Override
@@ -153,6 +155,7 @@ public class ParallelIterable<T> extends CloseableGroup 
implements CloseableIter
             try {
               Optional<Task<T>> continuation = taskFutures[i].get();
               continuation.ifPresent(yieldedTasks::addLast);
+              taskFutures[i] = null;
             } catch (ExecutionException e) {
               if (e.getCause() instanceof RuntimeException) {
                 // rethrow a runtime exception
@@ -165,7 +168,10 @@ public class ParallelIterable<T> extends CloseableGroup 
implements CloseableIter
             }
           }
 
-          taskFutures[i] = submitNextTask();
+          // submit a new task if there is space in the queue
+          if (queue.size() < maxQueueSize) {
+            taskFutures[i] = submitNextTask();
+          }
         }
 
         if (taskFutures[i] != null) {
@@ -257,17 +263,24 @@ public class ParallelIterable<T> extends CloseableGroup 
implements CloseableIter
     @Override
     public Optional<Task<T>> get() {
       try {
+        if (queue.size() >= approximateMaxQueueSize) {
+          // Yield when queue is over the size limit. Task will be resubmitted 
later and continue
+          // the work.
+          //
+          // Tasks might hold references (via iterator) to constrained 
resources
+          // (e.g. pooled connections). Hence, tasks should yield only when
+          // iterator is not instantiated. Otherwise, there could be
+          // a deadlock when yielded tasks are waiting to be executed while
+          // currently executed tasks are waiting for the resources that are 
held
+          // by the yielded tasks.
+          return Optional.of(this);
+        }
+
         if (iterator == null) {
           iterator = input.iterator();
         }
 
         while (iterator.hasNext()) {
-          if (queue.size() >= approximateMaxQueueSize) {
-            // Yield when queue is over the size limit. Task will be 
resubmitted later and continue
-            // the work.
-            return Optional.of(this);
-          }
-
           T next = iterator.next();
           if (closed.get()) {
             break;
diff --git 
a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java 
b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
index 5e37e0390d..a1e14a22a7 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -39,170 +40,205 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Multiset;
 import org.apache.iceberg.util.ParallelIterable.ParallelIterator;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 
 public class TestParallelIterable {
   @Test
   public void closeParallelIteratorWithoutCompleteIteration() {
     ExecutorService executor = Executors.newFixedThreadPool(1);
-
-    Iterable<CloseableIterable<Integer>> transform =
-        Iterables.transform(
-            Lists.newArrayList(1, 2, 3, 4, 5),
-            item ->
-                new CloseableIterable<Integer>() {
-                  @Override
-                  public void close() {}
-
-                  @Override
-                  public CloseableIterator<Integer> iterator() {
-                    return 
CloseableIterator.withClose(Collections.singletonList(item).iterator());
-                  }
-                });
-
-    ParallelIterable<Integer> parallelIterable = new 
ParallelIterable<>(transform, executor);
-    ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) 
parallelIterable.iterator();
-
-    assertThat(iterator.hasNext()).isTrue();
-    assertThat(iterator.next()).isNotNull();
-    Awaitility.await("Queue is populated")
-        .atMost(5, TimeUnit.SECONDS)
-        .untilAsserted(() -> queueHasElements(iterator));
-    iterator.close();
-    Awaitility.await("Queue is cleared")
-        .atMost(5, TimeUnit.SECONDS)
-        .untilAsserted(() -> assertThat(iterator.queueSize()).isEqualTo(0));
+    try {
+      Iterable<CloseableIterable<Integer>> transform =
+          Iterables.transform(
+              Lists.newArrayList(1, 2, 3, 4, 5),
+              item ->
+                  new CloseableIterable<Integer>() {
+                    @Override
+                    public void close() {}
+
+                    @Override
+                    public CloseableIterator<Integer> iterator() {
+                      return CloseableIterator.withClose(
+                          Collections.singletonList(item).iterator());
+                    }
+                  });
+
+      ParallelIterable<Integer> parallelIterable = new 
ParallelIterable<>(transform, executor);
+      ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) 
parallelIterable.iterator();
+
+      assertThat(iterator.hasNext()).isTrue();
+      assertThat(iterator.next()).isNotNull();
+      Awaitility.await("Queue is populated")
+          .atMost(5, TimeUnit.SECONDS)
+          .untilAsserted(() -> queueHasElements(iterator));
+      iterator.close();
+      Awaitility.await("Queue is cleared")
+          .atMost(5, TimeUnit.SECONDS)
+          .untilAsserted(() -> assertThat(iterator.queueSize()).isEqualTo(0));
+    } finally {
+      executor.shutdown();
+    }
   }
 
   @Test
   public void closeMoreDataParallelIteratorWithoutCompleteIteration() {
     ExecutorService executor = Executors.newFixedThreadPool(1);
-    Iterator<Integer> integerIterator =
-        new Iterator<Integer>() {
-          private int number = 1;
-
-          @Override
-          public boolean hasNext() {
-            if (number > 1000) {
-              return false;
+    try {
+      Iterator<Integer> integerIterator =
+          new Iterator<Integer>() {
+            private int number = 1;
+
+            @Override
+            public boolean hasNext() {
+              if (number > 1000) {
+                return false;
+              }
+
+              number++;
+              return true;
             }
 
-            number++;
-            return true;
-          }
-
-          @Override
-          public Integer next() {
-            try {
-              // sleep to control number generate rate
-              Thread.sleep(10);
-            } catch (InterruptedException e) {
-              // Sleep interrupted, we ignore it!
+            @Override
+            public Integer next() {
+              try {
+                // sleep to control number generate rate
+                Thread.sleep(10);
+              } catch (InterruptedException e) {
+                // Sleep interrupted, we ignore it!
+              }
+              return number;
             }
-            return number;
-          }
-        };
-    Iterable<CloseableIterable<Integer>> transform =
-        Iterables.transform(
-            Lists.newArrayList(1),
-            item ->
-                new CloseableIterable<Integer>() {
-                  @Override
-                  public void close() {}
-
-                  @Override
-                  public CloseableIterator<Integer> iterator() {
-                    return CloseableIterator.withClose(integerIterator);
-                  }
-                });
-
-    ParallelIterable<Integer> parallelIterable = new 
ParallelIterable<>(transform, executor);
-    ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) 
parallelIterable.iterator();
-
-    assertThat(iterator.hasNext()).isTrue();
-    assertThat(iterator.next()).isNotNull();
-    Awaitility.await("Queue is populated")
-        .atMost(5, TimeUnit.SECONDS)
-        .untilAsserted(() -> queueHasElements(iterator));
-    iterator.close();
-    Awaitility.await("Queue is cleared")
-        .atMost(5, TimeUnit.SECONDS)
-        .untilAsserted(
-            () ->
-                assertThat(iterator.queueSize())
-                    .as("Queue is not empty after cleaning")
-                    .isEqualTo(0));
+          };
+      Iterable<CloseableIterable<Integer>> transform =
+          Iterables.transform(
+              Lists.newArrayList(1),
+              item ->
+                  new CloseableIterable<Integer>() {
+                    @Override
+                    public void close() {}
+
+                    @Override
+                    public CloseableIterator<Integer> iterator() {
+                      return CloseableIterator.withClose(integerIterator);
+                    }
+                  });
+
+      ParallelIterable<Integer> parallelIterable = new 
ParallelIterable<>(transform, executor);
+      ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) 
parallelIterable.iterator();
+
+      assertThat(iterator.hasNext()).isTrue();
+      assertThat(iterator.next()).isNotNull();
+      Awaitility.await("Queue is populated")
+          .atMost(5, TimeUnit.SECONDS)
+          .untilAsserted(() -> queueHasElements(iterator));
+      iterator.close();
+      Awaitility.await("Queue is cleared")
+          .atMost(5, TimeUnit.SECONDS)
+          .untilAsserted(
+              () ->
+                  assertThat(iterator.queueSize())
+                      .as("Queue is not empty after cleaning")
+                      .isEqualTo(0));
+    } finally {
+      executor.shutdown();
+    }
   }
 
   @Test
   public void limitQueueSize() {
-    List<Iterable<Integer>> iterables =
-        ImmutableList.of(
-            () -> IntStream.range(0, 100).iterator(),
-            () -> IntStream.range(0, 100).iterator(),
-            () -> IntStream.range(0, 100).iterator());
-
-    Multiset<Integer> expectedValues =
-        IntStream.range(0, 100)
-            .boxed()
-            .flatMap(i -> Stream.of(i, i, i))
-            .collect(ImmutableMultiset.toImmutableMultiset());
-
-    int maxQueueSize = 20;
-    ExecutorService executor = Executors.newCachedThreadPool();
-    ParallelIterable<Integer> parallelIterable =
-        new ParallelIterable<>(iterables, executor, maxQueueSize);
-    ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) 
parallelIterable.iterator();
-
-    Multiset<Integer> actualValues = HashMultiset.create();
-
-    while (iterator.hasNext()) {
-      assertThat(iterator.queueSize())
-          .as("iterator internal queue size")
-          .isLessThanOrEqualTo(maxQueueSize + iterables.size());
-      actualValues.add(iterator.next());
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    try {
+      List<Iterable<Integer>> iterables =
+          ImmutableList.of(
+              () -> IntStream.range(0, 100).iterator(),
+              () -> IntStream.range(0, 100).iterator(),
+              () -> IntStream.range(0, 100).iterator());
+
+      Multiset<Integer> expectedValues =
+          IntStream.range(0, 100)
+              .boxed()
+              .flatMap(i -> Stream.of(i, i, i))
+              .collect(ImmutableMultiset.toImmutableMultiset());
+
+      int maxQueueSize = 20;
+      ParallelIterable<Integer> parallelIterable =
+          new ParallelIterable<>(iterables, executor, maxQueueSize);
+      ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) 
parallelIterable.iterator();
+
+      Multiset<Integer> actualValues = HashMultiset.create();
+
+      while (iterator.hasNext()) {
+        assertThat(iterator.queueSize())
+            .as("iterator internal queue size")
+            .isLessThanOrEqualTo(100);
+        actualValues.add(iterator.next());
+      }
+
+      assertThat(actualValues)
+          .as("multiset of values returned by the iterator")
+          .isEqualTo(expectedValues);
+
+      iterator.close();
+    } finally {
+      executor.shutdown();
     }
-
-    assertThat(actualValues)
-        .as("multiset of values returned by the iterator")
-        .isEqualTo(expectedValues);
-
-    iterator.close();
-    executor.shutdownNow();
   }
 
   @Test
-  public void queueSizeOne() {
-    List<Iterable<Integer>> iterables =
-        ImmutableList.of(
-            () -> IntStream.range(0, 100).iterator(),
-            () -> IntStream.range(0, 100).iterator(),
-            () -> IntStream.range(0, 100).iterator());
-
-    Multiset<Integer> expectedValues =
-        IntStream.range(0, 100)
-            .boxed()
-            .flatMap(i -> Stream.of(i, i, i))
-            .collect(ImmutableMultiset.toImmutableMultiset());
-
-    ExecutorService executor = Executors.newCachedThreadPool();
-    ParallelIterable<Integer> parallelIterable = new 
ParallelIterable<>(iterables, executor, 1);
-    ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) 
parallelIterable.iterator();
-
-    Multiset<Integer> actualValues = HashMultiset.create();
-
-    while (iterator.hasNext()) {
-      assertThat(iterator.queueSize())
-          .as("iterator internal queue size")
-          .isLessThanOrEqualTo(1 + iterables.size());
-      actualValues.add(iterator.next());
+  @Timeout(10)
+  public void noDeadlock() {
+    // This test simulates a scenario where iterators use a constrained 
resource
+    // (e.g. an S3 connection pool that has a limit on the number of 
connections).
+    // In this case, the constrained resource shouldn't cause a deadlock when 
queue
+    // is full and the iterator is waiting for the queue to be drained.
+    ExecutorService executor = Executors.newFixedThreadPool(1);
+    try {
+      Semaphore semaphore = new Semaphore(1);
+
+      List<Iterable<Integer>> iterablesA =
+          ImmutableList.of(
+              testIterable(
+                  semaphore::acquire, semaphore::release, IntStream.range(0, 
100).iterator()));
+      List<Iterable<Integer>> iterablesB =
+          ImmutableList.of(
+              testIterable(
+                  semaphore::acquire, semaphore::release, IntStream.range(200, 
300).iterator()));
+
+      ParallelIterable<Integer> parallelIterableA = new 
ParallelIterable<>(iterablesA, executor, 1);
+      ParallelIterable<Integer> parallelIterableB = new 
ParallelIterable<>(iterablesB, executor, 1);
+
+      parallelIterableA.iterator().next();
+      parallelIterableB.iterator().next();
+    } finally {
+      executor.shutdownNow();
     }
+  }
 
-    assertThat(actualValues)
-        .as("multiset of values returned by the iterator")
-        .isEqualTo(expectedValues);
+  private <T> CloseableIterable<T> testIterable(
+      RunnableWithException open, RunnableWithException close, Iterator<T> 
iterator) {
+    return new CloseableIterable<T>() {
+      @Override
+      public void close() {
+        try {
+          close.run();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public CloseableIterator<T> iterator() {
+        try {
+          open.run();
+          return CloseableIterator.withClose(iterator);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
 
-    iterator.close();
-    executor.shutdownNow();
+  private interface RunnableWithException {
+    void run() throws Exception;
   }
 
   private void queueHasElements(ParallelIterator<Integer> iterator) {

Reply via email to