This is an automated email from the ASF dual-hosted git repository.

rohangarg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 39d95955f55 Do not eagerly close inner iterators in 
CloseableIterator#flatMap (#14986)
39d95955f55 is described below

commit 39d95955f5508143b4fd2352baa3fe60a4920357
Author: Rohan Garg <[email protected]>
AuthorDate: Fri Sep 15 15:14:20 2023 +0530

    Do not eagerly close inner iterators in CloseableIterator#flatMap (#14986)
---
 .../druid/data/input/s3/S3InputSourceTest.java     | 17 ++++-----
 .../util/common/parsers/CloseableIterator.java     | 42 ++++++++++------------
 .../input/impl/InputEntityIteratingReaderTest.java |  8 +++--
 .../util/common/parsers/CloseableIteratorTest.java | 41 +++++++++++++++++++++
 4 files changed, 73 insertions(+), 35 deletions(-)

diff --git 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
index fc538b682fc..6b0bb537c7e 100644
--- 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
+++ 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
@@ -1033,14 +1033,15 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", 
false, null, 0),
         temporaryFolder.newFolder()
     );
-
-    final IllegalStateException e = 
Assert.assertThrows(IllegalStateException.class, reader::read);
-    MatcherAssert.assertThat(e.getCause(), 
CoreMatchers.instanceOf(IOException.class));
-    MatcherAssert.assertThat(e.getCause().getCause(), 
CoreMatchers.instanceOf(SdkClientException.class));
-    MatcherAssert.assertThat(
-        e.getCause().getCause().getMessage(),
-        CoreMatchers.startsWith("Data read has a different length than the 
expected")
-    );
+    try (CloseableIterator<InputRow> readerIterator = reader.read()) {
+      final IllegalStateException e = 
Assert.assertThrows(IllegalStateException.class, readerIterator::hasNext);
+      MatcherAssert.assertThat(e.getCause(), 
CoreMatchers.instanceOf(IOException.class));
+      MatcherAssert.assertThat(e.getCause().getCause(), 
CoreMatchers.instanceOf(SdkClientException.class));
+      MatcherAssert.assertThat(
+          e.getCause().getCause().getMessage(),
+          CoreMatchers.startsWith("Data read has a different length than the 
expected")
+      );
+    }
 
     EasyMock.verify(S3_CLIENT);
   }
diff --git 
a/processing/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
 
b/processing/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
index af1baafe419..7b81934367d 100644
--- 
a/processing/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
+++ 
b/processing/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.java.util.common.parsers;
 
-import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -62,37 +61,37 @@ public interface CloseableIterator<T> extends Iterator<T>, 
Closeable
 
   default <R> CloseableIterator<R> flatMap(Function<T, CloseableIterator<R>> 
function)
   {
-    final CloseableIterator<T> delegate = this;
+    final CloseableIterator<T> outerIterator = this;
 
     return new CloseableIterator<R>()
     {
-      CloseableIterator<R> iterator = findNextIteratorIfNecessary();
+      CloseableIterator<R> currInnerIterator = null;
 
-      @Nullable
-      private CloseableIterator<R> findNextIteratorIfNecessary()
+      private void findNextIteratorIfNecessary()
       {
-        while ((iterator == null || !iterator.hasNext()) && 
delegate.hasNext()) {
-          if (iterator != null) {
+        while ((currInnerIterator == null || !currInnerIterator.hasNext()) && 
outerIterator.hasNext()) {
+          if (currInnerIterator != null) {
             try {
-              iterator.close();
-              iterator = null;
+              currInnerIterator.close();
+              currInnerIterator = null;
             }
             catch (IOException e) {
               throw new UncheckedIOException(e);
             }
           }
-          iterator = function.apply(delegate.next());
-          if (iterator.hasNext()) {
-            return iterator;
+          currInnerIterator = function.apply(outerIterator.next());
+          if (currInnerIterator.hasNext()) {
+            return;
           }
         }
-        return null;
       }
 
       @Override
       public boolean hasNext()
       {
-        return iterator != null && iterator.hasNext();
+        // closes the current iterator if it is finished, and opens a new 
non-empty iterator if possible
+        findNextIteratorIfNecessary();
+        return currInnerIterator != null && currInnerIterator.hasNext();
       }
 
       @Override
@@ -101,21 +100,16 @@ public interface CloseableIterator<T> extends 
Iterator<T>, Closeable
         if (!hasNext()) {
           throw new NoSuchElementException();
         }
-        try {
-          return iterator.next();
-        }
-        finally {
-          findNextIteratorIfNecessary();
-        }
+        return currInnerIterator.next();
       }
 
       @Override
       public void close() throws IOException
       {
-        delegate.close();
-        if (iterator != null) {
-          iterator.close();
-          iterator = null;
+        outerIterator.close();
+        if (currInnerIterator != null) {
+          currInnerIterator.close();
+          currInnerIterator = null;
         }
       }
     };
diff --git 
a/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
 
b/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
index 9e175b2a3df..a33899b2535 100644
--- 
a/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
+++ 
b/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
@@ -137,9 +137,11 @@ public class InputEntityIteratingReaderTest extends 
InitializedNullHandlingTest
         ).iterator(),
         temporaryFolder.newFolder()
     );
-    String expectedMessage = "Error occurred while trying to read uri: 
testscheme://some/path";
-    Exception exception = Assert.assertThrows(RuntimeException.class, 
firehose::read);
 
-    Assert.assertTrue(exception.getMessage().contains(expectedMessage));
+    try (CloseableIterator<InputRow> readIterator = firehose.read()) {
+      String expectedMessage = "Error occurred while trying to read uri: 
testscheme://some/path";
+      Exception exception = Assert.assertThrows(RuntimeException.class, 
readIterator::hasNext);
+      Assert.assertTrue(exception.getMessage().contains(expectedMessage));
+    }
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/java/util/common/parsers/CloseableIteratorTest.java
 
b/processing/src/test/java/org/apache/druid/java/util/common/parsers/CloseableIteratorTest.java
index be2d1d58bd5..3f701ee92f6 100644
--- 
a/processing/src/test/java/org/apache/druid/java/util/common/parsers/CloseableIteratorTest.java
+++ 
b/processing/src/test/java/org/apache/druid/java/util/common/parsers/CloseableIteratorTest.java
@@ -120,6 +120,47 @@ public class CloseableIteratorTest
     }
   }
 
+  @Test
+  public void testFlatMapInnerClose() throws IOException
+  {
+    List<CloseTrackingCloseableIterator<Integer>> innerIterators = new 
ArrayList<>();
+    // the nested iterators is : [ [], [0], [0, 1] ]
+    try (final CloseTrackingCloseableIterator<Integer> actual = new 
CloseTrackingCloseableIterator<>(
+        generateTestIterator(3)
+            .flatMap(list -> {
+              CloseTrackingCloseableIterator<Integer> inner =
+                  new 
CloseTrackingCloseableIterator<>(CloseableIterators.withEmptyBaggage(list.iterator()));
+              innerIterators.add(inner);
+              return inner;
+            })
+    )) {
+      final Iterator<Integer> expected = IntStream
+          .range(0, 3)
+          .flatMap(i -> IntStream.range(0, i))
+          .iterator();
+
+      int iterCount = 0, innerIteratorIdx = 0;
+      while (actual.hasNext()) {
+        iterCount++;
+        if (iterCount == 1) {
+          Assert.assertEquals(2, innerIterators.size()); //empty iterator and 
single element iterator
+          innerIteratorIdx++;
+        } else if (iterCount == 2) {
+          Assert.assertEquals(3, innerIterators.size()); //empty iterator + 
single element iterator + double element iterator
+          innerIteratorIdx++;
+        }
+        Assert.assertEquals(expected.next(), actual.next()); // assert 
expected value to the iterator's value
+        for (int i = 0; i < innerIteratorIdx; i++) {
+          Assert.assertEquals(1, innerIterators.get(i).closeCount); // expect 
all previous iterators to be closed
+        }
+        // never expect the current iterator to be closed, even after doing 
the last next call on it
+        Assert.assertEquals(0, 
innerIterators.get(innerIteratorIdx).closeCount);
+      }
+    }
+    // check the last inner iterator is closed
+    Assert.assertEquals(1, innerIterators.get(2).closeCount);
+  }
+
   private static CloseableIterator<List<Integer>> generateTestIterator(int 
numIterates)
   {
     return new CloseableIterator<List<Integer>>()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to