scwhittle commented on code in PR #37574:
URL: https://github.com/apache/beam/pull/37574#discussion_r3191212559


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java:
##########
@@ -133,48 +148,71 @@ public Future<Windmill.WorkItemCommitRequest> persist(
 
     Future<Windmill.WorkItemCommitRequest> result;
 
-    if (!cleared && localAdditions == null) {
+    if (!knownEmpty && !cleared && localAdditions == null) {
       // No changes, so no need to update Windmill and no need to cache any value.
       return Futures.immediateFuture(Windmill.WorkItemCommitRequest.newBuilder().buildPartial());

Review Comment:
   move this to where the unreachable condition is thrown instead?
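   The following is a minimal sketch of that restructuring. It assumes the branches in `persist` reduce to three cases. `HoldPersistSketch` and its String results are hypothetical stand-ins for `WindmillWatermarkHold` and the real `Windmill.WorkItemCommitRequest` futures. Only the field names `knownEmpty`, `cleared`, and `localAdditions` come from the diff, and the "cache-empty" action for the known-empty case is an assumption. The no-op case becomes the final `else`, where an unreachable-state exception would otherwise be thrown, so the separate early return is no longer needed.

```java
import java.util.List;

/**
 * Hypothetical, simplified model of the persist decision in a watermark hold.
 * Only the field names come from the diff; the String results stand in for
 * the real Windmill commit-request futures.
 */
class HoldPersistSketch {
  boolean knownEmpty;        // state is known to be empty locally
  boolean cleared;           // state was cleared during this work item
  List<Long> localAdditions; // null means nothing was added locally

  String persist() {
    if (cleared || localAdditions != null) {
      // Something changed: a real implementation would build a commit request.
      return "commit";
    } else if (knownEmpty) {
      // Assumption: nothing to send to the backend, but the empty value is cached.
      return "cache-empty";
    } else {
      // The case the early return at the top of persist() handled. Placing it
      // here, where an "unreachable" exception would otherwise be thrown,
      // keeps every case in one if/else chain.
      return "no-op";
    }
  }

  public static void main(String[] args) {
    HoldPersistSketch hold = new HoldPersistSketch();
    System.out.println(hold.persist()); // no-op
    hold.knownEmpty = true;
    System.out.println(hold.persist()); // cache-empty
    hold.localAdditions = List.of(10L);
    System.out.println(hold.persist()); // commit
  }
}
```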



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java:
##########
@@ -99,6 +101,16 @@ public boolean isClosed(StateAccessor<?> state) {
     return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
   }
 
+  /** Return true if the window is new (no trigger state has ever been persisted). */
+  public boolean isNew(StateAccessor<?> state) {
+    return isFinishedSetNeeded() && state.access(FINISHED_BITS_TAG).read() == null;
+  }
+
+  @VisibleForTesting
+  public BitSet getFinishedBits(StateAccessor<?> state) {
+    return readFinishedBits(state.access(FINISHED_BITS_TAG)).getBitSet();
+  }
+
   public void prefetchIsClosed(StateAccessor<?> state) {

Review Comment:
   Should we rename this to prefetchFinishedBits? It would also need to be called for isNew, and prefetchIsClosed doesn't necessarily sound appropriate.
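   The following sketch illustrates the rename. A single `prefetchFinishedBits` call serves both `isClosed` and `isNew`. `FakeFinishedBitsState` is a hypothetical in-memory stand-in for `ValueState<BitSet>`, and the constructor flag replaces `isFinishedSetNeeded()`. Treating bit 0 as the root trigger's finished bit is an assumption made for illustration. `isNew` mirrors the diff: the window is new when the finished set is needed and nothing has ever been written.

```java
import java.util.BitSet;

/** Hypothetical in-memory stand-in for ValueState<BitSet>. */
class FakeFinishedBitsState {
  private BitSet value;  // null means trigger state was never persisted
  int prefetchCount = 0; // how many times a prefetch was requested

  void readLater() { prefetchCount++; }
  BitSet read() { return value; }
  void write(BitSet bits) { value = (BitSet) bits.clone(); }
}

/** Sketch of one prefetch serving both readers of the finished bits. */
class FinishedBitsReaderSketch {
  private final boolean finishedSetNeeded; // stands in for isFinishedSetNeeded()

  FinishedBitsReaderSketch(boolean finishedSetNeeded) {
    this.finishedSetNeeded = finishedSetNeeded;
  }

  /** Suggested name: the prefetch is not specific to isClosed. */
  void prefetchFinishedBits(FakeFinishedBitsState state) {
    state.readLater();
  }

  /** Assumption: bit 0 marks the root trigger as finished. */
  boolean isClosed(FakeFinishedBitsState state) {
    BitSet bits = state.read();
    return bits != null && bits.get(0);
  }

  /** Mirrors isNew in the diff: no finished bits have ever been written. */
  boolean isNew(FakeFinishedBitsState state) {
    return finishedSetNeeded && state.read() == null;
  }

  public static void main(String[] args) {
    FakeFinishedBitsState state = new FakeFinishedBitsState();
    FinishedBitsReaderSketch runner = new FinishedBitsReaderSketch(true);
    runner.prefetchFinishedBits(state); // one prefetch covers both checks below
    System.out.println(runner.isNew(state) + " " + runner.isClosed(state)); // true false
    BitSet rootFinished = new BitSet();
    rootFinished.set(0);
    state.write(rootFinished);
    System.out.println(runner.isNew(state) + " " + runner.isClosed(state)); // false true
  }
}
```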



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java:
##########
@@ -38,4 +38,14 @@ public interface WatermarkHoldState extends GroupingState<Instant, Instant> {
 
   @Override
   WatermarkHoldState readLater();
+
+  /**
+   * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
+   *
+   * <p>Permit marking the state as empty locally, without necessarily clearing it in the backend.
+   *
+   * <p>This may be used by runners to optimize out unnecessary state reads.
+   */
+  @Internal

Review Comment:
   I think there is an experimental annotation that might be appropriate too?



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java:
##########
@@ -187,12 +199,9 @@ private void persistFinishedSet(
     }
 
     ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
-    if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
-      if (modifiedFinishedSet.getBitSet().isEmpty()) {
-        finishedSetState.clear();
-      } else {
-        finishedSetState.write(modifiedFinishedSet.getBitSet());
-      }
+    @Nullable BitSet currentBits = finishedSetState.read();
+    if (currentBits == null || !currentBits.equals(modifiedFinishedSet.getBitSet())) {
+      finishedSetState.write(modifiedFinishedSet.getBitSet());

Review Comment:
   I'm not seeing how this change is guarded by the experiment.
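   The following sketch shows one way the new write path could be gated. It rests on several assumptions. `experimentEnabled` is a hypothetical boolean standing in for however the runner looks up the experiment, since no experiment name appears in the diff. `FinishedSetCell` replaces `ValueState<BitSet>`. The legacy branch assumes `readFinishedBits` treats an unset value as an empty set. With the experiment off, the sketch keeps the original clear-when-empty behavior unchanged.

```java
import java.util.BitSet;

/** Hypothetical in-memory stand-in for ValueState<BitSet>. */
class FinishedSetCell {
  private BitSet value; // null means cleared or never written
  int writeCount = 0;

  BitSet read() { return value; }
  void write(BitSet bits) { value = (BitSet) bits.clone(); writeCount++; }
  void clear() { value = null; }
}

/** Sketch of persistFinishedSet with the new behavior behind an experiment check. */
class PersistFinishedSetSketch {
  private final boolean experimentEnabled; // hypothetical: result of the experiment lookup

  PersistFinishedSetSketch(boolean experimentEnabled) {
    this.experimentEnabled = experimentEnabled;
  }

  void persistFinishedSet(FinishedSetCell state, BitSet modified) {
    BitSet current = state.read();
    if (experimentEnabled) {
      // New behavior from the diff: write even an empty bit set, so a later
      // null read means no trigger state was ever persisted for the window.
      if (current == null || !current.equals(modified)) {
        state.write(modified);
      }
    } else {
      // Previous behavior: a missing value counts as empty, and an empty
      // result is stored by clearing the state rather than writing it.
      BitSet effective = current == null ? new BitSet() : current;
      if (!effective.equals(modified)) {
        if (modified.isEmpty()) {
          state.clear();
        } else {
          state.write(modified);
        }
      }
    }
  }

  public static void main(String[] args) {
    FinishedSetCell withExperiment = new FinishedSetCell();
    new PersistFinishedSetSketch(true).persistFinishedSet(withExperiment, new BitSet());
    System.out.println(withExperiment.read()); // {}

    FinishedSetCell withoutExperiment = new FinishedSetCell();
    new PersistFinishedSetSketch(false).persistFinishedSet(withoutExperiment, new BitSet());
    System.out.println(withoutExperiment.read()); // null
  }
}
```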



-- 
This is an automated message from the Apache Git Service.