Github user senorcarbone commented on a diff in the pull request: https://github.com/apache/flink/pull/1239#discussion_r41493576 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java --- @@ -322,73 +311,84 @@ public String getName() { return getEnvironment().getTaskNameWithSubtasks(); } + /** + * Gets the lock object on which all operations that involve data and state mutation have to lock. + + * @return The checkpoint lock object. + */ public Object getCheckpointLock() { return lock; } + + public StreamConfig getConfiguration() { + return configuration; + } + + public Map<String, Accumulator<?, ?>> getAccumulatorMap() { + return accumulatorMap; + } + + public Output<StreamRecord<OUT>> getHeadOutput() { + return operatorChain.getChainEntryPoint(); + } + + public RecordWriterOutput<?>[] getStreamOutputs() { + return operatorChain.getStreamOutputs(); + } // ------------------------------------------------------------------------ // Checkpoint and Restore // ------------------------------------------------------------------------ - - @SuppressWarnings("unchecked") + @Override - public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception { - - // We retrieve end restore the states for the chained operators. 
- List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = - (List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>) stateHandle.getState(this.userClassLoader); - - // We restore all stateful operators - for (int i = 0; i < chainedStates.size(); i++) { - Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> state = chainedStates.get(i); - // If state is not null we need to restore it - if (state != null) { - StreamOperator<?> chainedOperator = outputHandler.getChainedOperators().get(i); - ((StatefulStreamOperator<?>) chainedOperator).restoreInitialState(state); + public void setInitialState(StreamTaskStateList initialState) throws Exception { + LOG.info("Restoring checkpointed state to task {}", getName()); + + final StreamOperator<?>[] allOperators = operatorChain.getAllOperators(); + final StreamTaskState[] states = initialState.getState(userClassLoader); + + for (int i = 0; i < states.length; i++) { + StreamTaskState state = states[i]; + StreamOperator<?> operator = allOperators[i]; + + if (state != null && operator != null) { + LOG.debug("Task {} in chain ({}) has checkpointed state", i, getName()); + operator.restoreState(state); + } + else if (operator != null) { + LOG.debug("Task {} in chain ({}) does not have checkpointed state", i, getName()); } } } @Override public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception { - LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName()); synchronized (lock) { if (isRunning) { - try { - // We wrap the states of the chained operators in a list, marking non-stateful operators with null - List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = new ArrayList<>(); - // A wrapper handle is created for the List of statehandles - WrapperStateHandle stateHandle; - try { - - // We construct a list of states for chained tasks - for (StreamOperator<?> chainedOperator : 
outputHandler.getChainedOperators()) { - if (chainedOperator instanceof StatefulStreamOperator) { - chainedStates.add(((StatefulStreamOperator<?>) chainedOperator) - .getStateSnapshotFromFunction(checkpointId, timestamp)); - }else{ - chainedStates.add(null); - } - } - - stateHandle = CollectionUtils.exists(chainedStates, - NotNullPredicate.INSTANCE) ? new WrapperStateHandle(chainedStates) : null; - } - catch (Exception e) { - throw new Exception("Error while drawing snapshot of the user state.", e); + // since both state checkpointing and downstream barrier emission occurs in this + // lock scope, they are an atomic operation regardless of the order in which they occur + // we immediately emit the checkpoint barriers, so the downstream operators can start + // their checkpoint work as soon as possible + operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp); --- End diff -- That's a cool and valid addition to the technique. I guess we will see some throughput improvement :). We just need to make sure that the tasksToAcknowledge always include all the tasks (and not, e.g., only the sinks) for this to work properly, which, from what I remember, we already do.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---