This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new d3cb1b696a Core: Fix ParallelIterable memory leak where queue continues to be populated even after iterator close (#9402)
d3cb1b696a is described below

commit d3cb1b696a1631a9cca4619f93252cd0a985fbfc
Author: Helt <[email protected]>
AuthorDate: Tue Jul 2 05:33:17 2024 +0800

    Core: Fix ParallelIterable memory leak where queue continues to be populated even after iterator close (#9402)
---
 .../org/apache/iceberg/util/ParallelIterable.java  |  6 +++
 .../apache/iceberg/util/TestParallelIterable.java  | 61 ++++++++++++++++++++++
 2 files changed, 67 insertions(+)

diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
index 108757b415..d7221e7d45 100644
--- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
+++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
@@ -67,6 +67,12 @@ public class ParallelIterable<T> extends CloseableGroup implements CloseableIter
                             try (Closeable ignored =
                                 (iterable instanceof Closeable) ? (Closeable) iterable : () -> {}) {
                               for (T item : iterable) {
+                                // exit manually because `ConcurrentLinkedQueue` can't be
+                                // interrupted
+                                if (closed) {
+                                  return;
+                                }
+
                                 queue.add(item);
                               }
                             } catch (IOException e) {
diff --git a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
index 68685614d3..af9c6ec521 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
@@ -72,6 +73,66 @@ public class TestParallelIterable {
         .untilAsserted(() -> assertThat(queue).isEmpty());
   }
 
+  @Test
+  public void closeMoreDataParallelIteratorWithoutCompleteIteration()
+      throws IOException, IllegalAccessException, NoSuchFieldException {
+    ExecutorService executor = Executors.newFixedThreadPool(1);
+    Iterator<Integer> integerIterator =
+        new Iterator<Integer>() {
+          private int number = 1;
+
+          @Override
+          public boolean hasNext() {
+            if (number > 1000) {
+              return false;
+            }
+
+            number++;
+            return true;
+          }
+
+          @Override
+          public Integer next() {
+            try {
+              // sleep to control number generate rate
+              Thread.sleep(10);
+            } catch (InterruptedException e) {
+              // Sleep interrupted, we ignore it!
+            }
+            return number;
+          }
+        };
+    Iterable<CloseableIterable<Integer>> transform =
+        Iterables.transform(
+            Lists.newArrayList(1),
+            item ->
+                new CloseableIterable<Integer>() {
+                  @Override
+                  public void close() {}
+
+                  @Override
+                  public CloseableIterator<Integer> iterator() {
+                    return CloseableIterator.withClose(integerIterator);
+                  }
+                });
+
+    ParallelIterable<Integer> parallelIterable = new ParallelIterable<>(transform, executor);
+    CloseableIterator<Integer> iterator = parallelIterable.iterator();
+    Field queueField = iterator.getClass().getDeclaredField("queue");
+    queueField.setAccessible(true);
+    ConcurrentLinkedQueue<?> queue = (ConcurrentLinkedQueue<?>) queueField.get(iterator);
+
+    assertThat(iterator.hasNext()).isTrue();
+    assertThat(iterator.next()).isNotNull();
+    Awaitility.await("Queue is populated")
+        .atMost(5, TimeUnit.SECONDS)
+        .untilAsserted(() -> queueHasElements(iterator, queue));
+    iterator.close();
+    Awaitility.await("Queue is cleared")
+        .atMost(5, TimeUnit.SECONDS)
+        .untilAsserted(() -> assertThat(queue).as("Queue is not empty after cleaning").isEmpty());
+  }
+
   private void queueHasElements(CloseableIterator<Integer> iterator, Queue queue) {
     assertThat(iterator.hasNext()).isTrue();
     assertThat(iterator.next()).isNotNull();

Reply via email to