[ https://issues.apache.org/jira/browse/FLINK-2808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14948368#comment-14948368 ]
ASF GitHub Bot commented on FLINK-2808:
---------------------------------------
Github user senorcarbone commented on a diff in the pull request:
https://github.com/apache/flink/pull/1239#discussion_r41493576
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
@@ -322,73 +311,84 @@ public String getName() {
return getEnvironment().getTaskNameWithSubtasks();
}
+ /**
+ * Gets the lock object on which all operations that involve data and state mutation have to lock.
+
+ * @return The checkpoint lock object.
+ */
public Object getCheckpointLock() {
return lock;
}
+
+ public StreamConfig getConfiguration() {
+ return configuration;
+ }
+
+ public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
+ return accumulatorMap;
+ }
+
+ public Output<StreamRecord<OUT>> getHeadOutput() {
+ return operatorChain.getChainEntryPoint();
+ }
+
+ public RecordWriterOutput<?>[] getStreamOutputs() {
+ return operatorChain.getStreamOutputs();
+ }
// ------------------------------------------------------------------------
// Checkpoint and Restore
// ------------------------------------------------------------------------
-
- @SuppressWarnings("unchecked")
+
@Override
- public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {
-
- // We retrieve end restore the states for the chained operators.
- List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates =
- (List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>) stateHandle.getState(this.userClassLoader);
-
- // We restore all stateful operators
- for (int i = 0; i < chainedStates.size(); i++) {
- Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> state = chainedStates.get(i);
- // If state is not null we need to restore it
- if (state != null) {
- StreamOperator<?> chainedOperator = outputHandler.getChainedOperators().get(i);
- ((StatefulStreamOperator<?>) chainedOperator).restoreInitialState(state);
+ public void setInitialState(StreamTaskStateList initialState) throws Exception {
+ LOG.info("Restoring checkpointed state to task {}", getName());
+
+ final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
+ final StreamTaskState[] states = initialState.getState(userClassLoader);
+
+ for (int i = 0; i < states.length; i++) {
+ StreamTaskState state = states[i];
+ StreamOperator<?> operator = allOperators[i];
+
+ if (state != null && operator != null) {
+ LOG.debug("Task {} in chain ({}) has checkpointed state", i, getName());
+ operator.restoreState(state);
+ }
+ else if (operator != null) {
+ LOG.debug("Task {} in chain ({}) does not have checkpointed state", i, getName());
}
}
}
@Override
public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
-
LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
synchronized (lock) {
if (isRunning) {
- try {
- // We wrap the states of the chained operators in a list, marking non-stateful operators with null
- List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = new ArrayList<>();
- // A wrapper handle is created for the List of statehandles
- WrapperStateHandle stateHandle;
- try {
-
- // We construct a list of states for chained tasks
- for (StreamOperator<?> chainedOperator : outputHandler.getChainedOperators()) {
- if (chainedOperator instanceof StatefulStreamOperator) {
- chainedStates.add(((StatefulStreamOperator<?>) chainedOperator)
- .getStateSnapshotFromFunction(checkpointId, timestamp));
- }else{
- chainedStates.add(null);
- }
- }
-
- stateHandle = CollectionUtils.exists(chainedStates,
- NotNullPredicate.INSTANCE) ? new WrapperStateHandle(chainedStates) : null;
- }
- catch (Exception e) {
- throw new Exception("Error while drawing snapshot of the user state.", e);
+ // since both state checkpointing and downstream barrier emission occurs in this
+ // lock scope, they are an atomic operation regardless of the order in which they occur
+ // we immediately emit the checkpoint barriers, so the downstream operators can start
+ // their checkpoint work as soon as possible
+ operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
--- End diff --
That's a cool and valid addition to the technique. I guess we will see
some throughput improvement :). We just need to make sure, for this to
work properly, that tasksToAcknowledge always contains all the tasks (and
not, e.g., only the sinks), which, from what I remember, it does.
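The reviewer's two points can be illustrated with a toy model. This is a minimal, hypothetical Java sketch (none of its class or method names are Flink APIs): record processing and triggerCheckpoint synchronize on one lock, so emitting the barrier before taking the snapshot still yields a snapshot that matches exactly the records sent before the barrier. The PendingCheckpoint part shows why the coordinator must wait for acknowledgements from every task: because the barrier leaves a task before that task's snapshot is stored, a downstream sink can acknowledge first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model (hypothetical names, not Flink's API) of "emit the barrier first, then snapshot".
public class BarrierFirstCheckpoint {

    /** Stand-in for the downstream channel: records and barriers are logged in emission order. */
    static final class Downstream {
        final List<String> events = new ArrayList<>();

        void emit(String event) {
            events.add(event);
        }
    }

    /** A task whose only state is a count of processed records. */
    static final class CountingTask {
        private final Object lock = new Object(); // plays the role of the checkpoint lock
        private final Downstream out;
        private long count;

        CountingTask(Downstream out) {
            this.out = out;
        }

        void processElement(String record) {
            synchronized (lock) { // same lock as triggerCheckpoint
                count++;
                out.emit(record);
            }
        }

        long triggerCheckpoint(long checkpointId) {
            synchronized (lock) {
                // Barrier goes out first so downstream tasks can start their checkpoint early.
                out.emit("barrier-" + checkpointId);
                // No record can be processed between the two steps, so the snapshot
                // matches exactly the records emitted before the barrier.
                return count;
            }
        }
    }

    /** Coordinator-side bookkeeping: complete only once every listed task has acknowledged. */
    static final class PendingCheckpoint {
        private final Set<String> notYetAcknowledged;

        PendingCheckpoint(Set<String> tasksToAcknowledge) {
            this.notYetAcknowledged = new HashSet<>(tasksToAcknowledge);
        }

        boolean acknowledge(String taskId) {
            notYetAcknowledged.remove(taskId);
            return notYetAcknowledged.isEmpty();
        }
    }

    public static void main(String[] args) {
        Downstream out = new Downstream();
        CountingTask source = new CountingTask(out);
        source.processElement("a");
        source.processElement("b");
        long snapshot = source.triggerCheckpoint(1L);
        source.processElement("c");
        System.out.println("snapshot=" + snapshot + " events=" + out.events);

        // The sink can see the barrier and acknowledge before the source's snapshot is stored,
        // so tasksToAcknowledge must list every task, not only the sinks.
        PendingCheckpoint pending = new PendingCheckpoint(new HashSet<>(Arrays.asList("source", "sink")));
        System.out.println("after sink ack: " + pending.acknowledge("sink"));
        System.out.println("after source ack: " + pending.acknowledge("source"));
    }
}
```

Running main prints snapshot=2 events=[a, b, barrier-1, c], then "after sink ack: false" and "after source ack: true": the checkpoint is only complete once the source has acknowledged as well.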
> Rework / Extend the StatehandleProvider
> ---------------------------------------
>
> Key: FLINK-2808
> URL: https://issues.apache.org/jira/browse/FLINK-2808
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 0.10
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 0.10
>
>
> I would like to make some changes (mostly additions) to the
> {{StateHandleProvider}}, ideally for the upcoming release, since it is, to
> some extent, part of the public API.
> The rationale behind this is to handle, in a clean and extensible way, the
> creation of key/value state backed by various implementations (FS,
> distributed KV store, local KV store with FS backup, ...) and various
> checkpointing strategies (full dump, append, incremental keys, ...).
> Concretely, the changes would be:
> 1. There should be a default {{StateHandleProvider}} set on the execution
> environment. Functions can then specify a {{StateHandleProvider}} when
> obtaining the {{StreamOperatorState}} from the runtime context (plus,
> optionally, a {{Checkpointer}}).
> 2. The {{StreamOperatorState}} is created from the {{StateHandleProvider}}.
> That way, a KeyValueStore state backend can create a {{StreamOperatorState}}
> that directly updates data in the KV store on every access, if that is
> desired (and filters accesses by timestamp so that only committed data is visible).
> 3. The {{StateHandleProvider}} should have methods to obtain an output stream
> that writes directly to the state checkpoint (and returns a {{StateHandle}}
> upon closing). That way, we can convert and dump large state into the
> checkpoint without first creating a full copy in memory.
> Lastly, I would like to change some names:
> - {{StateHandleProvider}} to either {{StateBackend}}, {{StateStore}}, or
> {{StateProvider}} (simpler name).
> - {{StreamOperatorState}} to either {{State}} or {{KVState}}.
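Item 3 of the proposal can be sketched in Java under loudly stated assumptions: StateBackend (one of the suggested new names), CheckpointStateOutputStream, closeAndGetHandle and InMemoryBackend are illustrative names chosen for this sketch, not a confirmed Flink API, and StateHandle is reduced to a one-method interface. The backend hands out a stream, the caller writes state into it chunk by chunk, and closing the stream returns a handle used on restore.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: all names are illustrative, not Flink's actual API.
public class StateBackendSketch {

    /** Handle through which checkpointed bytes are read back on restore. */
    interface StateHandle {
        byte[] getState() throws IOException;
    }

    /** Stream that writes state into checkpoint storage; closing it yields a handle. */
    abstract static class CheckpointStateOutputStream extends OutputStream {
        abstract StateHandle closeAndGetHandle() throws IOException;
    }

    /** The proposed backend abstraction (StateHandleProvider renamed to StateBackend). */
    interface StateBackend {
        CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointId, long timestamp)
                throws IOException;
    }

    /** Toy backend that keeps checkpoint bytes in memory. */
    static final class InMemoryBackend implements StateBackend {
        @Override
        public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointId, long timestamp) {
            return new CheckpointStateOutputStream() {
                private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                private boolean closed;

                @Override
                public void write(int b) throws IOException {
                    if (closed) {
                        throw new IOException("stream already closed");
                    }
                    buffer.write(b);
                }

                @Override
                StateHandle closeAndGetHandle() {
                    closed = true;
                    byte[] data = buffer.toByteArray();
                    return () -> data.clone(); // each restore gets its own copy
                }
            };
        }
    }

    public static void main(String[] args) throws IOException {
        // In the proposal, a default backend would be configured on the execution environment.
        StateBackend backend = new InMemoryBackend();
        CheckpointStateOutputStream out = backend.createCheckpointStateOutputStream(1L, System.currentTimeMillis());
        // State is written chunk by chunk instead of being serialized into one byte[] first.
        for (int i = 0; i < 3; i++) {
            out.write(("key" + i + "=" + i + "\n").getBytes(StandardCharsets.UTF_8));
        }
        StateHandle handle = out.closeAndGetHandle();
        System.out.print(new String(handle.getState(), StandardCharsets.UTF_8));
    }
}
```

A file-system or KV-store backend would implement the same interface but push each chunk to external storage as it arrives, which is what avoids the full in-memory copy; the in-memory toy only demonstrates the call sequence.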
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)