[ https://issues.apache.org/jira/browse/FLINK-2808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14948368#comment-14948368 ]

ASF GitHub Bot commented on FLINK-2808:
---------------------------------------

Github user senorcarbone commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1239#discussion_r41493576
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
    @@ -322,73 +311,84 @@ public String getName() {
                return getEnvironment().getTaskNameWithSubtasks();
        }
     
    +   /**
    +    * Gets the lock object on which all operations that involve data and state mutation have to lock. 
    +    
    +    * @return The checkpoint lock object.
    +    */
        public Object getCheckpointLock() {
                return lock;
        }
    +   
    +   public StreamConfig getConfiguration() {
    +           return configuration;
    +   }
    +
    +   public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
    +           return accumulatorMap;
    +   }
    +   
    +   public Output<StreamRecord<OUT>> getHeadOutput() {
    +           return operatorChain.getChainEntryPoint();
    +   }
    +   
    +   public RecordWriterOutput<?>[] getStreamOutputs() {
    +           return operatorChain.getStreamOutputs();
    +   }
     
        // ------------------------------------------------------------------------
        //  Checkpoint and Restore
        // ------------------------------------------------------------------------
    -
    -   @SuppressWarnings("unchecked")
    +   
        @Override
    -   public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {
    -
    -           // We retrieve end restore the states for the chained operators.
    -           List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = 
    -                           (List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>) stateHandle.getState(this.userClassLoader);
    -
    -           // We restore all stateful operators
    -           for (int i = 0; i < chainedStates.size(); i++) {
    -                   Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> state = chainedStates.get(i);
    -                   // If state is not null we need to restore it
    -                   if (state != null) {
    -                           StreamOperator<?> chainedOperator = outputHandler.getChainedOperators().get(i);
    -                           ((StatefulStreamOperator<?>) chainedOperator).restoreInitialState(state);
    +   public void setInitialState(StreamTaskStateList initialState) throws Exception {
    +           LOG.info("Restoring checkpointed state to task {}", getName());
    +           
    +           final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
    +           final StreamTaskState[] states = initialState.getState(userClassLoader);
    +           
    +           for (int i = 0; i < states.length; i++) {
    +                   StreamTaskState state = states[i];
    +                   StreamOperator<?> operator = allOperators[i];
    +                   
    +                   if (state != null && operator != null) {
    +                           LOG.debug("Task {} in chain ({}) has checkpointed state", i, getName());
    +                           operator.restoreState(state);
    +                   }
    +                   else if (operator != null) {
    +                           LOG.debug("Task {} in chain ({}) does not have checkpointed state", i, getName());
                        }
                }
        }
     
        @Override
        public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
    -
                LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
                
                synchronized (lock) {
                        if (isRunning) {
    -                           try {
    -                                   // We wrap the states of the chained operators in a list, marking non-stateful operators with null
    -                                   List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = new ArrayList<>();
     
    -                                   // A wrapper handle is created for the List of statehandles
    -                                   WrapperStateHandle stateHandle;
    -                                   try {
    -
    -                                           // We construct a list of states for chained tasks
    -                                           for (StreamOperator<?> chainedOperator : outputHandler.getChainedOperators()) {
    -                                                   if (chainedOperator instanceof StatefulStreamOperator) {
    -                                                           chainedStates.add(((StatefulStreamOperator<?>) chainedOperator)
    -                                                                           .getStateSnapshotFromFunction(checkpointId, timestamp));
    -                                                   }else{
    -                                                           chainedStates.add(null);
    -                                                   }
    -                                           }
    -
    -                                           stateHandle = CollectionUtils.exists(chainedStates,
    -                                                           NotNullPredicate.INSTANCE) ? new WrapperStateHandle(chainedStates) : null;
    -                                   }
    -                                   catch (Exception e) {
    -                                           throw new Exception("Error while drawing snapshot of the user state.", e);
    +                           // since both state checkpointing and downstream barrier emission occurs in this
    +                           // lock scope, they are an atomic operation regardless of the order in which they occur
    +                           // we immediately emit the checkpoint barriers, so the downstream operators can start
    +                           // their checkpoint work as soon as possible
    +                           operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
    --- End diff --
    
    That's a cool and valid addition to the technique. I guess we will see 
some throughput improvement :). For this to work properly, we just need to 
make sure that tasksToAcknowledge always contains all the tasks (and not e.g. 
only the sinks), which, from what I remember, we already do.
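
    To illustrate why the order inside the lock scope does not matter to 
other threads, here is a minimal, self-contained sketch. It is not the 
StreamTask code: the class CheckpointLockSketch, its counter state and the 
list standing in for the downstream output are all hypothetical. The only 
assumption carried over from the diff is that record processing and 
checkpointing synchronize on the same lock object.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a stream task; not part of Flink.
class CheckpointLockSketch {
    // Plays the role of the object returned by getCheckpointLock().
    private final Object lock = new Object();
    // Plays the role of the downstream channel (records and barriers in order).
    private final List<String> downstream = new ArrayList<>();
    // Plays the role of the operator state.
    private long count = 0;

    /** Record processing mutates state and emits while holding the lock. */
    public void processRecord(String record) {
        synchronized (lock) {
            count++;
            downstream.add("record:" + record);
        }
    }

    /** The barrier is emitted first, the snapshot is taken second, both in one lock scope. */
    public long triggerCheckpoint(long checkpointId) {
        synchronized (lock) {
            downstream.add("barrier:" + checkpointId);
            // No record can be processed between the barrier and this read,
            // because processRecord() needs the same lock.
            return count;
        }
    }

    /** Returns a copy of everything emitted so far. */
    public List<String> getDownstream() {
        synchronized (lock) {
            return new ArrayList<>(downstream);
        }
    }

    public static void main(String[] args) {
        CheckpointLockSketch task = new CheckpointLockSketch();
        task.processRecord("a");
        task.processRecord("b");
        long snapshot = task.triggerCheckpoint(1L);
        task.processRecord("c");
        System.out.println("snapshot=" + snapshot + " downstream=" + task.getDownstream());
    }
}
```

    In this sketch the snapshot reflects exactly the records that precede 
the barrier in the downstream list, whichever of the two statements inside 
triggerCheckpoint() runs first.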


> Rework / Extend the StatehandleProvider
> ---------------------------------------
>
>                 Key: FLINK-2808
>                 URL: https://issues.apache.org/jira/browse/FLINK-2808
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 0.10
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 0.10
>
>
> I would like to make some changes (mostly additions) to the 
> {{StateHandleProvider}}. Ideally for the upcoming release, as it is somewhat 
> part of the public API.
> The rationale behind this is to handle in a nice and extensible way the 
> creation of key/value state backed by various implementations (FS, 
> distributed KV store, local KV store with FS backup, ...) and various 
> checkpointing ways (full dump, append, incremental keys, ...)
> The changes would concretely be:
> 1.  There should be a default {{StateHandleProvider}} set on the execution 
> environment. Functions can later specify the {{StateHandleProvider}} when 
> grabbing the {{StreamOperatorState}} from the runtime context (plus 
> optionally a {{Checkpointer}})
> 2.  The {{StreamOperatorState}} is created from the {{StateHandleProvider}}. 
> That way, a KeyValueStore state backend can create a {{StreamOperatorState}} 
> that directly updates data in the KV store on every access, if that is 
> desired (and filter accesses by timestamps to only show committed data)
> 3.  The StateHandleProvider should have methods to get an output stream that 
> writes to the state checkpoint directly (and returns a StateHandle upon 
> closing). That way we can convert and dump large state into the checkpoint 
> without creating a full copy in memory first.
> Lastly, I would like to change some names
>   - {{StateHandleProvider}} to either {{StateBackend}}, {{StateStore}}, or 
> {{StateProvider}} (simpler name).
>   - {{StreamOperatorState}} to either {{State}} or {{KVState}}.
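
Item 3 of the description above (an output stream that writes directly to the 
checkpoint and returns a handle upon closing) could look roughly like the 
sketch below. All names in it (StreamingStateSketch, StateHandle, 
CheckpointStateOutputStream, FileStateBackend, closeAndGetHandle) are 
hypothetical and only illustrate the idea; they are not taken from the pull 
request. A temporary local file stands in for the checkpoint storage.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch; none of these types are Flink classes.
class StreamingStateSketch {

    /** Hypothetical handle: knows how to reopen the checkpointed bytes. */
    interface StateHandle {
        InputStream openInputStream() throws IOException;
    }

    /** Hypothetical stream: closing it yields a handle to what was written. */
    static abstract class CheckpointStateOutputStream extends OutputStream {
        abstract StateHandle closeAndGetHandle() throws IOException;
    }

    /** Hypothetical file-backed backend: state goes to disk chunk by chunk. */
    static class FileStateBackend {
        CheckpointStateOutputStream createCheckpointStateOutputStream() throws IOException {
            final Path file = Files.createTempFile("checkpoint-", ".state");
            final OutputStream out = Files.newOutputStream(file);
            return new CheckpointStateOutputStream() {
                @Override
                public void write(int b) throws IOException {
                    out.write(b);
                }

                @Override
                public void write(byte[] b, int off, int len) throws IOException {
                    out.write(b, off, len);
                }

                @Override
                StateHandle closeAndGetHandle() throws IOException {
                    out.close();
                    // The handle only remembers where the data lives.
                    return () -> Files.newInputStream(file);
                }
            };
        }
    }

    public static void main(String[] args) throws IOException {
        FileStateBackend backend = new FileStateBackend();
        CheckpointStateOutputStream stream = backend.createCheckpointStateOutputStream();
        // Large state would be written entry by entry, never held as one full copy.
        for (int i = 0; i < 3; i++) {
            stream.write(("entry-" + i + "\n").getBytes(StandardCharsets.UTF_8));
        }
        StateHandle handle = stream.closeAndGetHandle();
        try (InputStream in = handle.openInputStream()) {
            System.out.println("first restored byte: " + (char) in.read());
        }
    }
}
```

The point of this shape is that the writer never needs the whole state as a 
single in-memory object; it only needs the handle that comes back when the 
stream is closed.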



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
