This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2f2018f30e3c108fd1d2f7b8f34d4a5ca9f49e5d Author: Zakelly <zakelly....@gmail.com> AuthorDate: Wed Jan 15 11:47:33 2025 +0800 [FLINK-37130][State] An util converting future of StateIterator to Iterable --- .../flink/core/state/InternalStateIterator.java | 36 ++++++++++++++++++++ .../apache/flink/core/state/StateFutureUtils.java | 28 ++++++++++++++++ .../asyncprocessing/AbstractStateIterator.java | 29 ++++++++++------- .../state/v2/adaptor/CompleteStateIterator.java | 14 ++++++-- .../asyncprocessing/AbstractStateIteratorTest.java | 38 +++++++++++++++++++++- .../flink/state/forst/ForStListIterator.java | 2 +- .../apache/flink/state/forst/ForStMapIterator.java | 4 +-- 7 files changed, 133 insertions(+), 18 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/state/InternalStateIterator.java b/flink-core/src/main/java/org/apache/flink/core/state/InternalStateIterator.java new file mode 100644 index 00000000000..e585dbfee9c --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/state/InternalStateIterator.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.v2.StateIterator; + +/** + * The internal definition of {@link StateIterator}, adding some methods that will be used by the + * framework. + */ +@Internal +public interface InternalStateIterator<T> extends StateIterator<T> { + + /** Return whether this iterator has more elements to load. */ + boolean hasNextLoading(); + + /** Return the already loaded elements in cache. */ + Iterable<T> getCurrentCache(); +} diff --git a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureUtils.java b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureUtils.java index 53756c69892..22d0586b059 100644 --- a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureUtils.java +++ b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureUtils.java @@ -20,7 +20,9 @@ package org.apache.flink.core.state; import org.apache.flink.annotation.Experimental; import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.state.v2.StateIterator; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -105,4 +107,30 @@ public class StateFutureUtils { return ret; } } + + /** + * Convert a future of state iterator to a future of iterable. There is no good reason to do so, + * since this may disable the capability of lazy loading. Only useful when the further + * calculation depends on the whole data from the iterator. 
+ */ + public static <T> StateFuture<Iterable<T>> toIterable(StateFuture<StateIterator<T>> future) { + return future.thenCompose( + iterator -> { + if (iterator == null) { + return StateFutureUtils.completedFuture(Collections.emptyList()); + } + InternalStateIterator<T> theIterator = ((InternalStateIterator<T>) iterator); + if (!theIterator.hasNextLoading()) { + return StateFutureUtils.completedFuture(theIterator.getCurrentCache()); + } else { + final ArrayList<T> result = new ArrayList<>(); + return theIterator + .onNext( + next -> { + result.add(next); + }) + .thenApply(ignored -> result); + } + }); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIterator.java index afa141ffa82..5748c102501 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIterator.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.v2.State; import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.api.common.state.v2.StateIterator; import org.apache.flink.core.state.InternalStateFuture; +import org.apache.flink.core.state.InternalStateIterator; import org.apache.flink.core.state.StateFutureUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.function.FunctionWithException; @@ -35,15 +36,15 @@ import java.util.function.Consumer; /** * A {@link StateIterator} implementation to facilitate async data load of iterator. Each state * backend could override this class to maintain more variables in need. Any subclass should - * implement two methods, {@link #hasNext()} and {@link #nextPayloadForContinuousLoading()}. 
The - * philosophy behind this class is to carry some already loaded elements and provide iterating right - * on the task thread, and load following ones if needed (determined by {@link #hasNext()}) by - * creating **ANOTHER** iterating request. Thus, later it returns another iterator instance, and we - * continue to apply the user iteration on that instance. The whole elements will be iterated by - * recursive call of {@code #onNext()}. + * implement two methods, {@link #hasNextLoading()} and {@link #nextPayloadForContinuousLoading()}. + * The philosophy behind this class is to carry some already loaded elements and provide iterating + * right on the task thread, and load following ones if needed (determined by {@link + * #hasNextLoading()}) by creating **ANOTHER** iterating request. Thus, later it returns another + * iterator instance, and we continue to apply the user iteration on that instance. The whole + * elements will be iterated by recursive call of {@code #onNext()}. */ @SuppressWarnings("rawtypes") -public abstract class AbstractStateIterator<T> implements StateIterator<T> { +public abstract class AbstractStateIterator<T> implements InternalStateIterator<T> { /** The state this iterator iterates on. */ final State originalState; @@ -69,7 +70,7 @@ public abstract class AbstractStateIterator<T> implements StateIterator<T> { } /** Return whether this iterator has more elements to load besides current cache. */ - protected abstract boolean hasNext(); + public abstract boolean hasNextLoading(); /** * To perform following loading, build and get next payload for the next request. This will put @@ -79,6 +80,10 @@ public abstract class AbstractStateIterator<T> implements StateIterator<T> { */ protected abstract Object nextPayloadForContinuousLoading(); + public Iterable<T> getCurrentCache() { + return cache == null ? 
Collections.emptyList() : cache; + } + protected StateRequestType getRequestType() { return requestType; } @@ -118,7 +123,7 @@ public abstract class AbstractStateIterator<T> implements StateIterator<T> { // Since this is on task thread, we can directly throw the runtime exception. throw new FlinkRuntimeException("Failed to iterate over state.", e); } - if (hasNext()) { + if (hasNextLoading()) { return StateFutureUtils.combineAll(resultFutures) .thenCombine( asyncNextLoad().thenCompose(itr -> itr.onNext(iterating)), @@ -149,7 +154,7 @@ public abstract class AbstractStateIterator<T> implements StateIterator<T> { // Since this is on task thread, we can directly throw the runtime exception. throw new FlinkRuntimeException("Failed to iterate over state.", e); } - if (hasNext()) { + if (hasNextLoading()) { return asyncNextLoad().thenCompose(itr -> itr.onNext(iterating)); } else { return StateFutureUtils.completedVoidFuture(); @@ -163,13 +168,13 @@ public abstract class AbstractStateIterator<T> implements StateIterator<T> { for (T item : cache) { iterating.accept(item); } - if (hasNext()) { + if (hasNextLoading()) { ((AbstractStateIterator<T>) syncNextLoad()).onNextSync(iterating); } } @Override public boolean isEmpty() { - return (cache == null || cache.isEmpty()) && !hasNext(); + return (cache == null || cache.isEmpty()) && !hasNextLoading(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/CompleteStateIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/CompleteStateIterator.java index 034714619e6..613ef515a8c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/CompleteStateIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/CompleteStateIterator.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.state.v2.adaptor; import org.apache.flink.api.common.state.v2.StateFuture; -import 
org.apache.flink.api.common.state.v2.StateIterator; +import org.apache.flink.core.state.InternalStateIterator; import org.apache.flink.core.state.StateFutureUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.function.FunctionWithException; @@ -32,7 +32,7 @@ import java.util.Collection; import java.util.Collections; /** A {@link org.apache.flink.api.common.state.v2.StateIterator} that has all elements. */ -public class CompleteStateIterator<T> implements StateIterator<T> { +public class CompleteStateIterator<T> implements InternalStateIterator<T> { final Iterable<T> iterable; final boolean empty; @@ -83,4 +83,14 @@ public class CompleteStateIterator<T> implements StateIterator<T> { public boolean isEmpty() { return empty; } + + @Override + public boolean hasNextLoading() { + return false; + } + + @Override + public Iterable<T> getCurrentCache() { + return iterable; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java index 7e9b4033396..a05e443d3bc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java @@ -126,6 +126,42 @@ public class AbstractStateIteratorTest { aec.drainInflightRecords(0); } + @Test + @SuppressWarnings({"unchecked", "rawtypes"}) + public void testPartialLoadingWithConversionToIterable() { + TestIteratorStateExecutor stateExecutor = new TestIteratorStateExecutor(100, 3); + AsyncExecutionController aec = + new AsyncExecutionController( + new SyncMailboxExecutor(), + (a, b) -> {}, + stateExecutor, + new DeclarationManager(), + 1, + 100, + 1000, + 1, + null); + stateExecutor.bindAec(aec); + RecordContext<String> recordContext = aec.buildContext("1", "key1"); + 
aec.setCurrentContext(recordContext); + + AtomicInteger processed = new AtomicInteger(); + + StateFutureUtils.toIterable(aec.handleRequest(null, StateRequestType.MAP_ITER, null)) + .thenAccept( + (iter) -> { + assertThat(iter instanceof Iterable); + ((Iterable<Integer>) iter) + .forEach( + item -> { + assertThat(item) + .isEqualTo(processed.getAndIncrement()); + }); + assertThat(processed.get()).isEqualTo(100); + }); + aec.drainInflightRecords(0); + } + /** * A brief implementation of {@link StateExecutor}, to illustrate the interaction between AEC * and StateExecutor. @@ -231,7 +267,7 @@ public class AbstractStateIteratorTest { } @Override - protected boolean hasNext() { + public boolean hasNextLoading() { return current < limit; } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListIterator.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListIterator.java index 76377acf471..0eb6df13d64 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListIterator.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStListIterator.java @@ -37,7 +37,7 @@ public class ForStListIterator<V> extends AbstractStateIterator<V> { } @Override - protected boolean hasNext() { + public boolean hasNextLoading() { return false; } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapIterator.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapIterator.java index 5860ae02dae..edf25242324 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapIterator.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMapIterator.java @@ -32,7 +32,7 @@ public class ForStMapIterator<T> 
extends AbstractStateIterator<T> { /** * Whether the iterator has encountered the end, which determines the return value of {@link - * #hasNext}. + * #hasNextLoading}. */ private boolean encounterEnd; @@ -60,7 +60,7 @@ public class ForStMapIterator<T> extends AbstractStateIterator<T> { } @Override - protected boolean hasNext() { + public boolean hasNextLoading() { return !encounterEnd; }