[ https://issues.apache.org/jira/browse/FLINK-6048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15984971#comment-15984971 ]

ASF GitHub Bot commented on FLINK-6048:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3536#discussion_r113479735
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---
    @@ -131,59 +169,134 @@ public void dispose() {
     
        @Override
        public RunnableFuture<OperatorStateHandle> snapshot(
    -                   long checkpointId,
    -                   long timestamp,
    -                   CheckpointStreamFactory streamFactory,
    -                   CheckpointOptions checkpointOptions) throws Exception {
    +                   final long checkpointId,
    +                   final long timestamp,
    +                   final CheckpointStreamFactory streamFactory,
    +                   final CheckpointOptions checkpointOptions) throws Exception {
    +
    +           final long syncStartTime = System.currentTimeMillis();
     
                if (registeredStates.isEmpty()) {
                        return DoneFuture.nullValue();
                }
     
    -           List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
    -                           new ArrayList<>(registeredStates.size());
    -
    -           for (Map.Entry<String, PartitionableListState<?>> entry : registeredStates.entrySet()) {
    -                   PartitionableListState<?> state = entry.getValue();
    -                   OperatorBackendSerializationProxy.StateMetaInfo<?> metaInfo =
    -                                   new OperatorBackendSerializationProxy.StateMetaInfo<>(
    -                                                   state.getName(),
    -                                                   state.getPartitionStateSerializer(),
    -                                                   state.getAssignmentMode());
    -                   metaInfoList.add(metaInfo);
    +           final Map<String, PartitionableListState<?>> registeredStatesDeepCopies =
    +                           new HashMap<>(registeredStates.size());
    +
    +           // eagerly create deep copies of the list states in the sync phase, so that we can use them in the async writing
    +           for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredStates.entrySet()) {
    +
    +                   PartitionableListState<?> listState = entry.getValue();
    +                   if (null != listState) {
    +                           listState = listState.deepCopy();
    +                   }
    +                   registeredStatesDeepCopies.put(entry.getKey(), listState);
                }
     
    -           Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData = new HashMap<>(registeredStates.size());
    +           // implementation of the async IO operation, based on FutureTask
    +           final AbstractAsyncIOCallable<OperatorStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable =
    +                           new AbstractAsyncIOCallable<OperatorStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream>() {
    +
    +                                   AtomicBoolean open = new AtomicBoolean(false);
    +
    +                                   @Override
    +                                   public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception {
    +                                           if (open.compareAndSet(false, true)) {
    +                                                   CheckpointStreamFactory.CheckpointStateOutputStream stream =
    +                                                                   streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
    +                                                   try {
    +                                                           closeStreamOnCancelRegistry.registerClosable(stream);
    +                                                           return stream;
    +                                                   } catch (Exception ex) {
    +                                                           open.set(false);
    +                                                           throw ex;
    +                                                   }
    +                                           } else {
    +                                                   throw new IOException("Operation already opened.");
    +                                           }
    +                                   }
     
    -           CheckpointStreamFactory.CheckpointStateOutputStream out = streamFactory.
    -                           createCheckpointStateOutputStream(checkpointId, timestamp);
    +                                   @Override
    +                                   public OperatorStateHandle performOperation() throws Exception {
    +                                           long asyncStartTime = System.currentTimeMillis();
     
    -           try {
    -                   closeStreamOnCancelRegistry.registerClosable(out);
    +                                           final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
    +                                                           new HashMap<>(registeredStatesDeepCopies.size());
     
    -                   DataOutputView dov = new DataOutputViewStreamWrapper(out);
    +                                           List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
    +                                                           new ArrayList<>(registeredStatesDeepCopies.size());
     
    -                   OperatorBackendSerializationProxy backendSerializationProxy =
    -                                   new OperatorBackendSerializationProxy(metaInfoList);
    +                                           for (Map.Entry<String, PartitionableListState<?>> entry :
    +                                                           registeredStatesDeepCopies.entrySet()) {
     
    -                   backendSerializationProxy.write(dov);
    +                                                   PartitionableListState<?> state = entry.getValue();
    +                                                   OperatorBackendSerializationProxy.StateMetaInfo<?> metaInfo =
    +                                                                   new OperatorBackendSerializationProxy.StateMetaInfo<>(
    +                                                                                   state.getName(),
    +                                                                                   state.getPartitionStateSerializer(),
    +                                                                                   state.getAssignmentMode());
    +                                                   metaInfoList.add(metaInfo);
    +                                           }
     
    -                   dov.writeInt(registeredStates.size());
    -                   for (Map.Entry<String, PartitionableListState<?>> entry : registeredStates.entrySet()) {
    +                                           CheckpointStreamFactory.CheckpointStateOutputStream out = getIoHandle();
    +                                           DataOutputView dov = new DataOutputViewStreamWrapper(out);
     
    -                           PartitionableListState<?> value = entry.getValue();
    -                           long[] partitionOffsets = value.write(out);
    -                           OperatorStateHandle.Mode mode = value.getAssignmentMode();
    -                           writtenStatesMetaData.put(entry.getKey(), new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
    -                   }
    +                                           OperatorBackendSerializationProxy backendSerializationProxy =
    +                                                           new OperatorBackendSerializationProxy(metaInfoList);
     
    -                   OperatorStateHandle handle = new OperatorStateHandle(writtenStatesMetaData, out.closeAndGetHandle());
    +                                           backendSerializationProxy.write(dov);
     
    -                   return new DoneFuture<>(handle);
    -           } finally {
    -                   closeStreamOnCancelRegistry.unregisterClosable(out);
    -                   out.close();
    +                                           dov.writeInt(registeredStatesDeepCopies.size());
    +
    +                                           for (Map.Entry<String, PartitionableListState<?>> entry :
    +                                                           registeredStatesDeepCopies.entrySet()) {
    +
    +                                                   PartitionableListState<?> value = entry.getValue();
    +                                                   long[] partitionOffsets = value.write(out);
    +                                                   OperatorStateHandle.Mode mode = value.getAssignmentMode();
    +                                                   writtenStatesMetaData.put(
    +                                                                   entry.getKey(),
    +                                                                   new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
    +                                           }
    +
    +                                           if (open.compareAndSet(true, false)) {
    +
    +                                                   OperatorStateHandle operatorStateHandle =
    +                                                                   new OperatorStateHandle(writtenStatesMetaData, out.closeAndGetHandle());
    +
    +                                                   if (asynchronousSnapshots) {
    +                                                           LOG.info("DefaultOperatorStateBackend snapshot ({}, asynchronous part) in thread {} took {} ms.",
    +                                                                           streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
    +                                                   }
    +
    +                                                   return operatorStateHandle;
    +                                           } else {
    +                                                   throw new IOException("Checkpoint stream already closed.");
    +                                           }
    +                                   }
    +
    +                                   @Override
    +                                   public void done(boolean canceled) {
    +                                           if (open.compareAndSet(true, false)) {
    +                                                   CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
    +                                                   if (null != stream) {
    +                                                           closeStreamOnCancelRegistry.unregisterClosable(stream);
    --- End diff --
    
    I don't understand exactly what you mean. In line 283, the stream is
    unregistered. The stream is only created if the runnable is executed, in
    which case it is guaranteed to reach `done()`. So I think whenever a stream
    was created and registered, it is also unregistered and closed. Or am I
    missing something here?
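    The lifecycle argument in the comment above (a stream exists only if the
    task actually runs, and a task that runs always reaches `done()`) can be
    illustrated with a small, self-contained Java model. This is a simplified
    sketch under assumptions, not Flink's `AbstractAsyncIOCallable`: the names
    `SnapshotIO`, `SnapshotFuture`, `CloseOnCancelRegistry` and `TrackedStream`
    are hypothetical, a lock stands in for the `AtomicBoolean` used in the diff,
    and the "checkpoint stream" is an in-memory buffer. It relies only on the
    JDK contract that `FutureTask.done()` is invoked once the task completes
    normally, fails, or is cancelled.

```java
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

/** Hypothetical registry whose entries would be closed if the owning task is cancelled. */
class CloseOnCancelRegistry {
    private final Set<Closeable> registered =
            Collections.newSetFromMap(new IdentityHashMap<Closeable, Boolean>());

    synchronized void register(Closeable c) { registered.add(c); }
    synchronized void unregister(Closeable c) { registered.remove(c); }
    synchronized int size() { return registered.size(); }
}

/** Hypothetical in-memory stand-in for a checkpoint output stream. */
class TrackedStream extends ByteArrayOutputStream {
    volatile boolean closed;

    @Override
    public void close() throws IOException {
        closed = true;
        super.close();
    }
}

/** Simplified model of the async IO callable: open lazily, write, clean up in done(). */
class SnapshotIO implements Callable<Integer> {
    private final CloseOnCancelRegistry registry;
    private final Object lock = new Object();
    private TrackedStream stream; // guarded by lock; only created if call() runs
    private boolean finished;     // guarded by lock; set once done() has run

    SnapshotIO(CloseOnCancelRegistry registry) {
        this.registry = registry;
    }

    private TrackedStream openIOHandle() throws IOException {
        synchronized (lock) {
            if (finished) {
                throw new IOException("Snapshot already finished or cancelled.");
            }
            if (stream != null) {
                throw new IOException("Operation already opened.");
            }
            stream = new TrackedStream();
            registry.register(stream);
            return stream;
        }
    }

    @Override
    public Integer call() throws Exception {
        TrackedStream out = openIOHandle();
        out.write(new byte[] {1, 2, 3}); // placeholder for serializing the state copies
        return out.size();
    }

    /** Invoked via FutureTask.done(): after success, failure or cancellation. */
    void done() {
        synchronized (lock) {
            finished = true;
            if (stream != null) {
                registry.unregister(stream);
                try {
                    stream.close();
                } catch (IOException ignored) {
                    // best-effort cleanup
                }
            }
        }
    }

    TrackedStream stream() {
        synchronized (lock) {
            return stream;
        }
    }
}

/** Routes the FutureTask completion hook to SnapshotIO.done(). */
class SnapshotFuture extends FutureTask<Integer> {
    private final SnapshotIO io;

    SnapshotFuture(SnapshotIO io) {
        super(io);
        this.io = io;
    }

    @Override
    protected void done() {
        io.done();
    }
}
```

    If the task is cancelled while `call()` is still running, `done()` runs on
    the cancelling thread concurrently with the write. In this sketch the shared
    lock and the `finished` flag make a late `openIOHandle()` fail instead of
    creating a stream that nobody would unregister.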


> Asynchronous snapshots for heap-based operator state backends
> -------------------------------------------------------------
>
>                 Key: FLINK-6048
>                 URL: https://issues.apache.org/jira/browse/FLINK-6048
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>             Fix For: 1.3.0
>
>
> The synchronous checkpointing mechanism of heap-based operator state backends 
> blocks element processing for the duration of the checkpoint.
> We could implement a heap-based operator state backend that allows for 
> asynchronous checkpoints.
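The change described here splits a snapshot into a short synchronous part,
which copies the registered list states, and an asynchronous part, which
serializes those copies while element processing continues. Below is a minimal
sketch of that split in plain Java. It assumes a hypothetical
`HeapListStateBackend` whose states are named `List<Long>`s and a made-up binary
layout (state count, then name, size and values per state); it is not Flink's
`DefaultOperatorStateBackend` or its serialization format. Because `Long` is
immutable, copying each list is enough here; states with mutable elements would
also need their elements copied.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical heap backend holding named list states of longs. */
class HeapListStateBackend {
    private final Map<String, List<Long>> registeredStates = new HashMap<>();

    List<Long> getListState(String name) {
        return registeredStates.computeIfAbsent(name, k -> new ArrayList<>());
    }

    CompletableFuture<byte[]> snapshot(Executor asyncExecutor) {
        // Synchronous part: runs on the processing thread and only copies the lists.
        Map<String, List<Long>> copies = new HashMap<>(registeredStates.size());
        for (Map.Entry<String, List<Long>> entry : registeredStates.entrySet()) {
            copies.put(entry.getKey(), new ArrayList<>(entry.getValue()));
        }
        // Asynchronous part: serializes the copies on the executor's thread.
        return CompletableFuture.supplyAsync(() -> serialize(copies), asyncExecutor);
    }

    static byte[] serialize(Map<String, List<Long>> states) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(states.size());
            for (Map.Entry<String, List<Long>> entry : states.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeInt(entry.getValue().size());
                for (long value : entry.getValue()) {
                    out.writeLong(value);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Map<String, List<Long>> deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            Map<String, List<Long>> states = new HashMap<>(count);
            for (int i = 0; i < count; i++) {
                String name = in.readUTF();
                int size = in.readInt();
                List<Long> values = new ArrayList<>(size);
                for (int j = 0; j < size; j++) {
                    values.add(in.readLong());
                }
                states.put(name, values);
            }
            return states;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Passing the executor in makes the hand-off explicit: the caller decides which
thread performs the write, and elements added to the live state after
`snapshot()` returns never reach that checkpoint.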



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
