[
https://issues.apache.org/jira/browse/FLINK-6048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15984971#comment-15984971
]
ASF GitHub Bot commented on FLINK-6048:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3536#discussion_r113479735
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
---
@@ -131,59 +169,134 @@ public void dispose() {
@Override
public RunnableFuture<OperatorStateHandle> snapshot(
- long checkpointId,
- long timestamp,
- CheckpointStreamFactory streamFactory,
- CheckpointOptions checkpointOptions) throws Exception {
+ final long checkpointId,
+ final long timestamp,
+ final CheckpointStreamFactory streamFactory,
+ final CheckpointOptions checkpointOptions) throws
Exception {
+
+ final long syncStartTime = System.currentTimeMillis();
if (registeredStates.isEmpty()) {
return DoneFuture.nullValue();
}
- List<OperatorBackendSerializationProxy.StateMetaInfo<?>>
metaInfoList =
- new ArrayList<>(registeredStates.size());
-
- for (Map.Entry<String, PartitionableListState<?>> entry :
registeredStates.entrySet()) {
- PartitionableListState<?> state = entry.getValue();
- OperatorBackendSerializationProxy.StateMetaInfo<?>
metaInfo =
- new
OperatorBackendSerializationProxy.StateMetaInfo<>(
- state.getName(),
-
state.getPartitionStateSerializer(),
-
state.getAssignmentMode());
- metaInfoList.add(metaInfo);
+ final Map<String, PartitionableListState<?>>
registeredStatesDeepCopies =
+ new HashMap<>(registeredStates.size());
+
+ // eagerly create deep copies of the list states in the sync
phase, so that we can use them in the async writing
+ for (Map.Entry<String, PartitionableListState<?>> entry :
this.registeredStates.entrySet()) {
+
+ PartitionableListState<?> listState = entry.getValue();
+ if (null != listState) {
+ listState = listState.deepCopy();
+ }
+ registeredStatesDeepCopies.put(entry.getKey(),
listState);
}
- Map<String, OperatorStateHandle.StateMetaInfo>
writtenStatesMetaData = new HashMap<>(registeredStates.size());
+ // implementation of the async IO operation, based on FutureTask
+ final AbstractAsyncIOCallable<OperatorStateHandle,
CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable =
+ new
AbstractAsyncIOCallable<OperatorStateHandle,
CheckpointStreamFactory.CheckpointStateOutputStream>() {
+
+ AtomicBoolean open = new
AtomicBoolean(false);
+
+ @Override
+ public
CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws
Exception {
+ if (open.compareAndSet(false,
true)) {
+
CheckpointStreamFactory.CheckpointStateOutputStream stream =
+
streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+ try {
+
closeStreamOnCancelRegistry.registerClosable(stream);
+ return stream;
+ } catch (Exception ex) {
+ open.set(false);
+ throw ex;
+ }
+ } else {
+ throw new
IOException("Operation already opened.");
+ }
+ }
- CheckpointStreamFactory.CheckpointStateOutputStream out =
streamFactory.
- createCheckpointStateOutputStream(checkpointId,
timestamp);
+ @Override
+ public OperatorStateHandle
performOperation() throws Exception {
+ long asyncStartTime =
System.currentTimeMillis();
- try {
- closeStreamOnCancelRegistry.registerClosable(out);
+ final Map<String,
OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
+ new
HashMap<>(registeredStatesDeepCopies.size());
- DataOutputView dov = new
DataOutputViewStreamWrapper(out);
+
List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
+ new
ArrayList<>(registeredStatesDeepCopies.size());
- OperatorBackendSerializationProxy
backendSerializationProxy =
- new
OperatorBackendSerializationProxy(metaInfoList);
+ for (Map.Entry<String,
PartitionableListState<?>> entry :
+
registeredStatesDeepCopies.entrySet()) {
- backendSerializationProxy.write(dov);
+
PartitionableListState<?> state = entry.getValue();
+
OperatorBackendSerializationProxy.StateMetaInfo<?> metaInfo =
+ new
OperatorBackendSerializationProxy.StateMetaInfo<>(
+
state.getName(),
+
state.getPartitionStateSerializer(),
+
state.getAssignmentMode());
+
metaInfoList.add(metaInfo);
+ }
- dov.writeInt(registeredStates.size());
- for (Map.Entry<String, PartitionableListState<?>> entry
: registeredStates.entrySet()) {
+
CheckpointStreamFactory.CheckpointStateOutputStream out = getIoHandle();
+ DataOutputView dov = new
DataOutputViewStreamWrapper(out);
- PartitionableListState<?> value =
entry.getValue();
- long[] partitionOffsets = value.write(out);
- OperatorStateHandle.Mode mode =
value.getAssignmentMode();
- writtenStatesMetaData.put(entry.getKey(), new
OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
- }
+
OperatorBackendSerializationProxy backendSerializationProxy =
+ new
OperatorBackendSerializationProxy(metaInfoList);
- OperatorStateHandle handle = new
OperatorStateHandle(writtenStatesMetaData, out.closeAndGetHandle());
+
backendSerializationProxy.write(dov);
- return new DoneFuture<>(handle);
- } finally {
- closeStreamOnCancelRegistry.unregisterClosable(out);
- out.close();
+
dov.writeInt(registeredStatesDeepCopies.size());
+
+ for (Map.Entry<String,
PartitionableListState<?>> entry :
+
registeredStatesDeepCopies.entrySet()) {
+
+
PartitionableListState<?> value = entry.getValue();
+ long[] partitionOffsets
= value.write(out);
+
OperatorStateHandle.Mode mode = value.getAssignmentMode();
+
writtenStatesMetaData.put(
+
entry.getKey(),
+ new
OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
+ }
+
+ if (open.compareAndSet(true,
false)) {
+
+ OperatorStateHandle
operatorStateHandle =
+ new
OperatorStateHandle(writtenStatesMetaData, out.closeAndGetHandle());
+
+ if
(asynchronousSnapshots) {
+
LOG.info("DefaultOperatorStateBackend snapshot ({}, asynchronous part) in
thread {} took {} ms.",
+
streamFactory, Thread.currentThread(), (System.currentTimeMillis() -
asyncStartTime));
+ }
+
+ return
operatorStateHandle;
+ } else {
+ throw new
IOException("Checkpoint stream already closed.");
+ }
+ }
+
+ @Override
+ public void done(boolean canceled) {
+ if (open.compareAndSet(true,
false)) {
+
CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
+ if (null != stream) {
+
closeStreamOnCancelRegistry.unregisterClosable(stream);
--- End diff --
I don't understand exactly what you mean. In line 283, the stream is
unregistered. The stream is only created if the runnable is executed, in which
case it is guaranteed to reach `done()`. So I think whenever a stream was
created and registered, it is also unregistered and closed. Or am I missing
something here?
> Asynchronous snapshots for heap-based operator state backends
> -------------------------------------------------------------
>
> Key: FLINK-6048
> URL: https://issues.apache.org/jira/browse/FLINK-6048
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Affects Versions: 1.3.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Fix For: 1.3.0
>
>
> The synchronous checkpointing mechanism of heap-based operator state backends
> blocks element processing for the duration of the checkpoint.
> We could implement a heap-based operator state backend that allows for
> asynchronous checkpoints.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)