[FLINK-5421] Deduplicate code in StateHandle Iterators
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/849f701e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/849f701e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/849f701e Branch: refs/heads/release-1.2 Commit: 849f701ef1ebb149081e530dbca426b88795dd73 Parents: 39fc07f Author: Stefan Richter <[email protected]> Authored: Wed Jan 11 15:36:04 2017 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Thu Jan 12 16:41:33 2017 +0100 ---------------------------------------------------------------------- .../state/StateInitializationContextImpl.java | 93 +++++++++++--------- 1 file changed, 50 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/849f701e/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java index be59a2a..46445d2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java @@ -136,35 +136,31 @@ public class StateInitializationContextImpl implements StateInitializationContex IOUtils.closeQuietly(closableRegistry); } - private static class KeyGroupStreamIterator implements Iterator<KeyGroupStatePartitionStreamProvider> { + private static class KeyGroupStreamIterator + extends AbstractStateStreamIterator<KeyGroupStatePartitionStreamProvider, KeyGroupsStateHandle> { - private final Iterator<KeyGroupsStateHandle> stateHandleIterator; - private final CloseableRegistry closableRegistry; - - private KeyGroupsStateHandle currentStateHandle; - private FSDataInputStream currentStream; private Iterator<Tuple2<Integer, Long>> currentOffsetsIterator; public KeyGroupStreamIterator( Iterator<KeyGroupsStateHandle> stateHandleIterator, CloseableRegistry closableRegistry) { - this.stateHandleIterator = Preconditions.checkNotNull(stateHandleIterator); - this.closableRegistry = Preconditions.checkNotNull(closableRegistry); + super(stateHandleIterator, closableRegistry); } @Override public boolean hasNext() { + if (null != currentStateHandle && currentOffsetsIterator.hasNext()) { + return true; } + closeCurrentStream(); + while (stateHandleIterator.hasNext()) { currentStateHandle = stateHandleIterator.next(); if (currentStateHandle.getNumberOfKeyGroups() > 0) { currentOffsetsIterator = currentStateHandle.getGroupRangeOffsets().iterator(); - closableRegistry.unregisterClosable(currentStream); - IOUtils.closeQuietly(currentStream); - currentStream = null; return true; } @@ -173,46 +169,33 @@ public class StateInitializationContextImpl implements StateInitializationContex return false; } - private void openStream() throws IOException { - FSDataInputStream stream = currentStateHandle.openInputStream(); - closableRegistry.registerClosable(stream); - currentStream = stream; - } - @Override public KeyGroupStatePartitionStreamProvider next() { if (!hasNext()) { + throw new NoSuchElementException("Iterator exhausted"); } Tuple2<Integer, Long> keyGroupOffset = currentOffsetsIterator.next(); try { if (null == currentStream) { - openStream(); + openCurrentStream(); } currentStream.seek(keyGroupOffset.f1); + return new KeyGroupStatePartitionStreamProvider(currentStream, keyGroupOffset.f0); } catch (IOException ioex) { + return new KeyGroupStatePartitionStreamProvider(ioex, keyGroupOffset.f0); } } - - @Override - public void remove() { - throw new UnsupportedOperationException("Read only Iterator"); - } } - private static class OperatorStateStreamIterator implements Iterator<StatePartitionStreamProvider> { + private static class OperatorStateStreamIterator + extends AbstractStateStreamIterator<StatePartitionStreamProvider, OperatorStateHandle> { private final String stateName; //TODO since we only support a single named state in raw, this could be dropped - - private final Iterator<OperatorStateHandle> stateHandleIterator; - private final CloseableRegistry closableRegistry; - - private OperatorStateHandle currentStateHandle; - private FSDataInputStream currentStream; private long[] offsets; private int offPos; @@ -221,18 +204,20 @@ public class StateInitializationContextImpl implements StateInitializationContex Iterator<OperatorStateHandle> stateHandleIterator, CloseableRegistry closableRegistry) { + super(stateHandleIterator, closableRegistry); this.stateName = Preconditions.checkNotNull(stateName); - this.stateHandleIterator = Preconditions.checkNotNull(stateHandleIterator); - this.closableRegistry = Preconditions.checkNotNull(closableRegistry); } @Override public boolean hasNext() { if (null != offsets && offPos < offsets.length) { + return true; } + closeCurrentStream(); + while (stateHandleIterator.hasNext()) { currentStateHandle = stateHandleIterator.next(); long[] offsets = currentStateHandle.getStateNameToPartitionOffsets().get(stateName); @@ -241,10 +226,6 @@ public class StateInitializationContextImpl implements StateInitializationContex this.offsets = offsets; this.offPos = 0; - closableRegistry.unregisterClosable(currentStream); - IOUtils.closeQuietly(currentStream); - currentStream = null; - return true; } } @@ -252,16 +233,11 @@ public class StateInitializationContextImpl implements StateInitializationContex return false; } - private void openStream() throws IOException { - FSDataInputStream stream = currentStateHandle.openInputStream(); - closableRegistry.registerClosable(stream); - currentStream = stream; - } - @Override public StatePartitionStreamProvider next() { if (!hasNext()) { + throw new NoSuchElementException("Iterator exhausted"); } @@ -269,15 +245,46 @@ public class StateInitializationContextImpl implements StateInitializationContex try { if (null == currentStream) { - openStream(); + openCurrentStream(); } currentStream.seek(offset); return new StatePartitionStreamProvider(currentStream); } catch (IOException ioex) { + return new StatePartitionStreamProvider(ioex); } } + } + + abstract static class AbstractStateStreamIterator<T extends StatePartitionStreamProvider, H extends StreamStateHandle> + implements Iterator<T> { + + protected final Iterator<H> stateHandleIterator; + protected final CloseableRegistry closableRegistry; + + protected H currentStateHandle; + protected FSDataInputStream currentStream; + + public AbstractStateStreamIterator( + Iterator<H> stateHandleIterator, + CloseableRegistry closableRegistry) { + + this.stateHandleIterator = Preconditions.checkNotNull(stateHandleIterator); + this.closableRegistry = Preconditions.checkNotNull(closableRegistry); + } + + protected void openCurrentStream() throws IOException { + FSDataInputStream stream = currentStateHandle.openInputStream(); + closableRegistry.registerClosable(stream); + currentStream = stream; + } + + protected void closeCurrentStream() { + closableRegistry.unregisterClosable(currentStream); + IOUtils.closeQuietly(currentStream); + currentStream = null; + } @Override public void remove() {
