http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java index f5e3618..7e4eded 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java @@ -20,7 +20,9 @@ package org.apache.flink.runtime.checkpoint; import com.google.common.collect.Iterables; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.util.Preconditions; @@ -47,27 +49,36 @@ public class TaskState implements StateObject { /** handles to non-partitioned states, subtaskindex -> subtaskstate */ private final Map<Integer, SubtaskState> subtaskStates; - /** handles to partitioned states, subtaskindex -> keyed state */ + /** handles to partitionable states, subtaskindex -> partitionable state */ + private final Map<Integer, ChainedStateHandle<OperatorStateHandle>> partitionableStates; + + /** handles to key-partitioned states, subtaskindex -> keyed state */ private final Map<Integer, KeyGroupsStateHandle> keyGroupsStateHandles; + /** parallelism of the operator when it was checkpointed */ private final int parallelism; /** maximum parallelism of the operator when the job was first created */ private final int maxParallelism; - public TaskState(JobVertexID jobVertexID, int parallelism, int maxParallelism) { + private final int chainLength; + + public TaskState(JobVertexID jobVertexID, int parallelism, int maxParallelism, int chainLength) { Preconditions.checkArgument( parallelism <= maxParallelism, "Parallelism " + parallelism + " is not smaller or equal to max parallelism " + maxParallelism + "."); + Preconditions.checkArgument(chainLength > 0, "There has to be at least one operator in the operator chain."); this.jobVertexID = jobVertexID; this.subtaskStates = new HashMap<>(parallelism); + this.partitionableStates = new HashMap<>(parallelism); this.keyGroupsStateHandles = new HashMap<>(parallelism); this.parallelism = parallelism; this.maxParallelism = maxParallelism; + this.chainLength = chainLength; } public JobVertexID getJobVertexID() { @@ -85,6 +96,20 @@ public class TaskState implements StateObject { } } + public void putPartitionableState( + int subtaskIndex, + ChainedStateHandle<OperatorStateHandle> partitionableState) { + + Preconditions.checkNotNull(partitionableState); + + if (subtaskIndex < 0 || subtaskIndex >= parallelism) { + throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex + + " exceeds the maximum number of sub tasks " + subtaskStates.size()); + } else { + partitionableStates.put(subtaskIndex, partitionableState); + } + } + public void putKeyedState(int subtaskIndex, KeyGroupsStateHandle keyGroupsStateHandle) { Preconditions.checkNotNull(keyGroupsStateHandle); @@ -106,6 +131,15 @@ public class TaskState implements StateObject { } } + public ChainedStateHandle<OperatorStateHandle> getPartitionableState(int subtaskIndex) { + if (subtaskIndex < 0 || subtaskIndex >= parallelism) { + throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex + + " exceeds the maximum number of sub tasks " + subtaskStates.size()); + } else { + return partitionableStates.get(subtaskIndex); + } + } + public KeyGroupsStateHandle getKeyGroupState(int subtaskIndex) { if (subtaskIndex < 0 || subtaskIndex >= parallelism) { throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex + @@ -131,6 +165,10 @@ public class TaskState implements StateObject { return maxParallelism; } + public int getChainLength() { + return chainLength; + } + public Collection<KeyGroupsStateHandle> getKeyGroupStates() { return keyGroupsStateHandles.values(); } @@ -147,7 +185,7 @@ public class TaskState implements StateObject { @Override public void discardState() throws Exception { StateUtil.bestEffortDiscardAllStateObjects( - Iterables.concat(subtaskStates.values(), keyGroupsStateHandles.values())); + Iterables.concat(subtaskStates.values(), partitionableStates.values(), keyGroupsStateHandles.values())); } @@ -156,11 +194,19 @@ public class TaskState implements StateObject { long result = 0L; for (int i = 0; i < parallelism; i++) { - if (subtaskStates.get(i) != null) { - result += subtaskStates.get(i).getStateSize(); + SubtaskState subtaskState = subtaskStates.get(i); + if (subtaskState != null) { + result += subtaskState.getStateSize(); + } + + ChainedStateHandle<OperatorStateHandle> partitionableState = partitionableStates.get(i); + if (partitionableState != null) { + result += partitionableState.getStateSize(); } - if (keyGroupsStateHandles.get(i) != null) { - result += keyGroupsStateHandles.get(i).getStateSize(); + + KeyGroupsStateHandle keyGroupsState = keyGroupsStateHandles.get(i); + if (keyGroupsState != null) { + result += keyGroupsState.getStateSize(); } } @@ -172,8 +218,11 @@ public class TaskState implements StateObject { if (obj instanceof TaskState) { TaskState other = (TaskState) obj; - return jobVertexID.equals(other.jobVertexID) && parallelism == other.parallelism && - subtaskStates.equals(other.subtaskStates) && keyGroupsStateHandles.equals(other.keyGroupsStateHandles); + return jobVertexID.equals(other.jobVertexID) + && parallelism == other.parallelism + && subtaskStates.equals(other.subtaskStates) + && partitionableStates.equals(other.partitionableStates) + && keyGroupsStateHandles.equals(other.keyGroupsStateHandles); } else { return false; } @@ -181,13 +230,7 @@ public class TaskState implements StateObject { @Override public int hashCode() { - return parallelism + 31 * Objects.hash(jobVertexID, subtaskStates, keyGroupsStateHandles); - } - - @Override - public void close() throws IOException { - StateUtil.bestEffortCloseAllStateObjects( - Iterables.concat(subtaskStates.values(), keyGroupsStateHandles.values())); + return parallelism + 31 * Objects.hash(jobVertexID, subtaskStates, partitionableStates, keyGroupsStateHandles); } public Map<Integer, SubtaskState> getSubtaskStates() { @@ -197,4 +240,8 @@ public class TaskState implements StateObject { public Map<Integer, KeyGroupsStateHandle> getKeyGroupsStateHandles() { return Collections.unmodifiableMap(keyGroupsStateHandles); } + + public Map<Integer, ChainedStateHandle<OperatorStateHandle>> getPartitionableStates() { + return partitionableStates; + } }
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java index f07f44f..536062a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; @@ -35,6 +36,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -51,6 +53,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> { private static final byte BYTE_STREAM_STATE_HANDLE = 1; private static final byte FILE_STREAM_STATE_HANDLE = 2; private static final byte KEY_GROUPS_HANDLE = 3; + private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4; public static final SavepointV1Serializer INSTANCE = new SavepointV1Serializer(); @@ -75,8 +78,9 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> { int parallelism = taskState.getParallelism(); dos.writeInt(parallelism); dos.writeInt(taskState.getMaxParallelism()); + dos.writeInt(taskState.getChainLength()); - // Sub task states + // Sub task non-partitionable states Map<Integer, SubtaskState> subtaskStateMap = taskState.getSubtaskStates(); dos.writeInt(subtaskStateMap.size()); for (Map.Entry<Integer, SubtaskState> entry : subtaskStateMap.entrySet()) { @@ -93,7 +97,22 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> { dos.writeLong(subtaskState.getDuration()); } + // Sub task partitionable states + Map<Integer, ChainedStateHandle<OperatorStateHandle>> partitionableStatesMap = taskState.getPartitionableStates(); + dos.writeInt(partitionableStatesMap.size()); + for (Map.Entry<Integer, ChainedStateHandle<OperatorStateHandle>> entry : partitionableStatesMap.entrySet()) { + dos.writeInt(entry.getKey()); + + ChainedStateHandle<OperatorStateHandle> chainedStateHandle = entry.getValue(); + dos.writeInt(chainedStateHandle.getLength()); + for (int j = 0; j < chainedStateHandle.getLength(); ++j) { + OperatorStateHandle stateHandle = chainedStateHandle.get(j); + serializePartitionableStateHandle(stateHandle, dos); + } + } + + // Keyed state Map<Integer, KeyGroupsStateHandle> keyGroupsStateHandles = taskState.getKeyGroupsStateHandles(); dos.writeInt(keyGroupsStateHandles.size()); for (Map.Entry<Integer, KeyGroupsStateHandle> entry : keyGroupsStateHandles.entrySet()) { @@ -119,9 +138,10 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> { JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong()); int parallelism = dis.readInt(); int maxParallelism = dis.readInt(); + int chainLength = dis.readInt(); // Add task state - TaskState taskState = new TaskState(jobVertexId, parallelism, maxParallelism); + TaskState taskState = new TaskState(jobVertexId, parallelism, maxParallelism, chainLength); taskStates.add(taskState); // Sub task states @@ -142,6 +162,24 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> { taskState.putState(subtaskIndex, subtaskState); } + int numPartitionableOpStates = dis.readInt(); + + for (int j = 0; j < numPartitionableOpStates; j++) { + int subtaskIndex = dis.readInt(); + int chainedStateHandleSize = dis.readInt(); + List<OperatorStateHandle> streamStateHandleList = new ArrayList<>(chainedStateHandleSize); + + for (int k = 0; k < chainedStateHandleSize; ++k) { + OperatorStateHandle streamStateHandle = deserializePartitionableStateHandle(dis); + streamStateHandleList.add(streamStateHandle); + } + + ChainedStateHandle<OperatorStateHandle> chainedStateHandle = + new ChainedStateHandle<>(streamStateHandleList); + + taskState.putPartitionableState(subtaskIndex, chainedStateHandle); + } + // Key group states int numKeyGroupStates = dis.readInt(); for (int j = 0; j < numKeyGroupStates; j++) { @@ -157,7 +195,9 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> { return new SavepointV1(checkpointId, taskStates); } - public static void serializeKeyGroupStateHandle(KeyGroupsStateHandle stateHandle, DataOutputStream dos) throws IOException { + public static void serializeKeyGroupStateHandle( + KeyGroupsStateHandle stateHandle, DataOutputStream dos) throws IOException { + if (stateHandle != null) { dos.writeByte(KEY_GROUPS_HANDLE); dos.writeInt(stateHandle.getGroupRangeOffsets().getKeyGroupRange().getStartKeyGroup()); @@ -172,10 +212,10 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> { } public static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException { - int type = dis.readByte(); + final int type = dis.readByte(); if (NULL_HANDLE == type) { return null; - } else { + } else if (KEY_GROUPS_HANDLE == type) { int startKeyGroup = dis.readInt(); int numKeyGroups = dis.readInt(); KeyGroupRange keyGroupRange = KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1); @@ -186,6 +226,53 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> { KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, offsets); StreamStateHandle stateHandle = deserializeStreamStateHandle(dis); return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle); + } else { + throw new IllegalStateException("Reading invalid KeyGroupsStateHandle, type: " + type); + } + } + + public static void serializePartitionableStateHandle( + OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException { + + if (stateHandle != null) { + dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE); + Map<String, long[]> partitionOffsetsMap = stateHandle.getStateNameToPartitionOffsets(); + dos.writeInt(partitionOffsetsMap.size()); + for (Map.Entry<String, long[]> entry : partitionOffsetsMap.entrySet()) { + dos.writeUTF(entry.getKey()); + long[] offsets = entry.getValue(); + dos.writeInt(offsets.length); + for (int i = 0; i < offsets.length; ++i) { + dos.writeLong(offsets[i]); + } + } + serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos); + } else { + dos.writeByte(NULL_HANDLE); + } + } + + public static OperatorStateHandle deserializePartitionableStateHandle( + DataInputStream dis) throws IOException { + + final int type = dis.readByte(); + if (NULL_HANDLE == type) { + return null; + } else if (PARTITIONABLE_OPERATOR_STATE_HANDLE == type) { + int mapSize = dis.readInt(); + Map<String, long[]> offsetsMap = new HashMap<>(mapSize); + for (int i = 0; i < mapSize; ++i) { + String key = dis.readUTF(); + long[] offsets = new long[dis.readInt()]; + for (int j = 0; j < offsets.length; ++j) { + offsets[j] = dis.readLong(); + } + offsetsMap.put(key, offsets); + } + StreamStateHandle stateHandle = deserializeStreamStateHandle(dis); + return new OperatorStateHandle(stateHandle, offsetsMap); + } else { + throw new IllegalStateException("Reading invalid OperatorStateHandle, type: " + type); } } http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java index ca976e4..7bbdb2a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.SerializedValue; @@ -100,6 +101,8 @@ public final class TaskDeploymentDescriptor implements Serializable { /** Handle to the key-grouped state of the head operator in the chain */ private final List<KeyGroupsStateHandle> keyGroupState; + private final List<Collection<OperatorStateHandle>> partitionableOperatorState; + /** The execution configuration (see {@link ExecutionConfig}) related to the specific job. */ private final SerializedValue<ExecutionConfig> serializedExecutionConfig; @@ -107,26 +110,27 @@ public final class TaskDeploymentDescriptor implements Serializable { * Constructs a task deployment descriptor. */ public TaskDeploymentDescriptor( - JobID jobID, - String jobName, - JobVertexID vertexID, - ExecutionAttemptID executionId, - SerializedValue<ExecutionConfig> serializedExecutionConfig, - String taskName, - int numberOfKeyGroups, - int indexInSubtaskGroup, - int numberOfSubtasks, - int attemptNumber, - Configuration jobConfiguration, - Configuration taskConfiguration, - String invokableClassName, - List<ResultPartitionDeploymentDescriptor> producedPartitions, - List<InputGateDeploymentDescriptor> inputGates, - List<BlobKey> requiredJarFiles, - List<URL> requiredClasspaths, - int targetSlotNumber, - ChainedStateHandle<StreamStateHandle> operatorState, - List<KeyGroupsStateHandle> keyGroupState) { + JobID jobID, + String jobName, + JobVertexID vertexID, + ExecutionAttemptID executionId, + SerializedValue<ExecutionConfig> serializedExecutionConfig, + String taskName, + int numberOfKeyGroups, + int indexInSubtaskGroup, + int numberOfSubtasks, + int attemptNumber, + Configuration jobConfiguration, + Configuration taskConfiguration, + String invokableClassName, + List<ResultPartitionDeploymentDescriptor> producedPartitions, + List<InputGateDeploymentDescriptor> inputGates, + List<BlobKey> requiredJarFiles, + List<URL> requiredClasspaths, + int targetSlotNumber, + ChainedStateHandle<StreamStateHandle> operatorState, + List<KeyGroupsStateHandle> keyGroupState, + List<Collection<OperatorStateHandle>> partitionableOperatorStateHandles) { checkArgument(indexInSubtaskGroup >= 0); checkArgument(numberOfSubtasks > indexInSubtaskGroup); @@ -153,6 +157,7 @@ public final class TaskDeploymentDescriptor implements Serializable { this.targetSlotNumber = targetSlotNumber; this.operatorState = operatorState; this.keyGroupState = keyGroupState; + this.partitionableOperatorState = partitionableOperatorStateHandles; } public TaskDeploymentDescriptor( @@ -195,6 +200,7 @@ public final class TaskDeploymentDescriptor implements Serializable { requiredClasspaths, targetSlotNumber, null, + null, null); } @@ -347,4 +353,8 @@ public final class TaskDeploymentDescriptor implements Serializable { public List<KeyGroupsStateHandle> getKeyGroupState() { return keyGroupState; } + + public List<Collection<OperatorStateHandle>> getPartitionableOperatorState() { + return partitionableOperatorState; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java index 273c0d9..f6cde95 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java @@ -34,13 +34,10 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.CheckpointStateHandles; import org.apache.flink.runtime.state.KvState; -import org.apache.flink.runtime.state.ChainedStateHandle; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; -import java.util.List; import java.util.Map; import java.util.concurrent.Future; @@ -187,12 +184,8 @@ public interface Environment { * the checkpoint with the give checkpoint-ID. This method does include * the given state in the checkpoint. * - * @param checkpointId - * The ID of the checkpoint. - * @param chainedStateHandle - * Handle for the chained operator state - * @param keyGroupStateHandles - * Handles for key group state + * @param checkpointId The ID of the checkpoint. + * @param checkpointStateHandles All state handles for the checkpointed state * @param synchronousDurationMillis * The duration (in milliseconds) of the synchronous part of the operator checkpoint * @param asynchronousDurationMillis @@ -204,8 +197,7 @@ public interface Environment { */ void acknowledgeCheckpoint( long checkpointId, - ChainedStateHandle<StreamStateHandle> chainedStateHandle, - List<KeyGroupsStateHandle> keyGroupStateHandles, + CheckpointStateHandles checkpointStateHandles, long synchronousDurationMillis, long asynchronousDurationMillis, long bytesBufferedInAlignment, http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 912ff10..b92e3af 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript import org.apache.flink.runtime.deployment.ResultPartitionLocation; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SlotProvider; @@ -46,6 +47,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.messages.Messages; import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult; import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.CheckpointStateHandles; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -56,6 +58,7 @@ import scala.concurrent.ExecutionContext; import scala.concurrent.duration.FiniteDuration; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; @@ -134,6 +137,8 @@ public class Execution { private ChainedStateHandle<StreamStateHandle> chainedStateHandle; + private List<Collection<OperatorStateHandle>> chainedPartitionableStateHandle; + private List<KeyGroupsStateHandle> keyGroupsStateHandles; @@ -223,6 +228,10 @@ public class Execution { return keyGroupsStateHandles; } + public List<Collection<OperatorStateHandle>> getChainedPartitionableStateHandle() { + return chainedPartitionableStateHandle; + } + public boolean isFinished() { return state.isTerminal(); } @@ -246,18 +255,19 @@ public class Execution { * Sets the initial state for the execution. The serialized state is then shipped via the * {@link TaskDeploymentDescriptor} to the TaskManagers. * - * @param chainedStateHandle Chained operator state - * @param keyGroupsStateHandles Key-group state (= partitioned state) + * @param checkpointStateHandles all checkpointed operator state */ - public void setInitialState( - ChainedStateHandle<StreamStateHandle> chainedStateHandle, - List<KeyGroupsStateHandle> keyGroupsStateHandles) { + public void setInitialState(CheckpointStateHandles checkpointStateHandles, List<Collection<OperatorStateHandle>> chainedPartitionableStateHandle) { if (state != ExecutionState.CREATED) { throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED"); } - this.chainedStateHandle = chainedStateHandle; - this.keyGroupsStateHandles = keyGroupsStateHandles; + + if(checkpointStateHandles != null) { + this.chainedStateHandle = checkpointStateHandles.getNonPartitionedStateHandles(); + this.chainedPartitionableStateHandle = chainedPartitionableStateHandle; + this.keyGroupsStateHandles = checkpointStateHandles.getKeyGroupsStateHandle(); + } } // -------------------------------------------------------------------------------------------- @@ -385,6 +395,7 @@ public class Execution { slot, chainedStateHandle, keyGroupsStateHandles, + chainedPartitionableStateHandle, attemptNumber); // register this execution at the execution graph, to receive call backs http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 7c3fa0b..6023205 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -56,10 +56,8 @@ import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.runtime.util.SerializedThrowable; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.SerializedValue; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.concurrent.ExecutionContext; import scala.concurrent.duration.FiniteDuration; http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index a8d5ee4..4837803 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.SlotProvider; +import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.SimpleSlot; @@ -53,6 +54,7 @@ import scala.concurrent.duration.FiniteDuration; import java.io.Serializable; import java.net.URL; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; @@ -629,6 +631,7 @@ public class ExecutionVertex { SimpleSlot targetSlot, ChainedStateHandle<StreamStateHandle> operatorState, List<KeyGroupsStateHandle> keyGroupStates, + List<Collection<OperatorStateHandle>> partitionableOperatorStateHandle, int attemptNumber) { // Produced intermediate results @@ -681,7 +684,8 @@ public class ExecutionVertex { classpaths, targetSlot.getRoot().getSlotNumber(), operatorState, - keyGroupStates); + keyGroupStates, + partitionableOperatorStateHandle); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java index 9ddfdf7..55e3e09 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java @@ -20,8 +20,10 @@ package org.apache.flink.runtime.jobgraph.tasks; import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; +import java.util.Collection; import java.util.List; /** @@ -33,11 +35,16 @@ public interface StatefulTask { /** * Sets the initial state of the operator, upon recovery. The initial state is typically * a snapshot of the state from a previous execution. - * + * + * TODO this should use @{@link org.apache.flink.runtime.state.CheckpointStateHandles} after redoing chained state. + * * @param chainedState Handle for the chained operator states. * @param keyGroupsState Handle for key group states. */ - void setInitialState(ChainedStateHandle<StreamStateHandle> chainedState, List<KeyGroupsStateHandle> keyGroupsState) throws Exception; + void setInitialState( + ChainedStateHandle<StreamStateHandle> chainedState, + List<KeyGroupsStateHandle> keyGroupsState, + List<Collection<OperatorStateHandle>> partitionableOperatorState) throws Exception; /** * This method is called to trigger a checkpoint, asynchronously by the checkpoint http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java index 72396eb..e95e7b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java @@ -20,11 +20,7 @@ package org.apache.flink.runtime.messages.checkpoint; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.state.ChainedStateHandle; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; - -import java.util.List; +import org.apache.flink.runtime.state.CheckpointStateHandles; import static org.apache.flink.util.Preconditions.checkArgument; @@ -32,7 +28,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; * This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the * {@link org.apache.flink.runtime.jobmanager.JobManager} to signal that the checkpoint of an * individual task is completed. - * + * <p> * <p>This message may carry the handle to the task's chained operator state and the key group * state. */ @@ -40,9 +36,8 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements private static final long serialVersionUID = -7606214777192401493L; - private final ChainedStateHandle<StreamStateHandle> stateHandle; - private final List<KeyGroupsStateHandle> keyGroupsStateHandle; + private final CheckpointStateHandles checkpointStateHandles; /** The duration (in milliseconds) that the synchronous part of the checkpoint took */ private final long synchronousDurationMillis; @@ -62,24 +57,22 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) { - this(job, taskExecutionId, checkpointId, null, null); + this(job, taskExecutionId, checkpointId, null); } public AcknowledgeCheckpoint( JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, - ChainedStateHandle<StreamStateHandle> state, - List<KeyGroupsStateHandle> keyGroupStateAndSizes) { - this(job, taskExecutionId, checkpointId, state, keyGroupStateAndSizes, -1L, -1L, -1L, -1L); + CheckpointStateHandles checkpointStateHandles) { + this(job, taskExecutionId, checkpointId, checkpointStateHandles, -1L, -1L, -1L, -1L); } public AcknowledgeCheckpoint( JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, - ChainedStateHandle<StreamStateHandle> state, - List<KeyGroupsStateHandle> keyGroupStateAndSizes, + CheckpointStateHandles checkpointStateHandles, long synchronousDurationMillis, long asynchronousDurationMillis, long bytesBufferedInAlignment, @@ -87,9 +80,7 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements super(job, taskExecutionId, checkpointId); - // these may be null in cases where the operator has no state - this.stateHandle = state; - this.keyGroupsStateHandle = keyGroupStateAndSizes; + this.checkpointStateHandles = checkpointStateHandles; // these may be "-1", in case the values are unknown or not set checkArgument(synchronousDurationMillis >= -1); @@ -107,12 +98,8 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements // properties // ------------------------------------------------------------------------ - public ChainedStateHandle<StreamStateHandle> getStateHandle() { - return stateHandle; - } - - public List<KeyGroupsStateHandle> getKeyGroupsStateHandle() { - return keyGroupsStateHandle; + public CheckpointStateHandles getCheckpointStateHandles() { + return checkpointStateHandles; } public long getSynchronousDurationMillis() { @@ -134,31 +121,33 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements // -------------------------------------------------------------------------------------------- @Override - public int hashCode() { - return super.hashCode(); - } - - @Override public boolean equals(Object o) { if (this == o) { - return true ; + return true; } - else if (o instanceof AcknowledgeCheckpoint) { - AcknowledgeCheckpoint that = (AcknowledgeCheckpoint) o; - return super.equals(o) && - (this.stateHandle == null ? that.stateHandle == null : - (that.stateHandle != null && this.stateHandle.equals(that.stateHandle))) && - (this.keyGroupsStateHandle == null ? that.keyGroupsStateHandle == null : - (that.keyGroupsStateHandle != null && this.keyGroupsStateHandle.equals(that.keyGroupsStateHandle))); + if (!(o instanceof AcknowledgeCheckpoint)) { + return false; } - else { + if (!super.equals(o)) { return false; } + + AcknowledgeCheckpoint that = (AcknowledgeCheckpoint) o; + return checkpointStateHandles != null ? + checkpointStateHandles.equals(that.checkpointStateHandles) : that.checkpointStateHandles == null; + + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (checkpointStateHandles != null ? checkpointStateHandles.hashCode() : 0); + return result; } @Override public String toString() { - return String.format("Confirm Task Checkpoint %d for (%s/%s) - state=%s keyGroupState=%s", - getCheckpointId(), getJob(), getTaskExecutionId(), stateHandle, keyGroupsStateHandle); + return String.format("Confirm Task Checkpoint %d for (%s/%s) - state=%s", + getCheckpointId(), getJob(), getTaskExecutionId(), checkpointStateHandles); } } http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java deleted file mode 100644 index 5966c95..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state; - -import java.io.Closeable; -import java.io.IOException; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - -/** - * A simple base for closable handles. - * - * Offers to register a stream (or other closable object) that close calls are delegated to if - * the handle is closed or was already closed. - */ -public abstract class AbstractCloseableHandle implements Closeable, StateObject { - - /** Serial Version UID must be constant to maintain format compatibility */ - private static final long serialVersionUID = 1L; - - /** To atomically update the "closable" field without needing to add a member class like "AtomicBoolean */ - private static final AtomicIntegerFieldUpdater<AbstractCloseableHandle> CLOSER = - AtomicIntegerFieldUpdater.newUpdater(AbstractCloseableHandle.class, "isClosed"); - - // ------------------------------------------------------------------------ - - /** The closeable to close if this handle is closed late */ - private transient volatile Closeable toClose; - - /** Flag to remember if this handle was already closed */ - @SuppressWarnings("unused") // this field is actually updated, but via the "CLOSER" updater - private transient volatile int isClosed; - - // ------------------------------------------------------------------------ - - protected final void registerCloseable(Closeable toClose) throws IOException { - if (toClose == null) { - return; - } - - // NOTE: The order of operations matters here: - // (1) first setting the closeable - // (2) checking the flag. - // Because the order in the {@link #close()} method is the opposite, and - // both variables are volatile (reordering barriers), we can be sure that - // one of the methods always notices the effect of a concurrent call to the - // other method. - - this.toClose = toClose; - - // check if we were closed early - if (this.isClosed != 0) { - toClose.close(); - throw new IOException("handle is closed"); - } - } - - /** - * Closes the handle. - * - * <p>If a "Closeable" has been registered via {@link #registerCloseable(Closeable)}, - * then this will be closes. - * - * <p>If any "Closeable" will be registered via {@link #registerCloseable(Closeable)} in the future, - * it will immediately be closed and that method will throw an exception. - * - * @throws IOException Exceptions occurring while closing an already registered {@code Closeable} - * are forwarded. - * - * @see #registerCloseable(Closeable) - */ - @Override - public final void close() throws IOException { - // NOTE: The order of operations matters here: - // (1) first setting the closed flag - // (2) checking whether there is already a closeable - // Because the order in the {@link #registerCloseable(Closeable)} method is the opposite, and - // both variables are volatile (reordering barriers), we can be sure that - // one of the methods always notices the effect of a concurrent call to the - // other method. - - if (CLOSER.compareAndSet(this, 0, 1)) { - final Closeable toClose = this.toClose; - if (toClose != null) { - this.toClose = null; - toClose.close(); - } - } - } - - /** - * Checks whether this handle has been closed. - * - * @return True is the handle is closed, false otherwise. - */ - public boolean isClosed() { - return isClosed != 0; - } - - /** - * This method checks whether the handle is closed and throws an exception if it is closed. - * If the handle is not closed, this method does nothing. - * - * @throws IOException Thrown, if the handle has been closed. - */ - public void ensureNotClosed() throws IOException { - if (isClosed != 0) { - throw new IOException("handle is closed"); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java new file mode 100644 index 0000000..7ca3b38 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MergingState; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateBackend; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.util.Preconditions; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; + +/** + * Base implementation of KeyedStateBackend. The state can be checkpointed + * to streams using {@link #snapshot(long, long, CheckpointStreamFactory)}. + * + * @param <K> Type of the key by which state is keyed. + */ +public abstract class AbstractKeyedStateBackend<K> + implements KeyedStateBackend<K>, SnapshotProvider<KeyGroupsStateHandle>, Closeable { + + /** {@link TypeSerializer} for our key. */ + protected final TypeSerializer<K> keySerializer; + + /** The currently active key. */ + protected K currentKey; + + /** The key group of the currently active key */ + private int currentKeyGroup; + + /** So that we can give out state when the user uses the same key. */ + protected HashMap<String, KvState<?>> keyValueStatesByName; + + /** For caching the last accessed partitioned state */ + private String lastName; + + @SuppressWarnings("rawtypes") + private KvState lastState; + + /** The number of key-groups aka max parallelism */ + protected final int numberOfKeyGroups; + + /** Range of key-groups for which this backend is responsible */ + protected final KeyGroupRange keyGroupRange; + + /** KvStateRegistry helper for this task */ + protected final TaskKvStateRegistry kvStateRegistry; + + /** Registry for all opened streams, so they can be closed if the task using this backend is closed */ + protected ClosableRegistry cancelStreamRegistry; + + protected final ClassLoader userCodeClassLoader; + + public AbstractKeyedStateBackend( + TaskKvStateRegistry kvStateRegistry, + TypeSerializer<K> keySerializer, + ClassLoader userCodeClassLoader, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange) { + + this.kvStateRegistry = Preconditions.checkNotNull(kvStateRegistry); + this.keySerializer = Preconditions.checkNotNull(keySerializer); + this.numberOfKeyGroups = Preconditions.checkNotNull(numberOfKeyGroups); + this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader); + this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange); + this.cancelStreamRegistry = new ClosableRegistry(); + } + + /** + * Closes the state backend, releasing all internal resources, but does not delete any persistent + * checkpoint data. + * + */ + @Override + public void dispose() { + if (kvStateRegistry != null) { + kvStateRegistry.unregisterAll(); + } + + lastName = null; + lastState = null; + keyValueStatesByName = null; + } + + /** + * Creates and returns a new {@link ValueState}. + * + * @param namespaceSerializer TypeSerializer for the state namespace. + * @param stateDesc The {@code StateDescriptor} that contains the name of the state. + * + * @param <N> The type of the namespace. + * @param <T> The type of the value that the {@code ValueState} can store. + */ + protected abstract <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception; + + /** + * Creates and returns a new {@link ListState}. + * + * @param namespaceSerializer TypeSerializer for the state namespace. + * @param stateDesc The {@code StateDescriptor} that contains the name of the state. + * + * @param <N> The type of the namespace. + * @param <T> The type of the values that the {@code ListState} can store. + */ + protected abstract <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception; + + /** + * Creates and returns a new {@link ReducingState}. + * + * @param namespaceSerializer TypeSerializer for the state namespace. + * @param stateDesc The {@code StateDescriptor} that contains the name of the state. + * + * @param <N> The type of the namespace. + * @param <T> The type of the values that the {@code ListState} can store. + */ + protected abstract <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception; + + /** + * Creates and returns a new {@link FoldingState}. + * + * @param namespaceSerializer TypeSerializer for the state namespace. + * @param stateDesc The {@code StateDescriptor} that contains the name of the state. + * + * @param <N> The type of the namespace. + * @param <T> Type of the values folded into the state + * @param <ACC> Type of the value in the state * + */ + protected abstract <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception; + + /** + * @see KeyedStateBackend + */ + @Override + public void setCurrentKey(K newKey) { + this.currentKey = newKey; + this.currentKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup(newKey, numberOfKeyGroups); + } + + /** + * @see KeyedStateBackend + */ + @Override + public TypeSerializer<K> getKeySerializer() { + return keySerializer; + } + + /** + * @see KeyedStateBackend + */ + @Override + public K getCurrentKey() { + return currentKey; + } + + /** + * @see KeyedStateBackend + */ + @Override + public int getCurrentKeyGroupIndex() { + return currentKeyGroup; + } + + /** + * @see KeyedStateBackend + */ + @Override + public int getNumberOfKeyGroups() { + return numberOfKeyGroups; + } + + /** + * @see KeyedStateBackend + */ + public KeyGroupRange getKeyGroupRange() { + return keyGroupRange; + } + + /** + * @see KeyedStateBackend + */ + @Override + @SuppressWarnings({"rawtypes", "unchecked"}) + public <N, S extends State> S getPartitionedState(final N namespace, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception { + Preconditions.checkNotNull(namespace, "Namespace"); + Preconditions.checkNotNull(namespaceSerializer, "Namespace serializer"); + + if (keySerializer == null) { + throw new RuntimeException("State key serializer has not been configured in the config. " + + "This operation cannot use partitioned state."); + } + + if (!stateDescriptor.isSerializerInitialized()) { + stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); + } + + if (keyValueStatesByName == null) { + keyValueStatesByName = new HashMap<>(); + } + + if (lastName != null && lastName.equals(stateDescriptor.getName())) { + lastState.setCurrentNamespace(namespace); + return (S) lastState; + } + + KvState<?> previous = keyValueStatesByName.get(stateDescriptor.getName()); + if (previous != null) { + lastState = previous; + lastState.setCurrentNamespace(namespace); + lastName = stateDescriptor.getName(); + return (S) previous; + } + + // create a new blank key/value state + S state = stateDescriptor.bind(new StateBackend() { + @Override + public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception { + return AbstractKeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc); + } + + @Override + public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception { + return AbstractKeyedStateBackend.this.createListState(namespaceSerializer, stateDesc); + } + + @Override + public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception { + return AbstractKeyedStateBackend.this.createReducingState(namespaceSerializer, stateDesc); + } + + @Override + public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception { + return AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc); + } + + }); + + KvState kvState = (KvState) state; + + keyValueStatesByName.put(stateDescriptor.getName(), kvState); + + lastName = stateDescriptor.getName(); + lastState = kvState; + + kvState.setCurrentNamespace(namespace); + + // Publish queryable state + if (stateDescriptor.isQueryable()) { + if (kvStateRegistry == null) { + throw new IllegalStateException("State backend has not been initialized for job."); + } + + String name = stateDescriptor.getQueryableStateName(); + kvStateRegistry.registerKvState(keyGroupRange, name, kvState); + } + + return state; + } + + @Override + @SuppressWarnings("unchecked,rawtypes") + public <N, S extends MergingState<?, ?>> void mergePartitionedStates(final N target, Collection<N> sources, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception { + if (stateDescriptor instanceof ReducingStateDescriptor) { + ReducingStateDescriptor reducingStateDescriptor = (ReducingStateDescriptor) stateDescriptor; + ReduceFunction reduceFn = reducingStateDescriptor.getReduceFunction(); + ReducingState state = (ReducingState) getPartitionedState(target, namespaceSerializer, stateDescriptor); + KvState kvState = (KvState) state; + Object result = null; + for (N source: sources) { + kvState.setCurrentNamespace(source); + Object sourceValue = state.get(); + if (result == null) { + result = state.get(); + } else if (sourceValue != null) { + result = reduceFn.reduce(result, sourceValue); + } + state.clear(); + } + kvState.setCurrentNamespace(target); + if (result != null) { + state.add(result); + } + } else if (stateDescriptor instanceof ListStateDescriptor) { + ListState<Object> state = (ListState) getPartitionedState(target, namespaceSerializer, stateDescriptor); + KvState kvState = (KvState) state; + List<Object> result = new ArrayList<>(); + for (N source: sources) { + kvState.setCurrentNamespace(source); + Iterable<Object> sourceValue = state.get(); + if (sourceValue != null) { + for (Object o : sourceValue) { + result.add(o); + } + } + state.clear(); + } + kvState.setCurrentNamespace(target); + for (Object o : result) { + state.add(o); + } + } else { + throw new RuntimeException("Cannot merge states for " + stateDescriptor); + } + } + + @Override + public void close() throws IOException { + cancelStreamRegistry.close(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java index 0d2bf45..c2e665b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; import java.io.IOException; +import java.util.Collection; import java.util.List; /** @@ -36,31 +37,33 @@ public abstract class AbstractStateBackend implements java.io.Serializable { * Creates a {@link CheckpointStreamFactory} that can be used to create streams * that should end up in a checkpoint. * - * @param jobId The {@link JobID} of the job for which we are creating checkpoint streams. + * @param jobId The {@link JobID} of the job for which we are creating checkpoint streams. * @param operatorIdentifier An identifier of the operator for which we create streams. */ public abstract CheckpointStreamFactory createStreamFactory( JobID jobId, - String operatorIdentifier) throws IOException; + String operatorIdentifier + ) throws IOException; /** - * Creates a new {@link KeyedStateBackend} that is responsible for keeping keyed state + * Creates a new {@link AbstractKeyedStateBackend} that is responsible for keeping keyed state * and can be checkpointed to checkpoint streams. */ - public abstract <K> KeyedStateBackend<K> createKeyedStateBackend( + public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend( Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws Exception; + TaskKvStateRegistry kvStateRegistry + ) throws Exception; /** - * Creates a new {@link KeyedStateBackend} that restores its state from the given list + * Creates a new {@link AbstractKeyedStateBackend} that restores its state from the given list * {@link KeyGroupsStateHandle KeyGroupStateHandles}. */ - public abstract <K> KeyedStateBackend<K> restoreKeyedStateBackend( + public abstract <K> AbstractKeyedStateBackend<K> restoreKeyedStateBackend( Environment env, JobID jobID, String operatorIdentifier, @@ -68,6 +71,30 @@ public abstract class AbstractStateBackend implements java.io.Serializable { int numberOfKeyGroups, KeyGroupRange keyGroupRange, List<KeyGroupsStateHandle> restoredState, - TaskKvStateRegistry kvStateRegistry) throws Exception; + TaskKvStateRegistry kvStateRegistry + ) throws Exception; + + /** + * Creates a new {@link OperatorStateBackend} that can be used for storing partitionable operator + * state in checkpoint streams. + */ + public OperatorStateBackend createOperatorStateBackend( + Environment env, + String operatorIdentifier + ) throws Exception { + return new DefaultOperatorStateBackend(); + } + + /** + * Creates a new {@link OperatorStateBackend} that restores its state from the given collection of + * {@link OperatorStateHandle}. + */ + public OperatorStateBackend restoreOperatorStateBackend( + Environment env, + String operatorIdentifier, + Collection<OperatorStateHandle> restoreSnapshots + ) throws Exception { + return new DefaultOperatorStateBackend(restoreSnapshots); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java index 74057ee..c6904c0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java @@ -26,7 +26,7 @@ import java.util.Collections; import java.util.List; /** - * Handle to the non-partitioned states for the operators in an operator chain. + * Handle to state handles for the operators in an operator chain. */ public class ChainedStateHandle<T extends StateObject> implements StateObject { @@ -123,9 +123,4 @@ public class ChainedStateHandle<T extends StateObject> implements StateObject { public static <T extends StateObject> ChainedStateHandle<T> wrapSingleHandle(T stateHandleToWrap) { return new ChainedStateHandle<T>(Collections.singletonList(stateHandleToWrap)); } - - @Override - public void close() throws IOException { - StateUtil.bestEffortCloseAllStateObjects(operatorStateHandles); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateHandles.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateHandles.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateHandles.java new file mode 100644 index 0000000..9daf963 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateHandles.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import java.io.Serializable; +import java.util.List; + +/** + * Container state handles that contains all state handles from the different state types of a checkpointed state. + * TODO This will be changed in the future if we get rid of chained state and instead connect state directly to individual operators in a chain. + */ +public class CheckpointStateHandles implements Serializable { + + private static final long serialVersionUID = 3252351989995L; + + private final ChainedStateHandle<StreamStateHandle> nonPartitionedStateHandles; + + private final ChainedStateHandle<OperatorStateHandle> partitioneableStateHandles; + + private final List<KeyGroupsStateHandle> keyGroupsStateHandle; + + public CheckpointStateHandles( + ChainedStateHandle<StreamStateHandle> nonPartitionedStateHandles, + ChainedStateHandle<OperatorStateHandle> partitioneableStateHandles, + List<KeyGroupsStateHandle> keyGroupsStateHandle) { + + this.nonPartitionedStateHandles = nonPartitionedStateHandles; + this.partitioneableStateHandles = partitioneableStateHandles; + this.keyGroupsStateHandle = keyGroupsStateHandle; + } + + public ChainedStateHandle<StreamStateHandle> getNonPartitionedStateHandles() { + return nonPartitionedStateHandles; + } + + public ChainedStateHandle<OperatorStateHandle> getPartitioneableStateHandles() { + return partitioneableStateHandles; + } + + public List<KeyGroupsStateHandle> getKeyGroupsStateHandle() { + return keyGroupsStateHandle; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof CheckpointStateHandles)) { + return false; + } + + CheckpointStateHandles that = (CheckpointStateHandles) o; + + if (nonPartitionedStateHandles != null ? + !nonPartitionedStateHandles.equals(that.nonPartitionedStateHandles) + : that.nonPartitionedStateHandles != null) { + return false; + } + + if (partitioneableStateHandles != null ? + !partitioneableStateHandles.equals(that.partitioneableStateHandles) + : that.partitioneableStateHandles != null) { + return false; + } + return keyGroupsStateHandle != null ? + keyGroupsStateHandle.equals(that.keyGroupsStateHandle) : that.keyGroupsStateHandle == null; + + } + + @Override + public int hashCode() { + int result = nonPartitionedStateHandles != null ? nonPartitionedStateHandles.hashCode() : 0; + result = 31 * result + (partitioneableStateHandles != null ? partitioneableStateHandles.hashCode() : 0); + result = 31 * result + (keyGroupsStateHandle != null ? keyGroupsStateHandle.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "CheckpointStateHandles{" + + "nonPartitionedStateHandles=" + nonPartitionedStateHandles + + ", partitioneableStateHandles=" + partitioneableStateHandles + + ", keyGroupsStateHandle=" + keyGroupsStateHandle + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java new file mode 100644 index 0000000..26d6192 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.commons.io.IOUtils; + +import java.io.Closeable; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +public class ClosableRegistry implements Closeable { + + private final Set<Closeable> registeredCloseables; + private boolean closed; + + public ClosableRegistry() { + this.registeredCloseables = new HashSet<>(); + this.closed = false; + } + + public boolean registerClosable(Closeable closeable) { + + if (null == closeable) { + return false; + } + + synchronized (getSynchronizationLock()) { + if (closed) { + throw new IllegalStateException("Cannot register Closable, registry is already closed."); + } + + return registeredCloseables.add(closeable); + } + } + + public boolean unregisterClosable(Closeable closeable) { + + if (null == closeable) { + return false; + } + + synchronized (getSynchronizationLock()) { + return registeredCloseables.remove(closeable); + } + } + + @Override + public void close() throws IOException { + + if (!registeredCloseables.isEmpty()) { + + synchronized (getSynchronizationLock()) { + + for (Closeable closeable : registeredCloseables) { + IOUtils.closeQuietly(closeable); + } + + registeredCloseables.clear(); + closed = true; + } + } + } + + private Object getSynchronizationLock() { + return registeredCloseables; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java new file mode 100644 index 0000000..0bd5eeb --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.RunnableFuture; + +/** + * Default implementation of OperatorStateStore that provides the ability to make snapshots. + */ +public class DefaultOperatorStateBackend implements OperatorStateBackend { + + private final Map<String, PartitionableListState<?>> registeredStates; + private final Collection<OperatorStateHandle> restoreSnapshots; + private final ClosableRegistry closeStreamOnCancelRegistry; + + /** + * Restores a OperatorStateStore (lazily) using the provided snapshots. + * + * @param restoreSnapshots snapshots that are available to restore partitionable states on request. + */ + public DefaultOperatorStateBackend( + Collection<OperatorStateHandle> restoreSnapshots) { + this.restoreSnapshots = restoreSnapshots; + this.registeredStates = new HashMap<>(); + this.closeStreamOnCancelRegistry = new ClosableRegistry(); + } + + /** + * Creates an empty OperatorStateStore. + */ + public DefaultOperatorStateBackend() { + this(null); + } + + /** + * @see OperatorStateStore + */ + @Override + public <S> ListState<S> getPartitionableState( + ListStateDescriptor<S> stateDescriptor) throws IOException { + + Preconditions.checkNotNull(stateDescriptor); + + String name = Preconditions.checkNotNull(stateDescriptor.getName()); + TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getSerializer()); + + @SuppressWarnings("unchecked") + PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredStates.get(name); + + if (null == partitionableListState) { + + partitionableListState = new PartitionableListState<>(partitionStateSerializer); + + registeredStates.put(name, partitionableListState); + + // Try to restore previous state if state handles to snapshots are provided + if (restoreSnapshots != null) { + for (OperatorStateHandle stateHandle : restoreSnapshots) { + + long[] offsets = stateHandle.getStateNameToPartitionOffsets().get(name); + + if (offsets != null) { + + FSDataInputStream in = stateHandle.openInputStream(); + try { + closeStreamOnCancelRegistry.registerClosable(in); + + DataInputView div = new DataInputViewStreamWrapper(in); + + for (int i = 0; i < offsets.length; ++i) { + + in.seek(offsets[i]); + S partitionState = partitionStateSerializer.deserialize(div); + partitionableListState.add(partitionState); + } + } finally { + closeStreamOnCancelRegistry.unregisterClosable(in); + in.close(); + } + } + } + } + } + + return partitionableListState; + } + + /** + * @see SnapshotProvider + */ + @Override + public RunnableFuture<OperatorStateHandle> snapshot( + long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception { + + if (registeredStates.isEmpty()) { + return new DoneFuture<>(null); + } + + Map<String, long[]> writtenStatesMetaData = new HashMap<>(registeredStates.size()); + + CheckpointStreamFactory.CheckpointStateOutputStream out = streamFactory. + createCheckpointStateOutputStream(checkpointId, timestamp); + + try { + closeStreamOnCancelRegistry.registerClosable(out); + + DataOutputView dov = new DataOutputViewStreamWrapper(out); + + dov.writeInt(registeredStates.size()); + for (Map.Entry<String, PartitionableListState<?>> entry : registeredStates.entrySet()) { + + long[] partitionOffsets = entry.getValue().write(out); + writtenStatesMetaData.put(entry.getKey(), partitionOffsets); + } + + OperatorStateHandle handle = new OperatorStateHandle(out.closeAndGetHandle(), writtenStatesMetaData); + + return new DoneFuture<>(handle); + } finally { + closeStreamOnCancelRegistry.unregisterClosable(out); + out.close(); + } + } + + @Override + public void dispose() { + + } + + static final class PartitionableListState<S> implements ListState<S> { + + private final List<S> listState; + private final TypeSerializer<S> partitionStateSerializer; + + public PartitionableListState(TypeSerializer<S> partitionStateSerializer) { + this.listState = new ArrayList<>(); + this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer); + } + + @Override + public void clear() { + listState.clear(); + } + + @Override + public Iterable<S> get() { + return listState; + } + + @Override + public void add(S value) { + listState.add(value); + } + + public long[] write(FSDataOutputStream out) throws IOException { + + long[] partitionOffsets = new long[listState.size()]; + + DataOutputView dov = new DataOutputViewStreamWrapper(out); + + for (int i = 0; i < listState.size(); ++i) { + S element = listState.get(i); + partitionOffsets[i] = out.getPos(); + partitionStateSerializer.serialize(element, dov); + } + + return partitionOffsets; + } + } + + @Override + public Set<String> getRegisteredStateNames() { + return registeredStates.keySet(); + } + + @Override + public void close() throws IOException { + closeStreamOnCancelRegistry.close(); + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java index 4f0a82b..8e7207e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java @@ -31,6 +31,8 @@ import java.util.Iterator; */ public class KeyGroupRangeOffsets implements Iterable<Tuple2<Integer, Long>> , Serializable { + private static final long serialVersionUID = 6595415219136429696L; + /** the range of key-groups */ private final KeyGroupRange keyGroupRange; http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java index 7f87e86..ea12808 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java @@ -138,7 +138,6 @@ public class KeyGroupsStateHandle implements StateObject { return false; } return stateHandle.equals(that.stateHandle); - } @Override @@ -155,9 +154,4 @@ public class KeyGroupsStateHandle implements StateObject { ", data=" + stateHandle + '}'; } - - @Override - public void close() throws IOException { - stateHandle.close(); - } }
