masteryhx commented on code in PR #23239:
URL: https://github.com/apache/flink/pull/23239#discussion_r1311067683
##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/MultiStateKeyIterator.java:
##########
@@ -46,47 +55,59 @@ public final class MultiStateKeyIterator<K> implements CloseableIterator<K> {
private final KeyedStateBackend<K> backend;
- private final Iterator<K> internal;
+ private Iterator<? extends StateDescriptor<?, ?>> outerIter;
+ private Iterator<K> innerIter;
private final CloseableRegistry registry;
private K currentKey;
public MultiStateKeyIterator(
List<? extends StateDescriptor<?, ?>> descriptors,
KeyedStateBackend<K> backend) {
+
+ outerIter = descriptors.iterator();
+ innerIter = null;
+
this.descriptors = Preconditions.checkNotNull(descriptors);
this.backend = Preconditions.checkNotNull(backend);
this.registry = new CloseableRegistry();
- this.internal =
- descriptors.stream()
- .map(
- descriptor ->
- backend.getKeys(
- descriptor.getName(), VoidNamespace.INSTANCE))
- .peek(
- stream -> {
- try {
- registry.registerCloseable(stream::close);
- } catch (IOException e) {
- throw new RuntimeException(
- "Failed to read keys from configured StateBackend",
- e);
- }
- })
- .flatMap(stream -> stream)
- .iterator();
}
@Override
public boolean hasNext() {
Review Comment:
How about:
```
@Override
public boolean hasNext() {
    while (innerIter == null || !innerIter.hasNext()) {
        if (!outerIter.hasNext()) {
            return false;
        }
        StateDescriptor<?, ?> descriptor = outerIter.next();
        Stream<K> stream =
                backend.getKeys(descriptor.getName(), VoidNamespace.INSTANCE);
        innerIter = stream.iterator();
        try {
            registry.registerCloseable(stream::close);
        } catch (IOException e) {
            throw new RuntimeException(
                    "Failed to read keys from configured StateBackend", e);
        }
    }
    return true;
}
```
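For illustration, here is a minimal, Flink-independent sketch of the same lazy two-level pattern with `next()` included. The class name `LazyFlatIterator` and the `Function` used to open each inner iterator are hypothetical stand-ins for iterating the descriptors and calling `backend.getKeys(...)`; the `CloseableRegistry` handling is deliberately left out. Here `outer` is `final` because it is assigned only once, while `inner` has to be replaced as each source is exhausted.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

/** Hypothetical sketch: lazily concatenates the inner iterators opened for each outer element. */
final class LazyFlatIterator<S, K> implements Iterator<K> {
    private final Iterator<S> outer;              // assigned once, so it can be final
    private final Function<S, Iterator<K>> open;  // stand-in for backend.getKeys(...)
    private Iterator<K> inner;                    // replaced whenever the current source is exhausted

    LazyFlatIterator(Iterator<S> outer, Function<S, Iterator<K>> open) {
        this.outer = outer;
        this.open = open;
        this.inner = null;
    }

    @Override
    public boolean hasNext() {
        // Open the next source only when the current one is exhausted,
        // so nothing beyond the current inner iterator is enumerated or buffered.
        while (inner == null || !inner.hasNext()) {
            if (!outer.hasNext()) {
                return false;
            }
            inner = open.apply(outer.next());
        }
        return true;
    }

    @Override
    public K next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return inner.next();
    }

    public static void main(String[] args) {
        List<List<String>> sources =
                Arrays.asList(
                        Arrays.asList("a", "b"),
                        Collections.<String>emptyList(),
                        Arrays.asList("c"));
        Iterator<String> it =
                new LazyFlatIterator<List<String>, String>(sources.iterator(), List::iterator);
        StringBuilder sb = new StringBuilder();
        while (it.hasNext()) {
            sb.append(it.next());
        }
        System.out.println(sb); // prints "abc"
    }
}
```

Because `next()` delegates to `hasNext()`, callers that skip `hasNext()` still advance past empty sources, and exhaustion is reported with `NoSuchElementException` as the `Iterator` contract requires.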
##########
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java:
##########
@@ -125,4 +226,117 @@ public void testIteratorRemovesFromAllDescriptors() throws Exception {
.count());
}
}
+
+ /** Test for lazy enumeration of inner iterators. */
+ @Test
+ public void testIteratorPullsSingleKeyFromAllDescriptors() throws AssertionError {
Review Comment:
IIUC, you want to use this case to verify that the number of keys you iterate over is correct?
If so, you should iterate until `hasNext()` returns false, right?
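A minimal sketch of the suggested check: drain the iterator until `hasNext()` returns false, then compare the count with the expected number of keys. A plain JDK iterator stands in for `MultiStateKeyIterator` here, and the helper name `countKeys` is hypothetical; the JUnit wiring and state backend setup of the real test are not reproduced.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class DrainCountSketch {
    /** Hypothetical helper: consumes the iterator and returns how many elements it produced. */
    static <K> int countKeys(Iterator<K> iterator) {
        int count = 0;
        while (iterator.hasNext()) {
            iterator.next();
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // Stand-in keys; the real test would drain a MultiStateKeyIterator instead.
        List<Integer> keys = Arrays.asList(1, 2, 3, 4);
        int count = countKeys(keys.iterator());
        if (count != keys.size()) {
            throw new AssertionError("expected " + keys.size() + " keys but iterated " + count);
        }
        System.out.println("iterated " + count + " keys"); // prints "iterated 4 keys"
    }
}
```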
##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/MultiStateKeyIterator.java:
##########
@@ -31,13 +31,22 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Stream;
/**
* An iterator for reading all keys in a state backend across multiple partitioned states.
*
* <p>To read unique keys across all partitioned states callers must invoke {@link
* MultiStateKeyIterator#remove}.
*
+ * <p>Note: This is a replacement of the original implementation which used streams with a known
+ * flaw in the {@link Stream#flatMap(java.util.function.Function)} implementation that led to
+ * completely enumerating and buffering nested iterators even for a single call to {@link
+ * MultiStateKeyIterator#hasNext}.
+ *
+ * @see <a
+ *     href="https://bugs.openjdk.org/browse/JDK-8267359">https://bugs.openjdk.org/browse/JDK-8267359</a>
Review Comment:
IMO, this comment explaining why we changed the logic is not necessary, since that can be found in the Jira ticket.
Alternatively, you could add a brief description before outerIter, e.g.: "Avoid using Stream#flatMap due to xxx, see FLINK-26585 for more details"
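To observe the behaviour the javadoc note refers to, a small probe can count how many inner elements `Stream#flatMap` produces when the resulting `iterator()` is asked only for `hasNext()`. On JDKs affected by JDK-8267359 the whole first inner stream is expected to be pulled into the buffer; on JDKs without the flaw the count should be much smaller. The class name, the two outer elements, and the inner size of 1,000 are arbitrary choices for illustration, and the printed number depends on the JDK in use.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class FlatMapLazinessProbe {
    /** Returns how many inner elements were produced after a single hasNext() call. */
    static int pulledAfterOneHasNext(int innerSize) {
        AtomicInteger pulled = new AtomicInteger();
        Iterator<Integer> it =
                Stream.of("state-a", "state-b")
                        .flatMap(name -> IntStream.range(0, innerSize)
                                .boxed()
                                // counts every element the inner stream actually emits
                                .peek(i -> pulled.incrementAndGet()))
                        .iterator();
        it.hasNext(); // ask for a single element only
        return pulled.get();
    }

    public static void main(String[] args) {
        System.out.println("elements pulled for one hasNext(): " + pulledAfterOneHasNext(1_000));
    }
}
```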
##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/MultiStateKeyIterator.java:
##########
@@ -46,47 +55,59 @@ public final class MultiStateKeyIterator<K> implements
CloseableIterator<K> {
private final KeyedStateBackend<K> backend;
- private final Iterator<K> internal;
+ private Iterator<? extends StateDescriptor<?, ?>> outerIter;
Review Comment:
Could this be `final`?