This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fca0bea5e9f Fix cleanup timer timestamp to not exceed max allowed 
timestamp (#33037)
fca0bea5e9f is described below

commit fca0bea5e9fd9bff31c784b66085d0196ad04678
Author: Arun Pandian <[email protected]>
AuthorDate: Mon Nov 11 10:12:29 2024 -0800

    Fix cleanup timer timestamp to not exceed max allowed timestamp (#33037)
    
    This fixes an exception during drain on jobs with GlobalWindows + 
AllowedLateness > 24h + @OnWindowExpiration callback
---
 .../runners/dataflow/worker/SimpleParDoFn.java     |  16 +++-
 .../dataflow/worker/UserParDoFnFactoryTest.java    | 102 +++++++++++++++++++++
 2 files changed, 114 insertions(+), 4 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
index a413c2c03db..558848f488a 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
@@ -77,6 +77,7 @@ import org.slf4j.LoggerFactory;
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
 public class SimpleParDoFn<InputT, OutputT> implements ParDoFn {
+
   // TODO: Remove once Distributions has shipped.
   @VisibleForTesting
   static final String OUTPUTS_PER_ELEMENT_EXPERIMENT = 
"outputs_per_element_counter";
@@ -174,6 +175,7 @@ public class SimpleParDoFn<InputT, OutputT> implements 
ParDoFn {
 
   /** Simple state tracker to calculate PerElementOutputCount counter. */
   private interface OutputsPerElementTracker {
+
     void onOutput();
 
     void onProcessElement();
@@ -182,6 +184,7 @@ public class SimpleParDoFn<InputT, OutputT> implements 
ParDoFn {
   }
 
   private class OutputsPerElementTrackerImpl implements 
OutputsPerElementTracker {
+
     private long outputsPerElement;
     private final Counter<Long, CounterFactory.CounterDistribution> counter;
 
@@ -214,6 +217,7 @@ public class SimpleParDoFn<InputT, OutputT> implements 
ParDoFn {
 
   /** No-op {@link OutputsPerElementTracker} implementation used when the 
counter is disabled. */
   private static class NoopOutputsPerElementTracker implements 
OutputsPerElementTracker {
+
     private NoopOutputsPerElementTracker() {}
 
     public static final OutputsPerElementTracker INSTANCE = new 
NoopOutputsPerElementTracker();
@@ -516,10 +520,14 @@ public class SimpleParDoFn<InputT, OutputT> implements 
ParDoFn {
 
   private Instant earliestAllowableCleanupTime(
       BoundedWindow window, WindowingStrategy windowingStrategy) {
-    return window
-        .maxTimestamp()
-        .plus(windowingStrategy.getAllowedLateness())
-        .plus(Duration.millis(1L));
+    Instant cleanupTime =
+        window
+            .maxTimestamp()
+            .plus(windowingStrategy.getAllowedLateness())
+            .plus(Duration.millis(1L));
+    return cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)
+        ? BoundedWindow.TIMESTAMP_MAX_VALUE
+        : cleanupTime;
   }
 
   /**
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java
index ff114ef2f07..c1e5000f03d 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow.worker;
 
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.theInstance;
@@ -153,6 +154,21 @@ public class UserParDoFnFactoryTest {
     public void processElement(ProcessContext c) {}
   }
 
+  private static class TestStatefulDoFnWithWindowExpiration
+      extends DoFn<KV<String, Integer>, Void> {
+
+    public static final String STATE_ID = "state-id";
+
+    @StateId(STATE_ID)
+    private final StateSpec<ValueState<String>> spec = 
StateSpecs.value(StringUtf8Coder.of());
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {}
+
+    @OnWindowExpiration
+    public void onWindowExpiration() {}
+  }
+
   private static final TupleTag<String> MAIN_OUTPUT = new TupleTag<>("1");
 
   private UserParDoFnFactory factory = UserParDoFnFactory.createDefault();
@@ -373,6 +389,92 @@ public class UserParDoFnFactoryTest {
             firstWindow.maxTimestamp().plus(Duration.millis(1L)));
   }
 
+  /**
+   * Regression test for global window + OnWindowExpiration + allowed lateness 
> max allowed time
+   */
+  @Test
+  public void testCleanupTimerForGlobalWindowWithAllowedLateness() throws 
Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    CounterSet counters = new CounterSet();
+    DoFn<?, ?> initialFn = new TestStatefulDoFnWithWindowExpiration();
+    Duration allowedLateness = Duration.standardDays(2);
+    CloudObject cloudObject =
+        getCloudObject(
+            initialFn, 
WindowingStrategy.globalDefault().withAllowedLateness(allowedLateness));
+
+    StateInternals stateInternals = InMemoryStateInternals.forKey("dummy");
+
+    TimerInternals timerInternals = mock(TimerInternals.class);
+
+    DataflowStepContext stepContext = mock(DataflowStepContext.class);
+    when(stepContext.timerInternals()).thenReturn(timerInternals);
+    DataflowStepContext userStepContext = mock(DataflowStepContext.class);
+    when(stepContext.namespacedToUser()).thenReturn(userStepContext);
+    when(stepContext.stateInternals()).thenReturn(stateInternals);
+    when(userStepContext.stateInternals()).thenReturn((StateInternals) 
stateInternals);
+
+    DataflowExecutionContext<DataflowStepContext> executionContext =
+        mock(DataflowExecutionContext.class);
+    TestOperationContext operationContext = 
TestOperationContext.create(counters);
+    
when(executionContext.getStepContext(operationContext)).thenReturn(stepContext);
+    when(executionContext.getSideInputReader(any(), any(), any()))
+        .thenReturn(NullSideInputReader.empty());
+
+    ParDoFn parDoFn =
+        factory.create(
+            options,
+            cloudObject,
+            Collections.emptyList(),
+            MAIN_OUTPUT,
+            ImmutableMap.of(MAIN_OUTPUT, 0),
+            executionContext,
+            operationContext);
+
+    Receiver rcvr = new OutputReceiver();
+    parDoFn.startBundle(rcvr);
+
+    GlobalWindow globalWindow = GlobalWindow.INSTANCE;
+    parDoFn.processElement(
+        WindowedValue.of("foo", new Instant(1), globalWindow, 
PaneInfo.NO_FIRING));
+
+    assertThat(
+        globalWindow.maxTimestamp().plus(allowedLateness),
+        greaterThan(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    verify(stepContext)
+        .setStateCleanupTimer(
+            SimpleParDoFn.CLEANUP_TIMER_ID,
+            globalWindow,
+            GlobalWindow.Coder.INSTANCE,
+            BoundedWindow.TIMESTAMP_MAX_VALUE,
+            BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1)));
+
+    StateNamespace globalWindowNamespace =
+        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, globalWindow);
+    StateTag<ValueState<String>> tag =
+        StateTags.tagForSpec(
+            TestStatefulDoFnWithWindowExpiration.STATE_ID, 
StateSpecs.value(StringUtf8Coder.of()));
+
+    when(userStepContext.getNextFiredTimer((Coder) 
GlobalWindow.Coder.INSTANCE)).thenReturn(null);
+    when(stepContext.getNextFiredTimer((Coder) GlobalWindow.Coder.INSTANCE))
+        .thenReturn(
+            TimerData.of(
+                SimpleParDoFn.CLEANUP_TIMER_ID,
+                globalWindowNamespace,
+                BoundedWindow.TIMESTAMP_MAX_VALUE,
+                BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1)),
+                TimeDomain.EVENT_TIME))
+        .thenReturn(null);
+
+    // Set up non-empty state. We don't mock + verify calls to clear() but 
instead
+    // check that state is actually empty. We mustn't care how it is 
accomplished.
+    stateInternals.state(globalWindowNamespace, tag).write("first");
+
+    // And this should clean up the global window
+    parDoFn.processTimers();
+
+    assertThat(stateInternals.state(globalWindowNamespace, tag).read(), 
nullValue());
+  }
+
   @Test
   public void testCleanupWorks() throws Exception {
     PipelineOptions options = PipelineOptionsFactory.create();

Reply via email to