Github user senorcarbone commented on a diff in the pull request:
https://github.com/apache/flink/pull/1239#discussion_r41493576
--- Diff:
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
---
@@ -322,73 +311,84 @@ public String getName() {
return getEnvironment().getTaskNameWithSubtasks();
}
+ /**
+ * Gets the lock object on which all operations that involve data and state mutation have to lock.
+
+ * @return The checkpoint lock object.
+ */
public Object getCheckpointLock() {
return lock;
}
+
+ public StreamConfig getConfiguration() {
+ return configuration;
+ }
+
+ public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
+ return accumulatorMap;
+ }
+
+ public Output<StreamRecord<OUT>> getHeadOutput() {
+ return operatorChain.getChainEntryPoint();
+ }
+
+ public RecordWriterOutput<?>[] getStreamOutputs() {
+ return operatorChain.getStreamOutputs();
+ }
// ------------------------------------------------------------------------
// Checkpoint and Restore
// ------------------------------------------------------------------------
-
- @SuppressWarnings("unchecked")
+
@Override
- public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {
-
- // We retrieve end restore the states for the chained operators.
- List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates =
- (List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>) stateHandle.getState(this.userClassLoader);
-
- // We restore all stateful operators
- for (int i = 0; i < chainedStates.size(); i++) {
- Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> state = chainedStates.get(i);
- // If state is not null we need to restore it
- if (state != null) {
- StreamOperator<?> chainedOperator = outputHandler.getChainedOperators().get(i);
- ((StatefulStreamOperator<?>) chainedOperator).restoreInitialState(state);
+ public void setInitialState(StreamTaskStateList initialState) throws Exception {
+ LOG.info("Restoring checkpointed state to task {}", getName());
+
+ final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
+ final StreamTaskState[] states = initialState.getState(userClassLoader);
+
+ for (int i = 0; i < states.length; i++) {
+ StreamTaskState state = states[i];
+ StreamOperator<?> operator = allOperators[i];
+
+ if (state != null && operator != null) {
+ LOG.debug("Task {} in chain ({}) has checkpointed state", i, getName());
+ operator.restoreState(state);
+ }
+ else if (operator != null) {
+ LOG.debug("Task {} in chain ({}) does not have checkpointed state", i, getName());
}
}
}
@Override
public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
-
LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
synchronized (lock) {
if (isRunning) {
- try {
- // We wrap the states of the chained operators in a list, marking non-stateful operators with null
- List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = new ArrayList<>();
- // A wrapper handle is created for the List of statehandles
- WrapperStateHandle stateHandle;
- try {
-
- // We construct a list of states for chained tasks
- for (StreamOperator<?> chainedOperator : outputHandler.getChainedOperators()) {
- if (chainedOperator instanceof StatefulStreamOperator) {
- chainedStates.add(((StatefulStreamOperator<?>) chainedOperator)
- .getStateSnapshotFromFunction(checkpointId, timestamp));
- }else{
- chainedStates.add(null);
- }
- }
-
- stateHandle = CollectionUtils.exists(chainedStates,
- NotNullPredicate.INSTANCE) ? new WrapperStateHandle(chainedStates) : null;
- }
- catch (Exception e) {
- throw new Exception("Error while drawing snapshot of the user state.", e);
+ // since both state checkpointing and downstream barrier emission occurs in this
+ // lock scope, they are an atomic operation regardless of the order in which they occur
+ // we immediately emit the checkpoint barriers, so the downstream operators can start
+ // their checkpoint work as soon as possible
+ operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
--- End diff --
That's a cool and valid addition to the technique. I guess we will see
some throughput improvement :). For this to work properly, we just need to
make sure that tasksToAcknowledge always contains all the tasks (and not,
e.g., only the sinks), which we do, from what I remember.
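
To make the ordering argument concrete, here is a minimal, self-contained
sketch. It is not Flink code: SketchTask and PendingCheckpointSketch are
hypothetical stand-ins I made up for illustration, and the counter is a toy
replacement for real operator state. It shows two things: the barrier and the
snapshot happen under one lock, so no record can slip in between them, and a
checkpoint only counts as complete once every task has acknowledged, since
with barrier-first emission a sink may acknowledge before an upstream task has
finished its own snapshot.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class BarrierFirstCheckpointSketch {

    /** Hypothetical stand-in for a streaming task; not Flink's StreamTask. */
    static class SketchTask {
        private final Object lock = new Object();
        private final List<String> events = new ArrayList<>();
        private long counter; // toy operator state

        /** Record processing mutates state, so it takes the same lock. */
        void processRecord(long value) {
            synchronized (lock) {
                counter += value;
                events.add("record:" + value);
            }
        }

        /** Emits the barrier first, then snapshots, all under one lock. */
        long triggerCheckpoint(long checkpointId) {
            synchronized (lock) {
                events.add("barrier:" + checkpointId);
                long snapshot = counter;
                events.add("snapshot:" + checkpointId);
                return snapshot;
            }
        }

        /** Returns a copy of the event log for inspection. */
        List<String> events() {
            synchronized (lock) {
                return new ArrayList<>(events);
            }
        }
    }

    /** Hypothetical coordinator-side bookkeeping for one checkpoint. */
    static class PendingCheckpointSketch {
        private final Set<String> notYetAcknowledged;

        PendingCheckpointSketch(Set<String> tasksToAcknowledge) {
            this.notYetAcknowledged = new HashSet<>(tasksToAcknowledge);
        }

        /** Returns true once every expected task has acknowledged. */
        boolean acknowledge(String taskName) {
            notYetAcknowledged.remove(taskName);
            return notYetAcknowledged.isEmpty();
        }
    }

    public static void main(String[] args) {
        SketchTask source = new SketchTask();
        source.processRecord(3);
        source.processRecord(4);
        long snapshot = source.triggerCheckpoint(1L);
        System.out.println(source.events() + " snapshot=" + snapshot);

        // All tasks are expected to acknowledge, not only the sink.
        PendingCheckpointSketch pending = new PendingCheckpointSketch(
                new HashSet<>(Arrays.asList("source", "map", "sink")));
        System.out.println(pending.acknowledge("sink"));
        System.out.println(pending.acknowledge("map"));
        System.out.println(pending.acknowledge("source"));
    }
}
```

If the expected set held only "sink", the first acknowledge call would already
report the checkpoint as complete, even though the source and map tasks might
still be writing their state.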