[FLINK-5421] Deduplicate code in StateHandle Iterators

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/849f701e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/849f701e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/849f701e

Branch: refs/heads/release-1.2
Commit: 849f701ef1ebb149081e530dbca426b88795dd73
Parents: 39fc07f
Author: Stefan Richter <[email protected]>
Authored: Wed Jan 11 15:36:04 2017 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Thu Jan 12 16:41:33 2017 +0100

----------------------------------------------------------------------
 .../state/StateInitializationContextImpl.java   | 93 +++++++++++---------
 1 file changed, 50 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/849f701e/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
index be59a2a..46445d2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
@@ -136,35 +136,31 @@ public class StateInitializationContextImpl implements 
StateInitializationContex
                IOUtils.closeQuietly(closableRegistry);
        }
 
-       private static class KeyGroupStreamIterator implements 
Iterator<KeyGroupStatePartitionStreamProvider> {
+       private static class KeyGroupStreamIterator
+                       extends 
AbstractStateStreamIterator<KeyGroupStatePartitionStreamProvider, 
KeyGroupsStateHandle> {
 
-               private final Iterator<KeyGroupsStateHandle> 
stateHandleIterator;
-               private final CloseableRegistry closableRegistry;
-
-               private KeyGroupsStateHandle currentStateHandle;
-               private FSDataInputStream currentStream;
                private Iterator<Tuple2<Integer, Long>> currentOffsetsIterator;
 
                public KeyGroupStreamIterator(
                                Iterator<KeyGroupsStateHandle> 
stateHandleIterator, CloseableRegistry closableRegistry) {
 
-                       this.stateHandleIterator = 
Preconditions.checkNotNull(stateHandleIterator);
-                       this.closableRegistry = 
Preconditions.checkNotNull(closableRegistry);
+                       super(stateHandleIterator, closableRegistry);
                }
 
                @Override
                public boolean hasNext() {
+
                        if (null != currentStateHandle && 
currentOffsetsIterator.hasNext()) {
+
                                return true;
                        }
 
+                       closeCurrentStream();
+
                        while (stateHandleIterator.hasNext()) {
                                currentStateHandle = stateHandleIterator.next();
                                if (currentStateHandle.getNumberOfKeyGroups() > 
0) {
                                        currentOffsetsIterator = 
currentStateHandle.getGroupRangeOffsets().iterator();
-                                       
closableRegistry.unregisterClosable(currentStream);
-                                       IOUtils.closeQuietly(currentStream);
-                                       currentStream = null;
 
                                        return true;
                                }
@@ -173,46 +169,33 @@ public class StateInitializationContextImpl implements 
StateInitializationContex
                        return false;
                }
 
-               private void openStream() throws IOException {
-                       FSDataInputStream stream = 
currentStateHandle.openInputStream();
-                       closableRegistry.registerClosable(stream);
-                       currentStream = stream;
-               }
-
                @Override
                public KeyGroupStatePartitionStreamProvider next() {
 
                        if (!hasNext()) {
+
                                throw new NoSuchElementException("Iterator 
exhausted");
                        }
 
                        Tuple2<Integer, Long> keyGroupOffset = 
currentOffsetsIterator.next();
                        try {
                                if (null == currentStream) {
-                                       openStream();
+                                       openCurrentStream();
                                }
                                currentStream.seek(keyGroupOffset.f1);
+
                                return new 
KeyGroupStatePartitionStreamProvider(currentStream, keyGroupOffset.f0);
                        } catch (IOException ioex) {
+
                                return new 
KeyGroupStatePartitionStreamProvider(ioex, keyGroupOffset.f0);
                        }
                }
-
-               @Override
-               public void remove() {
-                       throw new UnsupportedOperationException("Read only 
Iterator");
-               }
        }
 
-       private static class OperatorStateStreamIterator implements 
Iterator<StatePartitionStreamProvider> {
+       private static class OperatorStateStreamIterator
+                       extends 
AbstractStateStreamIterator<StatePartitionStreamProvider, OperatorStateHandle> {
 
                private final String stateName; //TODO since we only support a 
single named state in raw, this could be dropped
-
-               private final Iterator<OperatorStateHandle> stateHandleIterator;
-               private final CloseableRegistry closableRegistry;
-
-               private OperatorStateHandle currentStateHandle;
-               private FSDataInputStream currentStream;
                private long[] offsets;
                private int offPos;
 
@@ -221,18 +204,20 @@ public class StateInitializationContextImpl implements 
StateInitializationContex
                                Iterator<OperatorStateHandle> 
stateHandleIterator,
                                CloseableRegistry closableRegistry) {
 
+                       super(stateHandleIterator, closableRegistry);
                        this.stateName = Preconditions.checkNotNull(stateName);
-                       this.stateHandleIterator = 
Preconditions.checkNotNull(stateHandleIterator);
-                       this.closableRegistry = 
Preconditions.checkNotNull(closableRegistry);
                }
 
                @Override
                public boolean hasNext() {
 
                        if (null != offsets && offPos < offsets.length) {
+
                                return true;
                        }
 
+                       closeCurrentStream();
+
                        while (stateHandleIterator.hasNext()) {
                                currentStateHandle = stateHandleIterator.next();
                                long[] offsets = 
currentStateHandle.getStateNameToPartitionOffsets().get(stateName);
@@ -241,10 +226,6 @@ public class StateInitializationContextImpl implements 
StateInitializationContex
                                        this.offsets = offsets;
                                        this.offPos = 0;
 
-                                       
closableRegistry.unregisterClosable(currentStream);
-                                       IOUtils.closeQuietly(currentStream);
-                                       currentStream = null;
-
                                        return true;
                                }
                        }
@@ -252,16 +233,11 @@ public class StateInitializationContextImpl implements 
StateInitializationContex
                        return false;
                }
 
-               private void openStream() throws IOException {
-                       FSDataInputStream stream = 
currentStateHandle.openInputStream();
-                       closableRegistry.registerClosable(stream);
-                       currentStream = stream;
-               }
-
                @Override
                public StatePartitionStreamProvider next() {
 
                        if (!hasNext()) {
+
                                throw new NoSuchElementException("Iterator 
exhausted");
                        }
 
@@ -269,15 +245,46 @@ public class StateInitializationContextImpl implements 
StateInitializationContex
 
                        try {
                                if (null == currentStream) {
-                                       openStream();
+                                       openCurrentStream();
                                }
                                currentStream.seek(offset);
 
                                return new 
StatePartitionStreamProvider(currentStream);
                        } catch (IOException ioex) {
+
                                return new StatePartitionStreamProvider(ioex);
                        }
                }
+       }
+
+       abstract static class AbstractStateStreamIterator<T extends 
StatePartitionStreamProvider, H extends StreamStateHandle>
+                       implements Iterator<T> {
+
+               protected final Iterator<H> stateHandleIterator;
+               protected final CloseableRegistry closableRegistry;
+
+               protected H currentStateHandle;
+               protected FSDataInputStream currentStream;
+
+               public AbstractStateStreamIterator(
+                               Iterator<H> stateHandleIterator,
+                               CloseableRegistry closableRegistry) {
+
+                       this.stateHandleIterator = 
Preconditions.checkNotNull(stateHandleIterator);
+                       this.closableRegistry = 
Preconditions.checkNotNull(closableRegistry);
+               }
+
+               protected void openCurrentStream() throws IOException {
+                       FSDataInputStream stream = 
currentStateHandle.openInputStream();
+                       closableRegistry.registerClosable(stream);
+                       currentStream = stream;
+               }
+
+               protected void closeCurrentStream() {
+                       closableRegistry.unregisterClosable(currentStream);
+                       IOUtils.closeQuietly(currentStream);
+                       currentStream = null;
+               }
 
                @Override
                public void remove() {

Reply via email to