Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2648#discussion_r83685681
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
---
@@ -199,30 +228,56 @@ public Environment getEnvironment() {
}
/**
- * Calls
- * {@link
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask,
StreamConfig, Output)} ()}
+ * Calls {@link
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask,
StreamConfig, Output)} ()}
*/
public void setup() throws Exception {
operator.setup(mockTask, config, new MockOutput());
setupCalled = true;
}
/**
- * Calls {@link
org.apache.flink.streaming.api.operators.StreamOperator#open()}. This also
- * calls {@link
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask,
StreamConfig, Output)}
+ * Calls {@link
org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}.
+ * Calls {@link
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask,
StreamConfig, Output)}
* if it was not called before.
*/
- public void open() throws Exception {
+ public void initializeState(OperatorStateHandles operatorStateHandles)
throws Exception {
if (!setupCalled) {
setup();
}
+ operator.initializeState(operatorStateHandles);
+ initializeCalled = true;
+ }
+
+ /**
+ * Calls {@link
org.apache.flink.streaming.api.operators.StreamOperator#open()}.
+ * Calls {@link
org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}
if it
+ * was not called before.
+ */
+ public void open() throws Exception {
+ if (!initializeCalled) {
+ initializeState(null);
+ }
operator.open();
}
/**
*
*/
- public StreamStateHandle snapshot(long checkpointId, long timestamp)
throws Exception {
+ public SnapshotInProgressSubtaskState snapshot(long checkpointId, long
timestamp) throws Exception {
--- End diff --
I think we can keep the old method signature by doing something like this:
```
/**
* Calls {@link StreamOperator#snapshotState(long, long,
CheckpointStreamFactory)}.
*/
public final StreamStateHandle snapshot(long checkpointId, long
timestamp) throws Exception {
synchronized (checkpointLock) {
CheckpointStreamFactory.CheckpointStateOutputStream
outStream = stateBackend.createStreamFactory(
new JobID(),
"test_op").createCheckpointStateOutputStream(checkpointId, timestamp);
if (operator instanceof StreamCheckpointedOperator) {
((StreamCheckpointedOperator)
operator).snapshotState(
outStream,
checkpointId,
timestamp);
}
RunnableFuture<OperatorStateHandle> snapshotRunnable =
operator.snapshotState(
checkpointId,
timestamp,
stateBackend.createStreamFactory(new
JobID(), "test_op"));
if (snapshotRunnable != null) {
outStream.write(1);
snapshotRunnable.run();
OperatorStateHandle operatorStateHandle =
snapshotRunnable.get();
InstantiationUtil.serializeObject(outStream,
operatorStateHandle);
} else {
outStream.write(0);
}
snapshotToStream(checkpointId, timestamp, outStream);
return outStream.closeAndGetHandle();
}
}
```
This multiplexes the results from the different operator snapshotting
methods into the same stream. The restore method just tweezes out the correct
items from the stream and hands them to the correct operator methods.
This would let all tests use the same method and we can keep the
name/signature the same if we evolve the operator/snapshot interfaces.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---