This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2f2018f30e3c108fd1d2f7b8f34d4a5ca9f49e5d
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Wed Jan 15 11:47:33 2025 +0800

    [FLINK-37130][State] A util converting a future of StateIterator to an Iterable
---
 .../flink/core/state/InternalStateIterator.java    | 36 ++++++++++++++++++++
 .../apache/flink/core/state/StateFutureUtils.java  | 28 ++++++++++++++++
 .../asyncprocessing/AbstractStateIterator.java     | 29 ++++++++++-------
 .../state/v2/adaptor/CompleteStateIterator.java    | 14 ++++++--
 .../asyncprocessing/AbstractStateIteratorTest.java | 38 +++++++++++++++++++++-
 .../flink/state/forst/ForStListIterator.java       |  2 +-
 .../apache/flink/state/forst/ForStMapIterator.java |  4 +--
 7 files changed, 133 insertions(+), 18 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/state/InternalStateIterator.java
 
b/flink-core/src/main/java/org/apache/flink/core/state/InternalStateIterator.java
new file mode 100644
index 00000000000..e585dbfee9c
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/state/InternalStateIterator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.v2.StateIterator;
+
+/**
+ * The internal extension of {@link StateIterator}, adding methods that are used by the framework,
+ * e.g. to check whether all elements are already loaded and to access the loaded ones directly.
+ */
+@Internal
+public interface InternalStateIterator<T> extends StateIterator<T> {
+
+    /**
+     * Returns whether this iterator has more elements to load besides the ones already in its
+     * current cache.
+     *
+     * @return {@code true} if a further loading request is needed to reach all elements.
+     */
+    boolean hasNextLoading();
+
+    /**
+     * Returns the elements already loaded into the cache of this iterator.
+     *
+     * @return the loaded elements; when {@link #hasNextLoading()} returns {@code false} these are
+     *     all the elements of this iterator.
+     */
+    Iterable<T> getCurrentCache();
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureUtils.java 
b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureUtils.java
index 53756c69892..22d0586b059 100644
--- a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureUtils.java
@@ -20,7 +20,9 @@ package org.apache.flink.core.state;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.state.v2.StateIterator;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -105,4 +107,30 @@ public class StateFutureUtils {
             return ret;
         }
     }
+
+    /**
+     * Converts a future of {@link StateIterator} into a future of {@link Iterable} holding all of
+     * its elements. There is usually no good reason to do so, since this gives up lazy loading; it
+     * is only useful when the further calculation depends on the whole data of the iterator.
+     *
+     * <p>A {@code null} iterator yields an empty iterable. An {@link InternalStateIterator} that
+     * has nothing more to load yields its current cache directly. Any other iterator is drained
+     * into a new list through its {@code onNext} callback.
+     *
+     * @param future the future of the state iterator to convert.
+     * @param <T> the type of the elements.
+     * @return a future completing with all elements of the iterator.
+     */
+    public static <T> StateFuture<Iterable<T>> toIterable(StateFuture<StateIterator<T>> future) {
+        return future.thenCompose(
+                iterator -> {
+                    if (iterator == null) {
+                        return StateFutureUtils.completedFuture(Collections.emptyList());
+                    }
+                    if (iterator instanceof InternalStateIterator
+                            && !((InternalStateIterator<T>) iterator).hasNextLoading()) {
+                        // Everything is loaded already, so hand out the cache without copying.
+                        return StateFutureUtils.completedFuture(
+                                ((InternalStateIterator<T>) iterator).getCurrentCache());
+                    }
+                    // More to load, or not an internal iterator: drain all elements into a list.
+                    // The block lambda is deliberate: it is only compatible with the void
+                    // onNext overload, whereas a method reference like result::add would make
+                    // overload resolution ambiguous.
+                    final ArrayList<T> result = new ArrayList<>();
+                    return iterator.onNext(
+                                    next -> {
+                                        result.add(next);
+                                    })
+                            .thenApply(ignored -> result);
+                });
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIterator.java
index afa141ffa82..5748c102501 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIterator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIterator.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.api.common.state.v2.StateFuture;
 import org.apache.flink.api.common.state.v2.StateIterator;
 import org.apache.flink.core.state.InternalStateFuture;
+import org.apache.flink.core.state.InternalStateIterator;
 import org.apache.flink.core.state.StateFutureUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.function.FunctionWithException;
@@ -35,15 +36,15 @@ import java.util.function.Consumer;
 /**
  * A {@link StateIterator} implementation to facilitate async data load of 
iterator. Each state
  * backend could override this class to maintain more variables in need. Any 
subclass should
- * implement two methods, {@link #hasNext()} and {@link 
#nextPayloadForContinuousLoading()}. The
- * philosophy behind this class is to carry some already loaded elements and 
provide iterating right
- * on the task thread, and load following ones if needed (determined by {@link 
#hasNext()}) by
- * creating **ANOTHER** iterating request. Thus, later it returns another 
iterator instance, and we
- * continue to apply the user iteration on that instance. The whole elements 
will be iterated by
- * recursive call of {@code #onNext()}.
+ * implement two methods, {@link #hasNextLoading()} and {@link 
#nextPayloadForContinuousLoading()}.
+ * The philosophy behind this class is to carry some already loaded elements 
and provide iterating
+ * right on the task thread, and load following ones if needed (determined by 
{@link
+ * #hasNextLoading()}) by creating **ANOTHER** iterating request. Thus, later 
it returns another
+ * iterator instance, and we continue to apply the user iteration on that 
instance. The whole
+ * elements will be iterated by recursive call of {@code #onNext()}.
  */
 @SuppressWarnings("rawtypes")
-public abstract class AbstractStateIterator<T> implements StateIterator<T> {
+public abstract class AbstractStateIterator<T> implements 
InternalStateIterator<T> {
 
     /** The state this iterator iterates on. */
     final State originalState;
@@ -69,7 +70,7 @@ public abstract class AbstractStateIterator<T> implements 
StateIterator<T> {
     }
 
     /** Return whether this iterator has more elements to load besides current cache. */
-    protected abstract boolean hasNext();
+    @Override
+    public abstract boolean hasNextLoading();
 
     /**
      * To perform following loading, build and get next payload for the next 
request. This will put
@@ -79,6 +80,10 @@ public abstract class AbstractStateIterator<T> implements 
StateIterator<T> {
      */
     protected abstract Object nextPayloadForContinuousLoading();
 
+    /** Returns the already loaded elements, or an empty iterable if there is no cache. */
+    @Override
+    public Iterable<T> getCurrentCache() {
+        return cache == null ? Collections.emptyList() : cache;
+    }
+
     protected StateRequestType getRequestType() {
         return requestType;
     }
@@ -118,7 +123,7 @@ public abstract class AbstractStateIterator<T> implements 
StateIterator<T> {
             // Since this is on task thread, we can directly throw the runtime 
exception.
             throw new FlinkRuntimeException("Failed to iterate over state.", 
e);
         }
-        if (hasNext()) {
+        if (hasNextLoading()) {
             return StateFutureUtils.combineAll(resultFutures)
                     .thenCombine(
                             asyncNextLoad().thenCompose(itr -> 
itr.onNext(iterating)),
@@ -149,7 +154,7 @@ public abstract class AbstractStateIterator<T> implements 
StateIterator<T> {
             // Since this is on task thread, we can directly throw the runtime 
exception.
             throw new FlinkRuntimeException("Failed to iterate over state.", 
e);
         }
-        if (hasNext()) {
+        if (hasNextLoading()) {
             return asyncNextLoad().thenCompose(itr -> itr.onNext(iterating));
         } else {
             return StateFutureUtils.completedVoidFuture();
@@ -163,13 +168,13 @@ public abstract class AbstractStateIterator<T> implements 
StateIterator<T> {
         for (T item : cache) {
             iterating.accept(item);
         }
-        if (hasNext()) {
+        if (hasNextLoading()) {
             ((AbstractStateIterator<T>) syncNextLoad()).onNextSync(iterating);
         }
     }
 
     @Override
     public boolean isEmpty() {
-        return (cache == null || cache.isEmpty()) && !hasNext();
+        // Empty only when nothing is cached and no further loading request will be issued.
+        return (cache == null || cache.isEmpty()) && !hasNextLoading();
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/CompleteStateIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/CompleteStateIterator.java
index 034714619e6..613ef515a8c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/CompleteStateIterator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/CompleteStateIterator.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.state.v2.adaptor;
 
 import org.apache.flink.api.common.state.v2.StateFuture;
-import org.apache.flink.api.common.state.v2.StateIterator;
+import org.apache.flink.core.state.InternalStateIterator;
 import org.apache.flink.core.state.StateFutureUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.function.FunctionWithException;
@@ -32,7 +32,7 @@ import java.util.Collection;
 import java.util.Collections;
 
 /** A {@link org.apache.flink.api.common.state.v2.StateIterator} that has all 
elements. */
-public class CompleteStateIterator<T> implements StateIterator<T> {
+public class CompleteStateIterator<T> implements InternalStateIterator<T> {
 
     final Iterable<T> iterable;
     final boolean empty;
@@ -83,4 +83,14 @@ public class CompleteStateIterator<T> implements 
StateIterator<T> {
     public boolean isEmpty() {
         return empty;
     }
+
+    /** Returns the wrapped iterable, which already contains every element of this iterator. */
+    @Override
+    public Iterable<T> getCurrentCache() {
+        return iterable;
+    }
+
+    /** Always {@code false}, since this iterator holds all of its elements from the start. */
+    @Override
+    public boolean hasNextLoading() {
+        return false;
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java
index 7e9b4033396..a05e443d3bc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java
@@ -126,6 +126,42 @@ public class AbstractStateIteratorTest {
         aec.drainInflightRecords(0);
     }
 
+    /**
+     * Checks that {@link StateFutureUtils#toIterable} yields every element of a MAP_ITER iterator,
+     * in order, when the iterator needs partial loading (see {@code TestIteratorStateExecutor}).
+     */
+    @Test
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public void testPartialLoadingWithConversionToIterable() {
+        TestIteratorStateExecutor stateExecutor = new TestIteratorStateExecutor(100, 3);
+        AsyncExecutionController aec =
+                new AsyncExecutionController(
+                        new SyncMailboxExecutor(),
+                        (a, b) -> {},
+                        stateExecutor,
+                        new DeclarationManager(),
+                        1,
+                        100,
+                        1000,
+                        1,
+                        null);
+        stateExecutor.bindAec(aec);
+        RecordContext<String> recordContext = aec.buildContext("1", "key1");
+        aec.setCurrentContext(recordContext);
+
+        AtomicInteger processed = new AtomicInteger();
+
+        StateFutureUtils.toIterable(aec.handleRequest(null, StateRequestType.MAP_ITER, null))
+                .thenAccept(
+                        (iter) -> {
+                            assertThat(iter).isInstanceOf(Iterable.class);
+                            ((Iterable<Integer>) iter)
+                                    .forEach(
+                                            item -> {
+                                                assertThat(item)
+                                                        .isEqualTo(processed.getAndIncrement());
+                                            });
+                            assertThat(processed.get()).isEqualTo(100);
+                        });
+        aec.drainInflightRecords(0);
+        // The assertions above run in a callback; make sure it actually ran to completion.
+        assertThat(processed.get()).isEqualTo(100);
+    }
+
     /**
      * A brief implementation of {@link StateExecutor}, to illustrate the 
interaction between AEC
      * and StateExecutor.
@@ -231,7 +267,7 @@ public class AbstractStateIteratorTest {
             }
 
             @Override
-            protected boolean hasNext() {
+            public boolean hasNextLoading() {
+                // Follow-up loading requests are issued while current < limit.
                 return current < limit;
             }
 
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListIterator.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListIterator.java
index 76377acf471..0eb6df13d64 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListIterator.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListIterator.java
@@ -37,7 +37,7 @@ public class ForStListIterator<V> extends 
AbstractStateIterator<V> {
     }
 
     @Override
-    protected boolean hasNext() {
+    public boolean hasNextLoading() {
+        // Always false: no follow-up loading request is ever issued for this list iterator.
         return false;
     }
 
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapIterator.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapIterator.java
index 5860ae02dae..edf25242324 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapIterator.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapIterator.java
@@ -32,7 +32,7 @@ public class ForStMapIterator<T> extends 
AbstractStateIterator<T> {
 
     /**
      * Whether the iterator has encountered the end, which determines the 
return value of {@link
-     * #hasNext}.
+     * #hasNextLoading}.
      */
     private boolean encounterEnd;
 
@@ -60,7 +60,7 @@ public class ForStMapIterator<T> extends 
AbstractStateIterator<T> {
     }
 
     @Override
-    protected boolean hasNext() {
+    public boolean hasNextLoading() {
+        // Keep loading until the iterator has encountered the end (tracked by encounterEnd).
         return !encounterEnd;
     }
 

Reply via email to