[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364092#comment-16364092
 ] 

ASF GitHub Bot commented on FLINK-8360:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r168184599
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
    @@ -630,19 +506,210 @@ public int numStateEntries(Object namespace) {
                return sum;
        }
     
    -   public <N, V> StateTable<K, N, V> 
newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) {
    -           return asynchronousSnapshots ?
    -                           new CopyOnWriteStateTable<>(this, newMetaInfo) :
    -                           new NestedMapsStateTable<>(this, newMetaInfo);
    -   }
    -
        @Override
        public boolean supportsAsynchronousSnapshots() {
    -           return asynchronousSnapshots;
    +           return snapshotStrategy.isAsynchronous();
        }
     
        @VisibleForTesting
        public FsStateBackend.LocalRecoveryConfig getLocalRecoveryConfig() {
                return localRecoveryConfig;
        }
    +
    +   /**
    +    * Base class for the snapshots of the heap backend that outlines the 
algorithm and offers some hooks to realize
    +    * the concrete strategies.
    +    */
    +   private abstract class HeapSnapshotStrategy implements 
SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
    +
    +           @Override
    +           public RunnableFuture<SnapshotResult<KeyedStateHandle>> 
performSnapshot(
    +                   long checkpointId,
    +                   long timestamp,
    +                   CheckpointStreamFactory streamFactory,
    +                   CheckpointOptions checkpointOptions) throws Exception {
    +
    +                   if (!hasRegisteredState()) {
    +                           return DoneFuture.nullValue();
    +                   }
    +
    +                   long syncStartTime = System.currentTimeMillis();
    +
    +                   Preconditions.checkState(stateTables.size() <= 
Short.MAX_VALUE,
    +                           "Too many KV-States: " + stateTables.size() +
    +                                   ". Currently at most " + 
Short.MAX_VALUE + " states are supported");
    +
    +                   List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> metaInfoSnapshots =
    +                           new ArrayList<>(stateTables.size());
    +
    +                   final Map<String, Integer> kVStateToId = new 
HashMap<>(stateTables.size());
    +
    +                   final Map<StateTable<K, ?, ?>, StateTableSnapshot> 
cowStateStableSnapshots =
    +                           new HashedMap(stateTables.size());
    +
    +                   for (Map.Entry<String, StateTable<K, ?, ?>> kvState : 
stateTables.entrySet()) {
    +                           kVStateToId.put(kvState.getKey(), 
kVStateToId.size());
    +                           StateTable<K, ?, ?> stateTable = 
kvState.getValue();
    +                           if (null != stateTable) {
    +                                   
metaInfoSnapshots.add(stateTable.getMetaInfo().snapshot());
    +                                   cowStateStableSnapshots.put(stateTable, 
stateTable.createSnapshot());
    +                           }
    +                   }
    +
    +                   final KeyedBackendSerializationProxy<K> 
serializationProxy =
    +                           new KeyedBackendSerializationProxy<>(
    +                                   keySerializer,
    +                                   metaInfoSnapshots,
    +                                   
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, 
keyGroupCompressionDecorator));
    +
    +                   //--------------------------------------------------- 
this becomes the end of sync part
    +
    +                   // implementation of the async IO operation, based on 
FutureTask
    +                   final 
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable 
=
    +                           new 
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() {
    +
    +                                   
CheckpointStreamFactory.CheckpointStateOutputStream stream = null;
    +
    +                                   @Override
    +                                   protected void acquireResources() 
throws Exception {
    +                                           stream = 
streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
    +                                           
cancelStreamRegistry.registerCloseable(stream);
    +                                   }
    +
    +                                   @Override
    +                                   protected void releaseResources() 
throws Exception {
    +
    +                                           if 
(cancelStreamRegistry.unregisterCloseable(stream)) {
    +                                                   
IOUtils.closeQuietly(stream);
    +                                                   stream = null;
    +                                           }
    +
    +                                           for (StateTableSnapshot 
tableSnapshot : cowStateStableSnapshots.values()) {
    +                                                   tableSnapshot.release();
    +                                           }
    +                                   }
    +
    +                                   @Override
    +                                   protected void stopOperation() throws 
Exception {
    +                                           if 
(cancelStreamRegistry.unregisterCloseable(stream)) {
    +                                                   
IOUtils.closeQuietly(stream);
    +                                                   stream = null;
    +                                           }
    +                                   }
    +
    +                                   @Override
    +                                   protected 
SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
    +
    +                                           long startTime = 
System.currentTimeMillis();
    +
    +                                           
CheckpointStreamFactory.CheckpointStateOutputStream localStream = this.stream;
    +
    +                                           DataOutputViewStreamWrapper 
outView = new DataOutputViewStreamWrapper(localStream);
    +                                           
serializationProxy.write(outView);
    +
    +                                           long[] keyGroupRangeOffsets = 
new long[keyGroupRange.getNumberOfKeyGroups()];
    +
    +                                           for (int keyGroupPos = 0; 
keyGroupPos < keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
    +                                                   int keyGroupId = 
keyGroupRange.getKeyGroupId(keyGroupPos);
    +                                                   
keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
    +                                                   
outView.writeInt(keyGroupId);
    +
    +                                                   for (Map.Entry<String, 
StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
    +                                                           OutputStream 
kgCompressionOut = 
keyGroupCompressionDecorator.decorateWithCompression(localStream);
    --- End diff --
    
    👍 


> Implement task-local state recovery
> -----------------------------------
>
>                 Key: FLINK-8360
>                 URL: https://issues.apache.org/jira/browse/FLINK-8360
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>             Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily extend it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to