This is an automated email from the ASF dual-hosted git repository. tangyun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new c225a8c [FLINK-23367][state] Ensure InternalPriorityQueue#iterator could be closed for changelog state-backend c225a8c is described below commit c225a8c3757ed96417e721ae013165d51f463ec0 Author: Yun Tang <myas...@live.com> AuthorDate: Fri Jul 16 12:32:07 2021 +0800 [FLINK-23367][state] Ensure InternalPriorityQueue#iterator could be closed for changelog state-backend --- .../ChangelogKeyGroupedPriorityQueue.java | 5 ++- .../flink/state/changelog/ChangelogMapState.java | 37 ++++++++++++---------- .../changelog/StateChangeLoggingIterator.java | 18 +++++++---- 3 files changed, 34 insertions(+), 26 deletions(-) diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java index eff82db..2379f64 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java @@ -129,9 +129,8 @@ public class ChangelogKeyGroupedPriorityQueue<T> @Override @Nonnull public CloseableIterator<T> iterator() { - return CloseableIterator.adapterForIterator( - StateChangeLoggingIterator.create( - delegatedPriorityQueue.iterator(), logger, serializer::serialize, null)); + return StateChangeLoggingIterator.create( + delegatedPriorityQueue.iterator(), logger, serializer::serialize, null); } @Override diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java index 3b35dcd..219839f 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.state.changelog.restore.ChangelogApplierFactory; import org.apache.flink.state.changelog.restore.StateChangeApplier; +import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.function.ThrowingConsumer; @@ -122,22 +123,24 @@ class ChangelogMapState<K, N, UK, UV> private Iterator<Map.Entry<UK, UV>> getEntryIterator(Iterator<Map.Entry<UK, UV>> iterator) { final N currentNamespace = getCurrentNamespace(); return StateChangeLoggingIterator.create( - new Iterator<Map.Entry<UK, UV>>() { - @Override - public Map.Entry<UK, UV> next() { - return loggingMapEntry(iterator.next(), changeLogger, currentNamespace); - } - - @Override - public boolean hasNext() { - return iterator.hasNext(); - } - - @Override - public void remove() { - iterator.remove(); - } - }, + CloseableIterator.adapterForIterator( + new Iterator<Map.Entry<UK, UV>>() { + @Override + public Map.Entry<UK, UV> next() { + return loggingMapEntry( + iterator.next(), changeLogger, currentNamespace); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public void remove() { + iterator.remove(); + } + }), changeLogger, (entry, out) -> serializeKey(entry.getKey(), out), currentNamespace); @@ -148,7 +151,7 @@ class ChangelogMapState<K, N, UK, UV> Iterable<UK> iterable = delegatedState.keys(); return () -> StateChangeLoggingIterator.create( - iterable.iterator(), + CloseableIterator.adapterForIterator(iterable.iterator()), changeLogger, this::serializeKey, getCurrentNamespace()); diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLoggingIterator.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLoggingIterator.java index 38ea943..3484e4a 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLoggingIterator.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLoggingIterator.java @@ -18,6 +18,7 @@ package org.apache.flink.state.changelog; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.function.BiConsumerWithException; @@ -25,11 +26,11 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; -import java.util.Iterator; -class StateChangeLoggingIterator<State, StateElement, Namespace> implements Iterator<StateElement> { +class StateChangeLoggingIterator<State, StateElement, Namespace> + implements CloseableIterator<StateElement> { - private final Iterator<StateElement> iterator; + private final CloseableIterator<StateElement> iterator; private final StateChangeLogger<State, Namespace> changeLogger; private final BiConsumerWithException<StateElement, DataOutputViewStreamWrapper, IOException> removalWriter; @@ -37,7 +38,7 @@ class StateChangeLoggingIterator<State, StateElement, Namespace> implements Iter @Nullable private StateElement lastReturned; private StateChangeLoggingIterator( - Iterator<StateElement> iterator, + CloseableIterator<StateElement> iterator, StateChangeLogger<State, Namespace> changeLogger, BiConsumerWithException<StateElement, DataOutputViewStreamWrapper, IOException> removalWriter, @@ -69,12 +70,17 @@ class StateChangeLoggingIterator<State, StateElement, Namespace> implements Iter } @Nonnull - public static <Namespace, State, StateElement> Iterator<StateElement> create( - Iterator<StateElement> iterator, + public static <Namespace, State, StateElement> CloseableIterator<StateElement> create( + CloseableIterator<StateElement> iterator, StateChangeLogger<State, Namespace> changeLogger, BiConsumerWithException<StateElement, DataOutputViewStreamWrapper, IOException> removalWriter, Namespace ns) { return new StateChangeLoggingIterator<>(iterator, changeLogger, removalWriter, ns); } + + @Override + public void close() throws Exception { + iterator.close(); + } }