tweise commented on a change in pull request #12759:
URL: https://github.com/apache/beam/pull/12759#discussion_r482392713



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -753,6 +754,13 @@ private void maybeEmitWatermark(long watermark) {
       LOG.debug("Emitting watermark {}", watermark);
       currentOutputWatermark = watermark;
       output.emitWatermark(new Watermark(watermark));
+
+      // Check if the final watermark was triggered to perform state cleanup for global window
+      if (keyedStateInternals != null
+          && currentOutputWatermark

Review comment:
       The existing solution checks that we have not already reached the 
watermark to avoid repeated execution of the cleanup code. Please carry that 
over.
   
   Also, why check for `keyedStateInternals`? See 
https://github.com/apache/beam/pull/12733#discussion_r480015675
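A minimal sketch of the once-only guard the comment asks to carry over. It assumes the check compares the *previous* output watermark against the end of the global window. `GLOBAL_WINDOW_END` is a hypothetical stand-in for `GlobalWindow.INSTANCE.maxTimestamp().getMillis()`, and `WatermarkEmitter` and `cleanupGlobalWindowState()` are hypothetical placeholders, not Beam code.

```java
/** Hypothetical stand-in for the watermark-emission path of an operator. */
class WatermarkEmitter {
  // Assumption: stand-in for GlobalWindow.INSTANCE.maxTimestamp().getMillis().
  static final long GLOBAL_WINDOW_END = 1_000L;

  private long currentOutputWatermark = Long.MIN_VALUE;
  private int cleanupRuns = 0;

  void maybeEmitWatermark(long watermark) {
    // Only advance the output watermark; repeated or smaller values are ignored.
    if (watermark <= currentOutputWatermark) {
      return;
    }
    long previousWatermark = currentOutputWatermark;
    currentOutputWatermark = watermark;
    // (Emitting the watermark downstream would happen here.)

    // Run the cleanup only on the transition past the end of the global window,
    // so later, larger watermarks do not trigger it again.
    if (previousWatermark <= GLOBAL_WINDOW_END && watermark > GLOBAL_WINDOW_END) {
      cleanupGlobalWindowState();
    }
  }

  private void cleanupGlobalWindowState() {
    cleanupRuns++; // Placeholder for clearing the global-window state.
  }

  int cleanupRuns() {
    return cleanupRuns;
  }
}
```

Comparing against the previous watermark, not only the new one, is what keeps a second, larger watermark from re-running the cleanup.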

##########
File path: runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
##########
@@ -910,21 +892,16 @@ public void testEnsureStateCleanupOnFinalWatermark() throws Exception {
         operator.keyedStateInternals.state(
             stateNamespace, StateTags.bag(stateId, ByteStringCoder.of()));
     state.add(ByteString.copyFrom("userstate".getBytes(Charsets.UTF_8)));
+    // No timers have been set for cleanup
+    assertThat(testHarness.numEventTimeTimers(), is(0));
+    // State has been created
     assertThat(testHarness.numKeyedStateEntries(), is(1));
 
     // Generate final watermark to trigger state cleanup
     testHarness.processWatermark(
         new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.plus(1).getMillis()));
 
     assertThat(testHarness.numKeyedStateEntries(), is(0));
-
-    // Close should not repeat state cleanup

Review comment:
       Keep this to check that the cleanup isn't repeated even when the 
watermark is repeated.
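A self-contained sketch of the property the retained check should pin down: after the final watermark has cleaned up the keyed state, neither a repeated final watermark nor `close()` runs the cleanup again. `FakeKeyedOperator` is a hypothetical in-memory stand-in, not Flink's test harness. Its `close()` is assumed to flush by advancing the watermark to the maximum, and `cleanupCount` exists only to make repetition observable.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for an operator under test; not the Flink test harness. */
class FakeKeyedOperator {
  // Assumption: stand-in for the end of the global window in milliseconds.
  static final long GLOBAL_WINDOW_END = 1_000L;

  final Map<String, String> keyedState = new HashMap<>();
  private long watermark = Long.MIN_VALUE;
  private int cleanupCount = 0;

  void processWatermark(long newWatermark) {
    if (newWatermark <= watermark) {
      return; // Repeated or stale watermarks are ignored.
    }
    long previous = watermark;
    watermark = newWatermark;
    // Clean up only when first crossing the end of the global window.
    if (previous <= GLOBAL_WINDOW_END && newWatermark > GLOBAL_WINDOW_END) {
      keyedState.clear();
      cleanupCount++;
    }
  }

  /** Assumption: close() flushes by advancing the watermark to the maximum. */
  void close() {
    processWatermark(Long.MAX_VALUE);
  }

  int cleanupCount() {
    return cleanupCount;
  }
}
```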

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1330,6 +1338,19 @@ public void setTimer(
     @Deprecated
     @Override
     public void setTimer(TimerData timer) {
+      if (timer.getTimestamp().isAfter(GlobalWindow.INSTANCE.maxTimestamp())) {

Review comment:
      The namespace we use for cleanup is based on `GlobalWindow`, so the
condition here should be based on it as well. I think we can prioritize clarity
over avoiding the duplication of a single conditional return statement, which
should be covered by the unit tests.
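One reading of the suggestion, sketched under assumptions: each `setTimer` overload repeats the same one-line early return, keyed on the end of the global window, instead of routing through a shared helper. `SimpleTimer` is a hypothetical minimal type, not Beam's `TimerInternals.TimerData`, and `GLOBAL_WINDOW_END` stands in for `GlobalWindow.INSTANCE.maxTimestamp()`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal timer description; not Beam's TimerInternals.TimerData. */
final class SimpleTimer {
  final String timerId;
  final long timestamp;

  SimpleTimer(String timerId, long timestamp) {
    this.timerId = timerId;
    this.timestamp = timestamp;
  }
}

/** Hypothetical timer registry illustrating the same guard in both overloads. */
class TimerRegistry {
  // Assumption: stand-in for GlobalWindow.INSTANCE.maxTimestamp().getMillis().
  static final long GLOBAL_WINDOW_END = 1_000L;

  final List<SimpleTimer> registered = new ArrayList<>();

  void setTimer(String timerId, long timestamp) {
    // Timers past the end of the global window are not registered; the
    // global-window state is assumed to be cleaned up on the final watermark.
    if (timestamp > GLOBAL_WINDOW_END) {
      return;
    }
    registered.add(new SimpleTimer(timerId, timestamp));
  }

  void setTimer(SimpleTimer timer) {
    // Same guard, repeated on purpose so each overload reads on its own.
    if (timer.timestamp > GLOBAL_WINDOW_END) {
      return;
    }
    registered.add(timer);
  }
}
```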

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -114,16 +127,27 @@ public K getKey() {
     return address.getSpec().bind(address.getId(), new FlinkStateBinder(namespace, context));
   }
 
-  public void clearBagStates(StateNamespace namespace, StateTag<? extends BagState> address)
-      throws Exception {
-    CoderTypeSerializer typeSerializer = new CoderTypeSerializer<>(VoidCoder.of());
-    flinkStateBackend.applyToAllKeys(
-        namespace.stringKey(),
-        StringSerializer.INSTANCE,
-        new ListStateDescriptor<>(address.getId(), typeSerializer),
-        (key, state) -> {
-          state.clear();
-        });
+  /**
+   * Allows to clear all state for the global watermark when the maximum watermark arrives. We do
+   * not clean up the global window state via timers because we are not guaranteed to ever receive
+   * the final watermark which would lead to an unbounded number of keys and cleanup timers.
+   * Instead, the cleanup code below should be run when we finally receive the max watermark.

Review comment:
      I would remove "because we are not guaranteed to ever receive the final
watermark" because that is misleading. The problem is that the number of timers
we accumulate grows with the number of keys.
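To make the timer-count argument concrete: one cleanup timer per key means the number of timers grows with the number of keys, whereas a single pass over all keys when the final watermark arrives needs no timers at all. The sketch models that single pass with an in-memory map from key to named state. `InMemoryKeyedState` is a hypothetical stand-in for iterating keys with Flink's `KeyedStateBackend#applyToAllKeys`, not the PR's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory keyed state: key -> (state id -> value). */
class InMemoryKeyedState {
  final Map<String, Map<String, Object>> stateByKey = new HashMap<>();

  void put(String key, String stateId, Object value) {
    stateByKey.computeIfAbsent(key, k -> new HashMap<>()).put(stateId, value);
  }

  /**
   * Clears the given state ids for every key in one pass, as would happen once the
   * final watermark arrives; no per-key cleanup timers are involved.
   */
  void clearForAllKeys(Set<String> stateIds) {
    for (Map<String, Object> perKey : stateByKey.values()) {
      perKey.keySet().removeAll(stateIds);
    }
    // Drop keys that no longer hold any state.
    stateByKey.values().removeIf(Map::isEmpty);
  }

  int numEntries() {
    int n = 0;
    for (Map<String, Object> perKey : stateByKey.values()) {
      n += perKey.size();
    }
    return n;
  }
}
```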

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -139,17 +163,27 @@ private FlinkStateBinder(StateNamespace namespace, StateContext<?> stateContext)
     @Override
     public <T2> ValueState<T2> bindValue(
         String id, StateSpec<ValueState<T2>> spec, Coder<T2> coder) {
-      return new FlinkValueState<>(flinkStateBackend, id, namespace, coder);
+      ValueStateDescriptor<T2> valueStateDescriptor =
+          new ValueStateDescriptor<>(id, new CoderTypeSerializer<>(coder));
+      globalWindowStateDescriptors.add(valueStateDescriptor);
+      return new FlinkValueState<>(flinkStateBackend, id, namespace, valueStateDescriptor);
     }
 
     @Override
     public <T2> BagState<T2> bindBag(String id, StateSpec<BagState<T2>> spec, Coder<T2> elemCoder) {
-      return new FlinkBagState<>(flinkStateBackend, id, namespace, elemCoder);
+      ListStateDescriptor<T2> listStateDescriptor =
+          new ListStateDescriptor<>(id, new CoderTypeSerializer<>(elemCoder));
+      globalWindowStateDescriptors.add(listStateDescriptor);

Review comment:
      This is executed on every state access in the portable runner. It might
be possible to reuse the descriptors. How large would the net benefit be?
Probably small compared to the Fn API overhead.
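If reuse turns out to be worthwhile, one option is to memoize descriptors by state id so that repeated binds return the same instance. `Descriptor` is a hypothetical placeholder for Flink's `ValueStateDescriptor`/`ListStateDescriptor`, and keying the cache on the id alone assumes a given state id is always bound with the same coder and state kind.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical placeholder for a Flink state descriptor. */
final class Descriptor {
  final String id;

  Descriptor(String id) {
    this.id = id;
  }
}

/** Memoizes descriptors so binding the same state id again does not allocate a new one. */
class DescriptorCache {
  private final Map<String, Descriptor> byId = new HashMap<>();
  private int created = 0;

  Descriptor bindBag(String id) {
    // Assumption: a given id is always bound with the same coder and state kind.
    return byId.computeIfAbsent(id, k -> {
      created++;
      return new Descriptor(k);
    });
  }

  int createdCount() {
    return created;
  }
}
```

The saving is one small allocation per state access, which is why the comment expects it to be minor next to the Fn API overhead.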

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
##########
@@ -75,7 +80,14 @@
 public class FlinkStateInternals<K> implements StateInternals {
 
   private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
-  private Coder<K> keyCoder;
+  private final Coder<K> keyCoder;
+
+  /**
+   * A set which contains all state descriptors created in the global window. Used for cleanup on

Review comment:
      As implemented, doesn't this contain all state descriptors, regardless
of the window?
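If the set is meant to hold only global-window descriptors, as its Javadoc says, the binder could record a descriptor only when the namespace is the global one. This sketch assumes namespaces compare by a string key; `GLOBAL_NAMESPACE` and `StateBinderSketch` are hypothetical and not Beam's `StateNamespaces` API, and state ids stand in for descriptors.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical binder that records state for the global window only. */
class StateBinderSketch {
  // Assumption: stand-in for the string key of the global namespace.
  static final String GLOBAL_NAMESPACE = "global";

  final Set<String> globalWindowStateIds = new HashSet<>();

  void bind(String namespace, String stateId) {
    // Only global-window state needs cleanup on the final watermark; state in
    // other windows is assumed to be cleaned up by its own timers.
    if (GLOBAL_NAMESPACE.equals(namespace)) {
      globalWindowStateIds.add(stateId);
    }
  }
}
```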




----------------------------------------------------------------
This is an automated message from the Apache Git Service.