mxm commented on a change in pull request #12759:
URL: https://github.com/apache/beam/pull/12759#discussion_r482840511



##########
File path: runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
##########
@@ -910,21 +892,16 @@ public void testEnsureStateCleanupOnFinalWatermark() throws Exception {
         operator.keyedStateInternals.state(
             stateNamespace, StateTags.bag(stateId, ByteStringCoder.of()));
     state.add(ByteString.copyFrom("userstate".getBytes(Charsets.UTF_8)));
+    // No timers have been set for cleanup
+    assertThat(testHarness.numEventTimeTimers(), is(0));
+    // State has been created
     assertThat(testHarness.numKeyedStateEntries(), is(1));
 
     // Generate final watermark to trigger state cleanup
     testHarness.processWatermark(
         new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.plus(1).getMillis()));
 
     assertThat(testHarness.numKeyedStateEntries(), is(0));
-
-    // Close should not repeat state cleanup

Review comment:
       I believe the cleanup should be repeated if new state has been created
since the last cleanup. If there is none, no cleanup is performed, because we
keep track of the state descriptors that were created and clear that set on
cleanup.
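
       To illustrate the behavior described above, here is a minimal sketch
of the tracking idea. It is not the operator's actual code: the class
`TrackedStateCleanupSketch` and its methods are hypothetical, and a plain map
stands in for the keyed state backend. It only shows why a second cleanup is
a no-op unless state was created in between.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not Beam's implementation: a map stands in for the
// keyed state backend and a set for the tracked state descriptors.
class TrackedStateCleanupSketch {
  // Stand-in for keyed state: state id -> value.
  private final Map<String, String> stateEntries = new HashMap<>();
  // Ids of state created since the last cleanup.
  private final Set<String> trackedStateIds = new HashSet<>();

  // Creates (or overwrites) a state entry and remembers it for cleanup.
  void addState(String stateId, String value) {
    stateEntries.put(stateId, value);
    trackedStateIds.add(stateId);
  }

  // Clears every tracked entry, then forgets the tracked ids.
  // Returns how many entries were actually removed.
  int cleanup() {
    int cleared = 0;
    for (String stateId : trackedStateIds) {
      if (stateEntries.remove(stateId) != null) {
        cleared++;
      }
    }
    trackedStateIds.clear();
    return cleared;
  }

  int numStateEntries() {
    return stateEntries.size();
  }

  public static void main(String[] args) {
    TrackedStateCleanupSketch operator = new TrackedStateCleanupSketch();
    operator.addState("stateId", "userstate");
    System.out.println("first cleanup cleared: " + operator.cleanup());
    // Nothing is tracked any more, so repeating the cleanup does no work.
    System.out.println("repeated cleanup cleared: " + operator.cleanup());
    // New state after the first cleanup is picked up by the next one.
    operator.addState("stateId", "more userstate");
    System.out.println("cleanup after new state cleared: " + operator.cleanup());
  }
}
```

       Under this assumption, asserting that close "does not repeat" cleanup
is unnecessary: a repeated call has nothing to clear unless new state was
registered after the previous cleanup.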





