Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2648#discussion_r83685681
  
    --- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 ---
    @@ -199,30 +228,56 @@ public Environment getEnvironment() {
        }
     
        /**
    -    * Calls
    -    * {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)} ()}
    +    * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)} ()}
         */
        public void setup() throws Exception {
                operator.setup(mockTask, config, new MockOutput());
                setupCalled = true;
        }
     
        /**
    -    * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#open()}. This also
    -    * calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)}
    +    * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}.
    +    * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)}
         * if it was not called before.
         */
    -   public void open() throws Exception {
    +   public void initializeState(OperatorStateHandles operatorStateHandles) 
throws Exception {
                if (!setupCalled) {
                        setup();
                }
    +           operator.initializeState(operatorStateHandles);
    +           initializeCalled = true;
    +   }
    +
    +   /**
    +    * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#open()}.
    +    * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}
 if it
    +    * was not called before.
    +    */
    +   public void open() throws Exception {
    +           if (!initializeCalled) {
    +                   initializeState(null);
    +           }
                operator.open();
        }
     
        /**
         *
         */
    -   public StreamStateHandle snapshot(long checkpointId, long timestamp) 
throws Exception {
    +   public SnapshotInProgressSubtaskState snapshot(long checkpointId, long 
timestamp) throws Exception {
    --- End diff --
    
    I think we can keep the old method signature by doing something like this:
    ```
        /**
         * Calls {@link StreamOperator#snapshotState(long, long, 
CheckpointStreamFactory)}.
         */
        public final StreamStateHandle snapshot(long checkpointId, long 
timestamp) throws Exception {
                synchronized (checkpointLock) {
                        CheckpointStreamFactory.CheckpointStateOutputStream 
outStream = stateBackend.createStreamFactory(
                                        new JobID(),
                                        
"test_op").createCheckpointStateOutputStream(checkpointId, timestamp);
    
                        if (operator instanceof StreamCheckpointedOperator) {
                                ((StreamCheckpointedOperator) 
operator).snapshotState(
                                                outStream,
                                                checkpointId,
                                                timestamp);
                        }
    
                        RunnableFuture<OperatorStateHandle> snapshotRunnable = 
operator.snapshotState(
                                        checkpointId,
                                        timestamp,
                                        stateBackend.createStreamFactory(new 
JobID(), "test_op"));
    
                        if (snapshotRunnable != null) {
                                outStream.write(1);
                                snapshotRunnable.run();
                                OperatorStateHandle operatorStateHandle = 
snapshotRunnable.get();
    
                                InstantiationUtil.serializeObject(outStream, 
operatorStateHandle);
                        } else {
                                outStream.write(0);
                        }
    
                        snapshotToStream(checkpointId, timestamp, outStream);
    
                        return outStream.closeAndGetHandle();
                }
        }
    ```
    This multiplexes the results from the different operator snapshotting 
methods into the same stream. The restore method just tweezes out the correct 
items from the stream and hands them to the correct operator methods.
    
    This would let all tests use the same method and we can keep the 
name/signature the same if we evolve the operator/snapshot interfaces.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to