Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3536#discussion_r113465977
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 ---
    @@ -131,59 +169,134 @@ public void dispose() {
     
        @Override
        public RunnableFuture<OperatorStateHandle> snapshot(
    -                   long checkpointId,
    -                   long timestamp,
    -                   CheckpointStreamFactory streamFactory,
    -                   CheckpointOptions checkpointOptions) throws Exception {
    +                   final long checkpointId,
    +                   final long timestamp,
    +                   final CheckpointStreamFactory streamFactory,
    +                   final CheckpointOptions checkpointOptions) throws 
Exception {
    +
    +           final long syncStartTime = System.currentTimeMillis();
     
                if (registeredStates.isEmpty()) {
                        return DoneFuture.nullValue();
                }
     
    -           List<OperatorBackendSerializationProxy.StateMetaInfo<?>> 
metaInfoList =
    -                           new ArrayList<>(registeredStates.size());
    -
    -           for (Map.Entry<String, PartitionableListState<?>> entry : 
registeredStates.entrySet()) {
    -                   PartitionableListState<?> state = entry.getValue();
    -                   OperatorBackendSerializationProxy.StateMetaInfo<?> 
metaInfo =
    -                                   new 
OperatorBackendSerializationProxy.StateMetaInfo<>(
    -                                                   state.getName(),
    -                                                   
state.getPartitionStateSerializer(),
    -                                                   
state.getAssignmentMode());
    -                   metaInfoList.add(metaInfo);
    +           final Map<String, PartitionableListState<?>> 
registeredStatesDeepCopies =
    +                           new HashMap<>(registeredStates.size());
    +
    +           // eagerly create deep copies of the list states in the sync 
phase, so that we can use them in the async writing
    +           for (Map.Entry<String, PartitionableListState<?>> entry : 
this.registeredStates.entrySet()) {
    +
    +                   PartitionableListState<?> listState = entry.getValue();
    +                   if (null != listState) {
    +                           listState = listState.deepCopy();
    +                   }
    +                   registeredStatesDeepCopies.put(entry.getKey(), 
listState);
                }
     
    -           Map<String, OperatorStateHandle.StateMetaInfo> 
writtenStatesMetaData = new HashMap<>(registeredStates.size());
    +           // implementation of the async IO operation, based on FutureTask
    +           final AbstractAsyncIOCallable<OperatorStateHandle, 
CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable =
    +                           new 
AbstractAsyncIOCallable<OperatorStateHandle, 
CheckpointStreamFactory.CheckpointStateOutputStream>() {
    +
    +                                   AtomicBoolean open = new 
AtomicBoolean(false);
    +
    +                                   @Override
    +                                   public 
CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws 
Exception {
    +                                           if (open.compareAndSet(false, 
true)) {
    +                                                   
CheckpointStreamFactory.CheckpointStateOutputStream stream =
    +                                                                   
streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
    +                                                   try {
    +                                                           
closeStreamOnCancelRegistry.registerClosable(stream);
    +                                                           return stream;
    +                                                   } catch (Exception ex) {
    +                                                           open.set(false);
    +                                                           throw ex;
    +                                                   }
    +                                           } else {
    +                                                   throw new 
IOException("Operation already opened.");
    +                                           }
    +                                   }
     
    -           CheckpointStreamFactory.CheckpointStateOutputStream out = 
streamFactory.
    -                           createCheckpointStateOutputStream(checkpointId, 
timestamp);
    +                                   @Override
    +                                   public OperatorStateHandle 
performOperation() throws Exception {
    +                                           long asyncStartTime = 
System.currentTimeMillis();
     
    -           try {
    -                   closeStreamOnCancelRegistry.registerClosable(out);
    +                                           final Map<String, 
OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
    +                                                           new 
HashMap<>(registeredStatesDeepCopies.size());
     
    -                   DataOutputView dov = new 
DataOutputViewStreamWrapper(out);
    +                                           
List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
    +                                                           new 
ArrayList<>(registeredStatesDeepCopies.size());
     
    -                   OperatorBackendSerializationProxy 
backendSerializationProxy =
    -                                   new 
OperatorBackendSerializationProxy(metaInfoList);
    +                                           for (Map.Entry<String, 
PartitionableListState<?>> entry :
    +                                                           
registeredStatesDeepCopies.entrySet()) {
     
    -                   backendSerializationProxy.write(dov);
    +                                                   
PartitionableListState<?> state = entry.getValue();
    +                                                   
OperatorBackendSerializationProxy.StateMetaInfo<?> metaInfo =
    +                                                                   new 
OperatorBackendSerializationProxy.StateMetaInfo<>(
    +                                                                           
        state.getName(),
    +                                                                           
        state.getPartitionStateSerializer(),
    +                                                                           
        state.getAssignmentMode());
    +                                                   
metaInfoList.add(metaInfo);
    +                                           }
     
    -                   dov.writeInt(registeredStates.size());
    -                   for (Map.Entry<String, PartitionableListState<?>> entry 
: registeredStates.entrySet()) {
    +                                           
CheckpointStreamFactory.CheckpointStateOutputStream out = getIoHandle();
    +                                           DataOutputView dov = new 
DataOutputViewStreamWrapper(out);
     
    -                           PartitionableListState<?> value = 
entry.getValue();
    -                           long[] partitionOffsets = value.write(out);
    -                           OperatorStateHandle.Mode mode = 
value.getAssignmentMode();
    -                           writtenStatesMetaData.put(entry.getKey(), new 
OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
    -                   }
    +                                           
OperatorBackendSerializationProxy backendSerializationProxy =
    +                                                           new 
OperatorBackendSerializationProxy(metaInfoList);
     
    -                   OperatorStateHandle handle = new 
OperatorStateHandle(writtenStatesMetaData, out.closeAndGetHandle());
    +                                           
backendSerializationProxy.write(dov);
     
    -                   return new DoneFuture<>(handle);
    -           } finally {
    -                   closeStreamOnCancelRegistry.unregisterClosable(out);
    -                   out.close();
    +                                           
dov.writeInt(registeredStatesDeepCopies.size());
    +
    +                                           for (Map.Entry<String, 
PartitionableListState<?>> entry :
    +                                                           
registeredStatesDeepCopies.entrySet()) {
    +
    +                                                   
PartitionableListState<?> value = entry.getValue();
    +                                                   long[] partitionOffsets 
= value.write(out);
    +                                                   
OperatorStateHandle.Mode mode = value.getAssignmentMode();
    +                                                   
writtenStatesMetaData.put(
    +                                                                   
entry.getKey(),
    +                                                                   new 
OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
    +                                           }
    +
    +                                           if (open.compareAndSet(true, 
false)) {
    +
    +                                                   OperatorStateHandle 
operatorStateHandle =
    +                                                                   new 
OperatorStateHandle(writtenStatesMetaData, out.closeAndGetHandle());
    +
    +                                                   if 
(asynchronousSnapshots) {
    +                                                           
LOG.info("DefaultOperatorStateBackend snapshot ({}, asynchronous part) in 
thread {} took {} ms.",
    +                                                                           
streamFactory, Thread.currentThread(), (System.currentTimeMillis() - 
asyncStartTime));
    +                                                   }
    +
    +                                                   return 
operatorStateHandle;
    +                                           } else {
    +                                                   throw new 
IOException("Checkpoint stream already closed.");
    +                                           }
    +                                   }
    +
    +                                   @Override
    +                                   public void done(boolean canceled) {
    +                                           if (open.compareAndSet(true, 
false)) {
    +                                                   
CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
    +                                                   if (null != stream) {
    +                                                           
closeStreamOnCancelRegistry.unregisterClosable(stream);
    --- End diff --
    
    Shouldn't we always try to remove the stream from the `ClosableRegistry` 
after the callable has been executed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to