[
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585088#comment-15585088
]
ASF GitHub Bot commented on FLINK-4844:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/2648#discussion_r83822468
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---
@@ -199,30 +228,56 @@ public Environment getEnvironment() {
}
/**
- * Calls
- * {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
+ * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
*/
public void setup() throws Exception {
operator.setup(mockTask, config, new MockOutput());
setupCalled = true;
}
/**
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}. This also
- * calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)}
+ * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}.
+ * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)}
* if it was not called before.
*/
- public void open() throws Exception {
+ public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
if (!setupCalled) {
setup();
}
+ operator.initializeState(operatorStateHandles);
+ initializeCalled = true;
+ }
+
+ /**
+ * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}.
+ * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)} if it
+ * was not called before.
+ */
+ public void open() throws Exception {
+ if (!initializeCalled) {
+ initializeState(null);
+ }
operator.open();
}
/**
*
*/
- public StreamStateHandle snapshot(long checkpointId, long timestamp) throws Exception {
+ public SnapshotInProgressSubtaskState snapshot(long checkpointId, long timestamp) throws Exception {
--- End diff --
I think the idea of having a single method is nice, and if there is no
special reason to keep the old signature, I suggest doing it the other way
around. `OperatorSnapshotResult` is already a container for all operator
states (except the legacy state, which will be removed in the near future).
Using it removes the need for the multiplexing.
However, because `OperatorSnapshotResult` no longer contains the legacy
state, for the time being we might return a Tuple2 of both, or some special
container class which could also strip away the `RunnableFuture` part.
What do you think?
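To make the "special container class" option concrete, here is a minimal
sketch. Everything in it is an assumption for illustration: `SnapshotBundle`,
`resolve`, and the generic parameters `L` (legacy state handle) and `S` (the
new-style snapshot result) are hypothetical names, not existing Flink classes,
and the real types would replace the `String` placeholders used in the demo.

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

/**
 * Hypothetical container (not part of Flink) that pairs the legacy state
 * with the already-materialized result of the new-style snapshot.
 */
final class SnapshotBundle<L, S> {
    private final L legacyState;   // may be null if the operator has no legacy state
    private final S operatorState; // may be null if no snapshot was pending

    private SnapshotBundle(L legacyState, S operatorState) {
        this.legacyState = legacyState;
        this.operatorState = operatorState;
    }

    /** Runs the pending snapshot to completion so callers never see the future. */
    static <L, S> SnapshotBundle<L, S> resolve(L legacyState, RunnableFuture<S> pending)
            throws Exception {
        S resolved = null;
        if (pending != null) {
            pending.run();            // executes synchronously in the calling thread
            resolved = pending.get(); // a failed snapshot surfaces as ExecutionException
        }
        return new SnapshotBundle<>(legacyState, resolved);
    }

    L getLegacyState() { return legacyState; }

    S getOperatorState() { return operatorState; }
}

class SnapshotBundleDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder payloads; a test harness would pass the real state types.
        RunnableFuture<String> pending = new FutureTask<>(() -> "operator-state");
        SnapshotBundle<String, String> bundle =
                SnapshotBundle.resolve("legacy-state", pending);
        System.out.println(bundle.getLegacyState() + " / " + bundle.getOperatorState());
    }
}
```

Compared with a plain Tuple2, a dedicated class like this gives the two parts
names and keeps the `RunnableFuture` handling in one place, which may matter
once the legacy state is removed and the wrapper can be dropped again.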
> Partitionable Raw Keyed/Operator State
> --------------------------------------
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
> Issue Type: New Feature
> Reporter: Stefan Richter
> Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using
> backends. However, the serialization code for many operators is built around
> reading/writing their state to a stream for checkpointing. We want to provide
> partitionable states also through streams, so that migrating existing
> operators becomes easier.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)