GitHub user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/3536#discussion_r113465977
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
---
@@ -131,59 +169,134 @@ public void dispose() {
@Override
public RunnableFuture<OperatorStateHandle> snapshot(
- long checkpointId,
- long timestamp,
- CheckpointStreamFactory streamFactory,
- CheckpointOptions checkpointOptions) throws Exception {
+ final long checkpointId,
+ final long timestamp,
+ final CheckpointStreamFactory streamFactory,
+ final CheckpointOptions checkpointOptions) throws
Exception {
+
+ final long syncStartTime = System.currentTimeMillis();
if (registeredStates.isEmpty()) {
return DoneFuture.nullValue();
}
- List<OperatorBackendSerializationProxy.StateMetaInfo<?>>
metaInfoList =
- new ArrayList<>(registeredStates.size());
-
- for (Map.Entry<String, PartitionableListState<?>> entry :
registeredStates.entrySet()) {
- PartitionableListState<?> state = entry.getValue();
- OperatorBackendSerializationProxy.StateMetaInfo<?>
metaInfo =
- new
OperatorBackendSerializationProxy.StateMetaInfo<>(
- state.getName(),
-
state.getPartitionStateSerializer(),
-
state.getAssignmentMode());
- metaInfoList.add(metaInfo);
+ final Map<String, PartitionableListState<?>>
registeredStatesDeepCopies =
+ new HashMap<>(registeredStates.size());
+
+ // eagerly create deep copies of the list states in the sync
phase, so that we can use them in the async writing
+ for (Map.Entry<String, PartitionableListState<?>> entry :
this.registeredStates.entrySet()) {
+
+ PartitionableListState<?> listState = entry.getValue();
+ if (null != listState) {
+ listState = listState.deepCopy();
+ }
+ registeredStatesDeepCopies.put(entry.getKey(),
listState);
}
- Map<String, OperatorStateHandle.StateMetaInfo>
writtenStatesMetaData = new HashMap<>(registeredStates.size());
+ // implementation of the async IO operation, based on FutureTask
+ final AbstractAsyncIOCallable<OperatorStateHandle,
CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable =
+ new
AbstractAsyncIOCallable<OperatorStateHandle,
CheckpointStreamFactory.CheckpointStateOutputStream>() {
+
+ AtomicBoolean open = new
AtomicBoolean(false);
+
+ @Override
+ public
CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws
Exception {
+ if (open.compareAndSet(false,
true)) {
+
CheckpointStreamFactory.CheckpointStateOutputStream stream =
+
streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+ try {
+
closeStreamOnCancelRegistry.registerClosable(stream);
+ return stream;
+ } catch (Exception ex) {
+ open.set(false);
+ throw ex;
+ }
+ } else {
+ throw new
IOException("Operation already opened.");
+ }
+ }
- CheckpointStreamFactory.CheckpointStateOutputStream out =
streamFactory.
- createCheckpointStateOutputStream(checkpointId,
timestamp);
+ @Override
+ public OperatorStateHandle
performOperation() throws Exception {
+ long asyncStartTime =
System.currentTimeMillis();
- try {
- closeStreamOnCancelRegistry.registerClosable(out);
+ final Map<String,
OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
+ new
HashMap<>(registeredStatesDeepCopies.size());
- DataOutputView dov = new
DataOutputViewStreamWrapper(out);
+
List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
+ new
ArrayList<>(registeredStatesDeepCopies.size());
- OperatorBackendSerializationProxy
backendSerializationProxy =
- new
OperatorBackendSerializationProxy(metaInfoList);
+ for (Map.Entry<String,
PartitionableListState<?>> entry :
+
registeredStatesDeepCopies.entrySet()) {
- backendSerializationProxy.write(dov);
+
PartitionableListState<?> state = entry.getValue();
+
OperatorBackendSerializationProxy.StateMetaInfo<?> metaInfo =
+ new
OperatorBackendSerializationProxy.StateMetaInfo<>(
+
state.getName(),
+
state.getPartitionStateSerializer(),
+
state.getAssignmentMode());
+
metaInfoList.add(metaInfo);
+ }
- dov.writeInt(registeredStates.size());
- for (Map.Entry<String, PartitionableListState<?>> entry
: registeredStates.entrySet()) {
+
CheckpointStreamFactory.CheckpointStateOutputStream out = getIoHandle();
+ DataOutputView dov = new
DataOutputViewStreamWrapper(out);
- PartitionableListState<?> value =
entry.getValue();
- long[] partitionOffsets = value.write(out);
- OperatorStateHandle.Mode mode =
value.getAssignmentMode();
- writtenStatesMetaData.put(entry.getKey(), new
OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
- }
+
OperatorBackendSerializationProxy backendSerializationProxy =
+ new
OperatorBackendSerializationProxy(metaInfoList);
- OperatorStateHandle handle = new
OperatorStateHandle(writtenStatesMetaData, out.closeAndGetHandle());
+
backendSerializationProxy.write(dov);
- return new DoneFuture<>(handle);
- } finally {
- closeStreamOnCancelRegistry.unregisterClosable(out);
- out.close();
+
dov.writeInt(registeredStatesDeepCopies.size());
+
+ for (Map.Entry<String,
PartitionableListState<?>> entry :
+
registeredStatesDeepCopies.entrySet()) {
+
+
PartitionableListState<?> value = entry.getValue();
+ long[] partitionOffsets
= value.write(out);
+
OperatorStateHandle.Mode mode = value.getAssignmentMode();
+
writtenStatesMetaData.put(
+
entry.getKey(),
+ new
OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
+ }
+
+ if (open.compareAndSet(true,
false)) {
+
+ OperatorStateHandle
operatorStateHandle =
+ new
OperatorStateHandle(writtenStatesMetaData, out.closeAndGetHandle());
+
+ if
(asynchronousSnapshots) {
+
LOG.info("DefaultOperatorStateBackend snapshot ({}, asynchronous part) in
thread {} took {} ms.",
+
streamFactory, Thread.currentThread(), (System.currentTimeMillis() -
asyncStartTime));
+ }
+
+ return
operatorStateHandle;
+ } else {
+ throw new
IOException("Checkpoint stream already closed.");
+ }
+ }
+
+ @Override
+ public void done(boolean canceled) {
+ if (open.compareAndSet(true,
false)) {
+
CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
+ if (null != stream) {
+
closeStreamOnCancelRegistry.unregisterClosable(stream);
--- End diff --
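Shouldn't we always try to remove the stream from the `CloseableRegistry`
after the callable has been executed? In the part of `done()` shown above, the
`unregisterClosable(stream)` call sits inside `if (open.compareAndSet(true, false))`,
but `performOperation()` has already reset `open` to `false` on its success path,
so in that case the guard fails and the stream is not unregistered there.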
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like to enable it, or if the feature is enabled but not
working, please contact infrastructure at [email protected] or file a JIRA
ticket with INFRA.
---