wuchong commented on a change in pull request #12441:
URL: https://github.com/apache/flink/pull/12441#discussion_r434984931



##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TimeTestUtil.scala
##########
@@ -52,14 +56,47 @@ object TimeTestUtil {
     }
   }
 
+  /**
+   * A streaming operator to emit records and watermark depends on the input data.
+   * The last emitted watermark will be stored in state to emit it again on recovery.
+   * This is necessary for late arrival testing with [[FailingCollectionSource]].
+   */
   class EventTimeProcessOperator[T]
     extends AbstractStreamOperator[T]
       with OneInputStreamOperator[Either[(Long, T), Long], T] {
 
+    private var currentWatermark: Long = 0L
+    private var watermarkState: ListState[JLong] = _
+
+    override def snapshotState(context: StateSnapshotContext): Unit = {
+      super.snapshotState(context)
+      watermarkState.clear()
+      watermarkState.add(currentWatermark)
+    }
+
+    override def initializeState(context: StateInitializationContext): Unit = {
+      super.initializeState(context)
+      // use union list state to get the max watermark
+      watermarkState = context.getOperatorStateStore.getUnionListState(

Review comment:
       Thanks for pointing that out. I agree that it's over-engineered. Handling
the rescale case is not simple. I will use a plain list state instead, since we
won't rescale in tests.
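
For illustration, the switch to a simple (non-union) list state might look roughly like the sketch below. This is not the final code of the PR: the class name `WatermarkRestoringOperator` and the state name `"watermark-state"` are placeholders, and emitting the restored watermark in `open()` is an assumption about where re-emission would happen. It also assumes the parallelism is unchanged on recovery, so each subtask gets back exactly the single entry it wrote.

```scala
import java.lang.{Long => JLong}

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.runtime.state.{StateInitializationContext, StateSnapshotContext}
import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, OneInputStreamOperator}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord

// Placeholder name; modelled on EventTimeProcessOperator from the diff above.
class WatermarkRestoringOperator[T]
  extends AbstractStreamOperator[T]
    with OneInputStreamOperator[Either[(Long, T), Long], T] {

  private var currentWatermark: Long = 0L
  private var watermarkState: ListState[JLong] = _

  override def snapshotState(context: StateSnapshotContext): Unit = {
    super.snapshotState(context)
    // Keep exactly one entry: the last watermark seen by this subtask.
    watermarkState.clear()
    watermarkState.add(currentWatermark)
  }

  override def initializeState(context: StateInitializationContext): Unit = {
    super.initializeState(context)
    // Plain list state instead of union list state. Without rescaling,
    // each subtask restores only its own entry.
    watermarkState = context.getOperatorStateStore.getListState(
      new ListStateDescriptor[JLong]("watermark-state", Types.LONG)) // placeholder name

    val restored = watermarkState.get().iterator()
    if (restored.hasNext) {
      currentWatermark = restored.next()
    }
  }

  override def open(): Unit = {
    super.open()
    // Assumption: re-emit the restored watermark once the operator is open.
    if (currentWatermark > 0) {
      output.emitWatermark(new Watermark(currentWatermark))
    }
  }

  override def processElement(element: StreamRecord[Either[(Long, T), Long]]): Unit = {
    element.getValue match {
      case Left((timestamp, value)) => // a record with its event timestamp
        output.collect(new StreamRecord[T](value, timestamp))
      case Right(watermark) => // a watermark
        currentWatermark = watermark
        output.emitWatermark(new Watermark(watermark))
    }
  }
}
```

With union list state, every subtask would receive the entries of all subtasks on restore and would have to pick the maximum itself. The plain list state avoids that step, at the cost of not being meaningful if the job is rescaled.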




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

