RainerMatthiasS commented on code in PR #23239:
URL: https://github.com/apache/flink/pull/23239#discussion_r1306992807
##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/MultiStateKeyIterator.java:
##########
@@ -111,9 +128,4 @@ public void remove() {
}
}
}
-
- @Override
- public void close() throws Exception {
Review Comment:
Sure :smile:. Note, though, that the two implementations are unrelated and merely happen
to have the same name.
(Should I move this down?)
##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/MultiStateKeyIterator.java:
##########
@@ -31,65 +31,82 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Stream;
/**
* An iterator for reading all keys in a state backend across multiple partitioned states.
*
* <p>To read unique keys across all partitioned states callers must invoke {@link
* MultiStateKeyIterator#remove}.
*
+ * <p>Note: This is a replacement of the original implementation which used streams with a known
+ * flaw in the {@link Stream#flatMap(java.util.function.Function)} implementation that led to
+ * completely enumerating and buffering nested iterators even for a single call to {@link
+ * MultiStateKeyIterator#hasNext}.
+ *
+ * @see <a
+ *     href="https://bugs.openjdk.org/browse/JDK-8267359">https://bugs.openjdk.org/browse/JDK-8267359</a>
* @param <K> Type of the key by which state is keyed.
*/
@Internal
-public final class MultiStateKeyIterator<K> implements CloseableIterator<K> {
Review Comment:
:+1:
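The new Javadoc describes replacing the stream-based flattening with a hand-written iterator, but the implementation itself is not quoted here. As a rough illustration only, this is a minimal sketch of a lazily flattening iterator in plain Java. `LazyConcatDemo`, `LazyConcatIterator`, `countingRange` and `sources` are hypothetical names, and each key source is assumed to be available as a `Supplier<Iterator<K>>` rather than through Flink's state-backend API. A source is opened only after the previous one is exhausted, and `hasNext()` buffers at most one element.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class LazyConcatDemo {

    /** Hypothetical sketch: concatenates key sources, opening each one only on demand. */
    static final class LazyConcatIterator<K> implements Iterator<K> {
        private final Iterator<Supplier<Iterator<K>>> sources;
        private Iterator<K> current; // source currently being drained, null before the first
        private K buffered; // at most one element fetched ahead by hasNext()
        private boolean hasBuffered;

        LazyConcatIterator(List<Supplier<Iterator<K>>> sources) {
            this.sources = sources.iterator();
        }

        @Override
        public boolean hasNext() {
            while (!hasBuffered) {
                if (current != null && current.hasNext()) {
                    buffered = current.next(); // pull exactly one element
                    hasBuffered = true;
                } else if (sources.hasNext()) {
                    current = sources.next().get(); // open the next source lazily
                } else {
                    return false; // all sources exhausted
                }
            }
            return true;
        }

        @Override
        public K next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            K result = buffered;
            buffered = null;
            hasBuffered = false;
            return result;
        }
    }

    /** Iterator over the keys 0..n-1 that counts every key it hands out. */
    static Iterator<Integer> countingRange(int n, AtomicInteger enumerated) {
        return new Iterator<Integer>() {
            private int i = 0;

            @Override
            public boolean hasNext() {
                return i < n;
            }

            @Override
            public Integer next() {
                if (i >= n) {
                    throw new NoSuchElementException();
                }
                enumerated.incrementAndGet();
                return i++;
            }
        };
    }

    /** Two counting sources of keysPerSource keys each, sharing one counter. */
    static List<Supplier<Iterator<Integer>>> sources(int keysPerSource, AtomicInteger enumerated) {
        return List.of(
                () -> countingRange(keysPerSource, enumerated),
                () -> countingRange(keysPerSource, enumerated));
    }

    /** How many keys a single hasNext() call pulls from the sources. */
    static int keysEnumeratedByFirstHasNext(int keysPerSource) {
        AtomicInteger enumerated = new AtomicInteger();
        new LazyConcatIterator<>(sources(keysPerSource, enumerated)).hasNext();
        return enumerated.get();
    }

    /** How many keys the iterator returns when fully drained. */
    static int keysReturnedWhenDrained(int keysPerSource) {
        Iterator<Integer> it = new LazyConcatIterator<>(sources(keysPerSource, new AtomicInteger()));
        int returned = 0;
        while (it.hasNext()) {
            it.next();
            returned++;
        }
        return returned;
    }

    public static void main(String[] args) {
        System.out.println(keysEnumeratedByFirstHasNext(100_000_000)); // prints 1
        System.out.println(keysReturnedWhenDrained(1_000)); // prints 2000
    }
}
```

With this design one `hasNext()` call enumerates exactly one key, which is the property that `testIteratorPullsSingleKeyFromAllDescriptors()` asserts for the real iterator.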
##########
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java:
##########
@@ -125,4 +226,117 @@ public void testIteratorRemovesFromAllDescriptors() throws Exception {
.count());
}
}
+
+ /** Test for lazy enumeration of inner iterators. */
+ @Test
+ public void testIteratorPullsSingleKeyFromAllDescriptors() throws AssertionError {
Review Comment:
@masteryhx Is there anything else I can do to move this PR forward?
##########
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java:
##########
@@ -125,4 +226,117 @@ public void testIteratorRemovesFromAllDescriptors() throws Exception {
.count());
}
}
+
+ /** Test for lazy enumeration of inner iterators. */
+ @Test
+ public void testIteratorPullsSingleKeyFromAllDescriptors() throws AssertionError {
Review Comment:
No, it actually tests the 'economy' of `hasNext()`: it checks that a single
call to `hasNext()` does **not** iterate the whole collection, which is what
the original implementation based on Java streams did (the
[JDK-8267359](https://bugs.openjdk.org/browse/JDK-8267359) flaw).
I.e., this test is exactly as intended.
Completeness of the enumeration is tested in
`testIteratorPullsKeyFromAllDescriptors()`.
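The flaw can be observed without Flink. The following is an assumed reproduction, not code from the PR: two hypothetical inner streams stand in for per-state key streams, a `peek` counter records how many inner keys are produced, and `hasNext()` is called once on the iterator of the `flatMap` pipeline. Because the result depends on the JDK version, no fixed output is claimed.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class FlatMapLazinessDemo {

    /**
     * Flattens two inner streams of keysPerSource keys each with flatMap, calls
     * hasNext() once on the resulting iterator, and reports how many inner keys
     * were produced to answer that single call.
     */
    static int keysEnumeratedByStreamHasNext(int keysPerSource) {
        AtomicInteger enumerated = new AtomicInteger();
        Iterator<Integer> it = Stream.of("state-a", "state-b") // hypothetical state names
                .flatMap(state -> IntStream.range(0, keysPerSource)
                        .boxed()
                        .peek(key -> enumerated.incrementAndGet()))
                .iterator();
        it.hasNext();
        return enumerated.get();
    }

    public static void main(String[] args) {
        // A fully lazy pipeline would need only 1 key here; on JDKs affected by
        // JDK-8267359 the whole first inner stream is pushed into an internal buffer.
        System.out.println(keysEnumeratedByStreamHasNext(1_000_000));
    }
}
```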
##########
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java:
##########
@@ -125,4 +185,117 @@ public void testIteratorRemovesFromAllDescriptors() throws Exception {
.count());
}
}
+
+ /** Test for lazy enumeration of inner iterators. */
+ @Test
+ public void testIteratorPullsSingleKeyFromAllDescriptors() throws AssertionError {
Review Comment:
The original implementation of `MultiStateKeyIterator` would iterate all 1e8
keys and store them in a `SpinedBuffer` just to answer a single initial call to
`hasNext()`. The test asserts that only a single key is touched. Technically,
enumerating all keys is still correct, but it is wasteful and can even lead to
OOM errors.
The test only counts how many keys are actually enumerated rather than observing
actual memory (GC) consumption, which would be too brittle for a unit test.
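As an illustration of that counting approach (outside Flink, and not the actual `CountingManyKeysKeyedStateBackend` code), a key source can hand out a lazily generated `Stream` and count every key that flows through it with `peek`. `CountingKeySourceDemo`, `CountingKeySource` and `keys()` are hypothetical names; `keys()` only mirrors the shape of a `getKeys(...)` call that returns a `Stream`. The count is deterministic, which makes it a stable value to assert on.

```java
import java.util.Iterator;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class CountingKeySourceDemo {

    /** Hypothetical key source that generates integer keys lazily and counts enumerated ones. */
    static final class CountingKeySource {
        private final int numberOfKeys;
        int numberOfKeysEnumerated = 0;

        CountingKeySource(int numberOfKeys) {
            this.numberOfKeys = numberOfKeys;
        }

        /** Lazily generated keys; every key that is actually pulled increments the counter. */
        Stream<Integer> keys() {
            return IntStream.range(0, numberOfKeys)
                    .boxed()
                    .peek(key -> numberOfKeysEnumerated++);
        }
    }

    /** Number of keys enumerated by a single hasNext() on the stream's iterator. */
    static int enumeratedAfterOneHasNext(int numberOfKeys) {
        CountingKeySource source = new CountingKeySource(numberOfKeys);
        Iterator<Integer> it = source.keys().iterator();
        it.hasNext();
        return source.numberOfKeysEnumerated;
    }

    public static void main(String[] args) {
        System.out.println(enumeratedAfterOneHasNext(100_000_000)); // prints 1
    }
}
```

For a plain stream without `flatMap`, one `hasNext()` on its iterator enumerates a single key even with 1e8 keys available. Counting with `peek` only works if the pipeline is actually traversed: since Java 9, `count()` on a sized stream may compute the result from the source without evaluating intermediate operations, so keys should be pulled through an iterator or `forEach`.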
##########
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java:
##########
@@ -125,4 +185,117 @@ public void testIteratorRemovesFromAllDescriptors() throws Exception {
.count());
}
}
+
+ /** Test for lazy enumeration of inner iterators. */
+ @Test
+ public void testIteratorPullsSingleKeyFromAllDescriptors() throws AssertionError {
+ CountingManyKeysKeyedStateBackend keyedStateBackend =
+ createManyKeysKeyedStateBackend(100_000_000);
+ MultiStateKeyIterator<Integer> testedIterator =
+ new MultiStateKeyIterator<>(descriptors, keyedStateBackend);
+
+ boolean hasnext = testedIterator.hasNext();
+
+ Assert.assertEquals(
+ "Unexpected number of keys enumerated",
+ 1,
+ keyedStateBackend.numberOfKeysEnumerated);
+ }
+
+ /**
+ * Mockup {@link AbstractKeyedStateBackend} that counts how many keys are enumerated.
+ *
+ * <p>Generates a configured number of integer keys; the only method actually implemented is {@link
+ * CountingManyKeysKeyedStateBackend#getKeys(java.lang.String, java.lang.Object)}
+ */
+ static class CountingManyKeysKeyedStateBackend extends AbstractKeyedStateBackend<Integer> {
Review Comment:
:+1:
--
This is an automated message from the Apache Git Service.