Github user senorcarbone commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1239#discussion_r41493576
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
    @@ -322,73 +311,84 @@ public String getName() {
                return getEnvironment().getTaskNameWithSubtasks();
        }
     
    +   /**
    +    * Gets the lock object on which all operations that involve data and state mutation have to lock. 
    +    
    +    * @return The checkpoint lock object.
    +    */
        public Object getCheckpointLock() {
                return lock;
        }
    +   
    +   public StreamConfig getConfiguration() {
    +           return configuration;
    +   }
    +
    +   public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
    +           return accumulatorMap;
    +   }
    +   
    +   public Output<StreamRecord<OUT>> getHeadOutput() {
    +           return operatorChain.getChainEntryPoint();
    +   }
    +   
    +   public RecordWriterOutput<?>[] getStreamOutputs() {
    +           return operatorChain.getStreamOutputs();
    +   }
     
        // ------------------------------------------------------------------------
        //  Checkpoint and Restore
        // ------------------------------------------------------------------------
    -
    -   @SuppressWarnings("unchecked")
    +   
        @Override
    -   public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {
    -
    -           // We retrieve end restore the states for the chained operators.
    -           List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = 
    -                           (List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>) stateHandle.getState(this.userClassLoader);
    -
    -           // We restore all stateful operators
    -           for (int i = 0; i < chainedStates.size(); i++) {
    -                   Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> state = chainedStates.get(i);
    -                   // If state is not null we need to restore it
    -                   if (state != null) {
    -                           StreamOperator<?> chainedOperator = outputHandler.getChainedOperators().get(i);
    -                           ((StatefulStreamOperator<?>) chainedOperator).restoreInitialState(state);
    +   public void setInitialState(StreamTaskStateList initialState) throws Exception {
    +           LOG.info("Restoring checkpointed state to task {}", getName());
    +           
    +           final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
    +           final StreamTaskState[] states = initialState.getState(userClassLoader);
    +           
    +           for (int i = 0; i < states.length; i++) {
    +                   StreamTaskState state = states[i];
    +                   StreamOperator<?> operator = allOperators[i];
    +                   
    +                   if (state != null && operator != null) {
    +                           LOG.debug("Task {} in chain ({}) has checkpointed state", i, getName());
    +                           operator.restoreState(state);
    +                   }
    +                   else if (operator != null) {
    +                           LOG.debug("Task {} in chain ({}) does not have checkpointed state", i, getName());
                        }
                }
        }
     
        @Override
        public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
    -
                LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
                
                synchronized (lock) {
                        if (isRunning) {
    -                           try {
    -                                   // We wrap the states of the chained operators in a list, marking non-stateful operators with null
    -                                   List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = new ArrayList<>();
     
    -                                   // A wrapper handle is created for the List of statehandles
    -                                   WrapperStateHandle stateHandle;
    -                                   try {
    -
    -                                           // We construct a list of states for chained tasks
    -                                           for (StreamOperator<?> chainedOperator : outputHandler.getChainedOperators()) {
    -                                                   if (chainedOperator instanceof StatefulStreamOperator) {
    -                                                           chainedStates.add(((StatefulStreamOperator<?>) chainedOperator)
    -                                                                           .getStateSnapshotFromFunction(checkpointId, timestamp));
    -                                                   }else{
    -                                                           chainedStates.add(null);
    -                                                   }
    -                                           }
    -
    -                                           stateHandle = CollectionUtils.exists(chainedStates,
    -                                                           NotNullPredicate.INSTANCE) ? new WrapperStateHandle(chainedStates) : null;
    -                                   }
    -                                   catch (Exception e) {
    -                                           throw new Exception("Error while drawing snapshot of the user state.", e);
    +                           // since both state checkpointing and downstream barrier emission occurs in this
    +                           // lock scope, they are an atomic operation regardless of the order in which they occur
    +                           // we immediately emit the checkpoint barriers, so the downstream operators can start
    +                           // their checkpoint work as soon as possible
    +                           operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
    --- End diff --
    
    That's a cool and valid addition to the technique. I guess we will see 
    some throughput improvement :). We just need to make sure that 
    tasksToAcknowledge always contains all the tasks (and not, e.g., only the 
    sinks) for this to work properly, which, from what I remember, is already 
    the case.
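    A toy model of why that requirement matters once barriers go out before the
    snapshot: a downstream sink can receive the barrier, snapshot, and
    acknowledge while its upstream source is still drawing its own snapshot
    under the checkpoint lock. A coordinator that waited only for the sinks
    would then declare the checkpoint complete too early. The sketch below
    replays one such interleaving for a two-task source -> sink pipeline. The
    names (PendingCheckpointDemo, PendingCheckpoint, simulate) are hypothetical
    and are not Flink's checkpoint coordinator API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical toy model, NOT Flink's API: a pending checkpoint completes
 * only when every task in tasksToAcknowledge has acknowledged it.
 */
public class PendingCheckpointDemo {

    static final class PendingCheckpoint {
        private final Set<String> notYetAcknowledged;

        PendingCheckpoint(Set<String> tasksToAcknowledge) {
            this.notYetAcknowledged = new HashSet<>(tasksToAcknowledge);
        }

        /** Returns true only on the acknowledgement that completes the checkpoint. */
        boolean acknowledge(String task) {
            boolean wasComplete = notYetAcknowledged.isEmpty();
            notYetAcknowledged.remove(task); // acks from untracked tasks are ignored
            return !wasComplete && notYetAcknowledged.isEmpty();
        }
    }

    /**
     * Replays barrier-first ordering: the source broadcasts the barrier, the
     * sink snapshots and acks, and only then does the source finish its own
     * snapshot. "complete" marks when the coordinator would finalize.
     */
    public static List<String> simulate(Set<String> tasksToAcknowledge) {
        List<String> events = new ArrayList<>();
        PendingCheckpoint pending = new PendingCheckpoint(tasksToAcknowledge);

        events.add("source: broadcast barrier");
        events.add("sink: snapshot + ack");
        if (pending.acknowledge("sink")) {
            events.add("complete");
        }
        events.add("source: snapshot + ack");
        if (pending.acknowledge("source")) {
            events.add("complete");
        }
        return events;
    }

    public static void main(String[] args) {
        // Only sinks acknowledge: completion precedes the source snapshot.
        // prints [source: broadcast barrier, sink: snapshot + ack, complete, source: snapshot + ack]
        System.out.println(simulate(Set.of("sink")));
        // All tasks acknowledge: completion waits for the source.
        // prints [source: broadcast barrier, sink: snapshot + ack, source: snapshot + ack, complete]
        System.out.println(simulate(Set.of("source", "sink")));
    }
}
```

    With only the sink in the set, "complete" is recorded before the source
    has persisted its state, which is exactly the premature completion the
    comment warns about; with every task in the set, completion waits for the
    source.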

