This is an automated email from the ASF dual-hosted git repository.
rohangarg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 39d95955f55 Do not eagerly close inner iterators in
CloseableIterator#flatMap (#14986)
39d95955f55 is described below
commit 39d95955f5508143b4fd2352baa3fe60a4920357
Author: Rohan Garg <[email protected]>
AuthorDate: Fri Sep 15 15:14:20 2023 +0530
Do not eagerly close inner iterators in CloseableIterator#flatMap (#14986)
---
.../druid/data/input/s3/S3InputSourceTest.java | 17 ++++-----
.../util/common/parsers/CloseableIterator.java | 42 ++++++++++------------
.../input/impl/InputEntityIteratingReaderTest.java | 8 +++--
.../util/common/parsers/CloseableIteratorTest.java | 41 +++++++++++++++++++++
4 files changed, 73 insertions(+), 35 deletions(-)
diff --git
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
index fc538b682fc..6b0bb537c7e 100644
---
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
+++
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
@@ -1033,14 +1033,15 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|",
false, null, 0),
temporaryFolder.newFolder()
);
-
- final IllegalStateException e =
Assert.assertThrows(IllegalStateException.class, reader::read);
- MatcherAssert.assertThat(e.getCause(),
CoreMatchers.instanceOf(IOException.class));
- MatcherAssert.assertThat(e.getCause().getCause(),
CoreMatchers.instanceOf(SdkClientException.class));
- MatcherAssert.assertThat(
- e.getCause().getCause().getMessage(),
- CoreMatchers.startsWith("Data read has a different length than the
expected")
- );
+ try (CloseableIterator<InputRow> readerIterator = reader.read()) {
+ final IllegalStateException e =
Assert.assertThrows(IllegalStateException.class, readerIterator::hasNext);
+ MatcherAssert.assertThat(e.getCause(),
CoreMatchers.instanceOf(IOException.class));
+ MatcherAssert.assertThat(e.getCause().getCause(),
CoreMatchers.instanceOf(SdkClientException.class));
+ MatcherAssert.assertThat(
+ e.getCause().getCause().getMessage(),
+ CoreMatchers.startsWith("Data read has a different length than the
expected")
+ );
+ }
EasyMock.verify(S3_CLIENT);
}
diff --git
a/processing/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
b/processing/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
index af1baafe419..7b81934367d 100644
---
a/processing/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
+++
b/processing/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
@@ -19,7 +19,6 @@
package org.apache.druid.java.util.common.parsers;
-import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -62,37 +61,37 @@ public interface CloseableIterator<T> extends Iterator<T>,
Closeable
default <R> CloseableIterator<R> flatMap(Function<T, CloseableIterator<R>>
function)
{
- final CloseableIterator<T> delegate = this;
+ final CloseableIterator<T> outerIterator = this;
return new CloseableIterator<R>()
{
- CloseableIterator<R> iterator = findNextIteratorIfNecessary();
+ CloseableIterator<R> currInnerIterator = null;
- @Nullable
- private CloseableIterator<R> findNextIteratorIfNecessary()
+ private void findNextIteratorIfNecessary()
{
- while ((iterator == null || !iterator.hasNext()) &&
delegate.hasNext()) {
- if (iterator != null) {
+ while ((currInnerIterator == null || !currInnerIterator.hasNext()) &&
outerIterator.hasNext()) {
+ if (currInnerIterator != null) {
try {
- iterator.close();
- iterator = null;
+ currInnerIterator.close();
+ currInnerIterator = null;
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}
- iterator = function.apply(delegate.next());
- if (iterator.hasNext()) {
- return iterator;
+ currInnerIterator = function.apply(outerIterator.next());
+ if (currInnerIterator.hasNext()) {
+ return;
}
}
- return null;
}
@Override
public boolean hasNext()
{
- return iterator != null && iterator.hasNext();
+ // closes the current iterator if it is finished, and opens a new
non-empty iterator if possible
+ findNextIteratorIfNecessary();
+ return currInnerIterator != null && currInnerIterator.hasNext();
}
@Override
@@ -101,21 +100,16 @@ public interface CloseableIterator<T> extends
Iterator<T>, Closeable
if (!hasNext()) {
throw new NoSuchElementException();
}
- try {
- return iterator.next();
- }
- finally {
- findNextIteratorIfNecessary();
- }
+ return currInnerIterator.next();
}
@Override
public void close() throws IOException
{
- delegate.close();
- if (iterator != null) {
- iterator.close();
- iterator = null;
+ outerIterator.close();
+ if (currInnerIterator != null) {
+ currInnerIterator.close();
+ currInnerIterator = null;
}
}
};
diff --git
a/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
b/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
index 9e175b2a3df..a33899b2535 100644
---
a/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
+++
b/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
@@ -137,9 +137,11 @@ public class InputEntityIteratingReaderTest extends
InitializedNullHandlingTest
).iterator(),
temporaryFolder.newFolder()
);
- String expectedMessage = "Error occurred while trying to read uri:
testscheme://some/path";
- Exception exception = Assert.assertThrows(RuntimeException.class,
firehose::read);
- Assert.assertTrue(exception.getMessage().contains(expectedMessage));
+ try (CloseableIterator<InputRow> readIterator = firehose.read()) {
+ String expectedMessage = "Error occurred while trying to read uri:
testscheme://some/path";
+ Exception exception = Assert.assertThrows(RuntimeException.class,
readIterator::hasNext);
+ Assert.assertTrue(exception.getMessage().contains(expectedMessage));
+ }
}
}
diff --git
a/processing/src/test/java/org/apache/druid/java/util/common/parsers/CloseableIteratorTest.java
b/processing/src/test/java/org/apache/druid/java/util/common/parsers/CloseableIteratorTest.java
index be2d1d58bd5..3f701ee92f6 100644
---
a/processing/src/test/java/org/apache/druid/java/util/common/parsers/CloseableIteratorTest.java
+++
b/processing/src/test/java/org/apache/druid/java/util/common/parsers/CloseableIteratorTest.java
@@ -120,6 +120,47 @@ public class CloseableIteratorTest
}
}
+ @Test
+ public void testFlatMapInnerClose() throws IOException
+ {
+ List<CloseTrackingCloseableIterator<Integer>> innerIterators = new
ArrayList<>();
+ // the nested iterators are: [ [], [0], [0, 1] ]
+ try (final CloseTrackingCloseableIterator<Integer> actual = new
CloseTrackingCloseableIterator<>(
+ generateTestIterator(3)
+ .flatMap(list -> {
+ CloseTrackingCloseableIterator<Integer> inner =
+ new
CloseTrackingCloseableIterator<>(CloseableIterators.withEmptyBaggage(list.iterator()));
+ innerIterators.add(inner);
+ return inner;
+ })
+ )) {
+ final Iterator<Integer> expected = IntStream
+ .range(0, 3)
+ .flatMap(i -> IntStream.range(0, i))
+ .iterator();
+
+ int iterCount = 0, innerIteratorIdx = 0;
+ while (actual.hasNext()) {
+ iterCount++;
+ if (iterCount == 1) {
+ Assert.assertEquals(2, innerIterators.size()); //empty iterator and
single element iterator
+ innerIteratorIdx++;
+ } else if (iterCount == 2) {
+ Assert.assertEquals(3, innerIterators.size()); //empty iterator +
single element iterator + double element iterator
+ innerIteratorIdx++;
+ }
+ Assert.assertEquals(expected.next(), actual.next()); // assert
expected value to the iterator's value
+ for (int i = 0; i < innerIteratorIdx; i++) {
+ Assert.assertEquals(1, innerIterators.get(i).closeCount); // expect
all previous iterators to be closed
+ }
+ // never expect the current iterator to be closed, even after doing
the last next call on it
+ Assert.assertEquals(0,
innerIterators.get(innerIteratorIdx).closeCount);
+ }
+ }
+ // check the last inner iterator is closed
+ Assert.assertEquals(1, innerIterators.get(2).closeCount);
+ }
+
private static CloseableIterator<List<Integer>> generateTestIterator(int
numIterates)
{
return new CloseableIterator<List<Integer>>()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]