This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c225a8c  [FLINK-23367][state] Ensure InternalPriorityQueue#iterator could be closed for changelog state-backend
c225a8c is described below

commit c225a8c3757ed96417e721ae013165d51f463ec0
Author: Yun Tang <myas...@live.com>
AuthorDate: Fri Jul 16 12:32:07 2021 +0800

    [FLINK-23367][state] Ensure InternalPriorityQueue#iterator could be closed for changelog state-backend
---
 .../ChangelogKeyGroupedPriorityQueue.java          |  5 ++-
 .../flink/state/changelog/ChangelogMapState.java   | 37 ++++++++++++----------
 .../changelog/StateChangeLoggingIterator.java      | 18 +++++++----
 3 files changed, 34 insertions(+), 26 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java
index eff82db..2379f64 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java
@@ -129,9 +129,8 @@ public class ChangelogKeyGroupedPriorityQueue<T>
     @Override
     @Nonnull
     public CloseableIterator<T> iterator() {
-        return CloseableIterator.adapterForIterator(
-                StateChangeLoggingIterator.create(
-                        delegatedPriorityQueue.iterator(), logger, 
serializer::serialize, null));
+        return StateChangeLoggingIterator.create(
+                delegatedPriorityQueue.iterator(), logger, 
serializer::serialize, null);
     }
 
     @Override
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
index 3b35dcd..219839f 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
 import org.apache.flink.state.changelog.restore.StateChangeApplier;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.function.ThrowingConsumer;
 
@@ -122,22 +123,24 @@ class ChangelogMapState<K, N, UK, UV>
     private Iterator<Map.Entry<UK, UV>> 
getEntryIterator(Iterator<Map.Entry<UK, UV>> iterator) {
         final N currentNamespace = getCurrentNamespace();
         return StateChangeLoggingIterator.create(
-                new Iterator<Map.Entry<UK, UV>>() {
-                    @Override
-                    public Map.Entry<UK, UV> next() {
-                        return loggingMapEntry(iterator.next(), changeLogger, 
currentNamespace);
-                    }
-
-                    @Override
-                    public boolean hasNext() {
-                        return iterator.hasNext();
-                    }
-
-                    @Override
-                    public void remove() {
-                        iterator.remove();
-                    }
-                },
+                CloseableIterator.adapterForIterator(
+                        new Iterator<Map.Entry<UK, UV>>() {
+                            @Override
+                            public Map.Entry<UK, UV> next() {
+                                return loggingMapEntry(
+                                        iterator.next(), changeLogger, 
currentNamespace);
+                            }
+
+                            @Override
+                            public boolean hasNext() {
+                                return iterator.hasNext();
+                            }
+
+                            @Override
+                            public void remove() {
+                                iterator.remove();
+                            }
+                        }),
                 changeLogger,
                 (entry, out) -> serializeKey(entry.getKey(), out),
                 currentNamespace);
@@ -148,7 +151,7 @@ class ChangelogMapState<K, N, UK, UV>
         Iterable<UK> iterable = delegatedState.keys();
         return () ->
                 StateChangeLoggingIterator.create(
-                        iterable.iterator(),
+                        
CloseableIterator.adapterForIterator(iterable.iterator()),
                         changeLogger,
                         this::serializeKey,
                         getCurrentNamespace());
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLoggingIterator.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLoggingIterator.java
index 38ea943..3484e4a 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLoggingIterator.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLoggingIterator.java
@@ -18,6 +18,7 @@
 package org.apache.flink.state.changelog;
 
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.function.BiConsumerWithException;
 
@@ -25,11 +26,11 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.Iterator;
 
-class StateChangeLoggingIterator<State, StateElement, Namespace> implements 
Iterator<StateElement> {
+class StateChangeLoggingIterator<State, StateElement, Namespace>
+        implements CloseableIterator<StateElement> {
 
-    private final Iterator<StateElement> iterator;
+    private final CloseableIterator<StateElement> iterator;
     private final StateChangeLogger<State, Namespace> changeLogger;
     private final BiConsumerWithException<StateElement, 
DataOutputViewStreamWrapper, IOException>
             removalWriter;
@@ -37,7 +38,7 @@ class StateChangeLoggingIterator<State, StateElement, 
Namespace> implements Iter
     @Nullable private StateElement lastReturned;
 
     private StateChangeLoggingIterator(
-            Iterator<StateElement> iterator,
+            CloseableIterator<StateElement> iterator,
             StateChangeLogger<State, Namespace> changeLogger,
             BiConsumerWithException<StateElement, DataOutputViewStreamWrapper, 
IOException>
                     removalWriter,
@@ -69,12 +70,17 @@ class StateChangeLoggingIterator<State, StateElement, 
Namespace> implements Iter
     }
 
     @Nonnull
-    public static <Namespace, State, StateElement> Iterator<StateElement> 
create(
-            Iterator<StateElement> iterator,
+    public static <Namespace, State, StateElement> 
CloseableIterator<StateElement> create(
+            CloseableIterator<StateElement> iterator,
             StateChangeLogger<State, Namespace> changeLogger,
             BiConsumerWithException<StateElement, DataOutputViewStreamWrapper, 
IOException>
                     removalWriter,
             Namespace ns) {
         return new StateChangeLoggingIterator<>(iterator, changeLogger, 
removalWriter, ns);
     }
+
+    @Override
+    public void close() throws Exception {
+        iterator.close();
+    }
 }

Reply via email to