This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new fca0bea5e9f Fix cleanup timer timestamp to not exceed max allowed
timestamp (#33037)
fca0bea5e9f is described below
commit fca0bea5e9fd9bff31c784b66085d0196ad04678
Author: Arun Pandian <[email protected]>
AuthorDate: Mon Nov 11 10:12:29 2024 -0800
Fix cleanup timer timestamp to not exceed max allowed timestamp (#33037)
This fixes an exception during drain on jobs with GlobalWindows +
AllowedLateness > 24h + @OnWindowExpiration callback
---
.../runners/dataflow/worker/SimpleParDoFn.java | 16 +++-
.../dataflow/worker/UserParDoFnFactoryTest.java | 102 +++++++++++++++++++++
2 files changed, 114 insertions(+), 4 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
index a413c2c03db..558848f488a 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
@@ -77,6 +77,7 @@ import org.slf4j.LoggerFactory;
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class SimpleParDoFn<InputT, OutputT> implements ParDoFn {
+
// TODO: Remove once Distributions has shipped.
@VisibleForTesting
static final String OUTPUTS_PER_ELEMENT_EXPERIMENT =
"outputs_per_element_counter";
@@ -174,6 +175,7 @@ public class SimpleParDoFn<InputT, OutputT> implements
ParDoFn {
/** Simple state tracker to calculate PerElementOutputCount counter. */
private interface OutputsPerElementTracker {
+
void onOutput();
void onProcessElement();
@@ -182,6 +184,7 @@ public class SimpleParDoFn<InputT, OutputT> implements
ParDoFn {
}
private class OutputsPerElementTrackerImpl implements
OutputsPerElementTracker {
+
private long outputsPerElement;
private final Counter<Long, CounterFactory.CounterDistribution> counter;
@@ -214,6 +217,7 @@ public class SimpleParDoFn<InputT, OutputT> implements
ParDoFn {
/** No-op {@link OutputsPerElementTracker} implementation used when the
counter is disabled. */
private static class NoopOutputsPerElementTracker implements
OutputsPerElementTracker {
+
private NoopOutputsPerElementTracker() {}
public static final OutputsPerElementTracker INSTANCE = new
NoopOutputsPerElementTracker();
@@ -516,10 +520,14 @@ public class SimpleParDoFn<InputT, OutputT> implements
ParDoFn {
private Instant earliestAllowableCleanupTime(
BoundedWindow window, WindowingStrategy windowingStrategy) {
- return window
- .maxTimestamp()
- .plus(windowingStrategy.getAllowedLateness())
- .plus(Duration.millis(1L));
+ Instant cleanupTime =
+ window
+ .maxTimestamp()
+ .plus(windowingStrategy.getAllowedLateness())
+ .plus(Duration.millis(1L));
+ return cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)
+ ? BoundedWindow.TIMESTAMP_MAX_VALUE
+ : cleanupTime;
}
/**
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java
index ff114ef2f07..c1e5000f03d 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.worker;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.theInstance;
@@ -153,6 +154,21 @@ public class UserParDoFnFactoryTest {
public void processElement(ProcessContext c) {}
}
+ private static class TestStatefulDoFnWithWindowExpiration
+ extends DoFn<KV<String, Integer>, Void> {
+
+ public static final String STATE_ID = "state-id";
+
+ @StateId(STATE_ID)
+ private final StateSpec<ValueState<String>> spec =
StateSpecs.value(StringUtf8Coder.of());
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {}
+
+ @OnWindowExpiration
+ public void onWindowExpiration() {}
+ }
+
private static final TupleTag<String> MAIN_OUTPUT = new TupleTag<>("1");
private UserParDoFnFactory factory = UserParDoFnFactory.createDefault();
@@ -373,6 +389,92 @@ public class UserParDoFnFactoryTest {
firstWindow.maxTimestamp().plus(Duration.millis(1L)));
}
+ /**
+ * Regression test for global window + OnWindowExpiration + allowed lateness
> max allowed time
+ */
+ @Test
+ public void testCleanupTimerForGlobalWindowWithAllowedLateness() throws
Exception {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ CounterSet counters = new CounterSet();
+ DoFn<?, ?> initialFn = new TestStatefulDoFnWithWindowExpiration();
+ Duration allowedLateness = Duration.standardDays(2);
+ CloudObject cloudObject =
+ getCloudObject(
+ initialFn,
WindowingStrategy.globalDefault().withAllowedLateness(allowedLateness));
+
+ StateInternals stateInternals = InMemoryStateInternals.forKey("dummy");
+
+ TimerInternals timerInternals = mock(TimerInternals.class);
+
+ DataflowStepContext stepContext = mock(DataflowStepContext.class);
+ when(stepContext.timerInternals()).thenReturn(timerInternals);
+ DataflowStepContext userStepContext = mock(DataflowStepContext.class);
+ when(stepContext.namespacedToUser()).thenReturn(userStepContext);
+ when(stepContext.stateInternals()).thenReturn(stateInternals);
+ when(userStepContext.stateInternals()).thenReturn((StateInternals)
stateInternals);
+
+ DataflowExecutionContext<DataflowStepContext> executionContext =
+ mock(DataflowExecutionContext.class);
+ TestOperationContext operationContext =
TestOperationContext.create(counters);
+
when(executionContext.getStepContext(operationContext)).thenReturn(stepContext);
+ when(executionContext.getSideInputReader(any(), any(), any()))
+ .thenReturn(NullSideInputReader.empty());
+
+ ParDoFn parDoFn =
+ factory.create(
+ options,
+ cloudObject,
+ Collections.emptyList(),
+ MAIN_OUTPUT,
+ ImmutableMap.of(MAIN_OUTPUT, 0),
+ executionContext,
+ operationContext);
+
+ Receiver rcvr = new OutputReceiver();
+ parDoFn.startBundle(rcvr);
+
+ GlobalWindow globalWindow = GlobalWindow.INSTANCE;
+ parDoFn.processElement(
+ WindowedValue.of("foo", new Instant(1), globalWindow,
PaneInfo.NO_FIRING));
+
+ assertThat(
+ globalWindow.maxTimestamp().plus(allowedLateness),
+ greaterThan(BoundedWindow.TIMESTAMP_MAX_VALUE));
+ verify(stepContext)
+ .setStateCleanupTimer(
+ SimpleParDoFn.CLEANUP_TIMER_ID,
+ globalWindow,
+ GlobalWindow.Coder.INSTANCE,
+ BoundedWindow.TIMESTAMP_MAX_VALUE,
+ BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1)));
+
+ StateNamespace globalWindowNamespace =
+ StateNamespaces.window(GlobalWindow.Coder.INSTANCE, globalWindow);
+ StateTag<ValueState<String>> tag =
+ StateTags.tagForSpec(
+ TestStatefulDoFnWithWindowExpiration.STATE_ID,
StateSpecs.value(StringUtf8Coder.of()));
+
+ when(userStepContext.getNextFiredTimer((Coder)
GlobalWindow.Coder.INSTANCE)).thenReturn(null);
+ when(stepContext.getNextFiredTimer((Coder) GlobalWindow.Coder.INSTANCE))
+ .thenReturn(
+ TimerData.of(
+ SimpleParDoFn.CLEANUP_TIMER_ID,
+ globalWindowNamespace,
+ BoundedWindow.TIMESTAMP_MAX_VALUE,
+ BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1)),
+ TimeDomain.EVENT_TIME))
+ .thenReturn(null);
+
+ // Set up non-empty state. We don't mock + verify calls to clear() but
instead
+ // check that state is actually empty. We don't care how it is
accomplished.
+ stateInternals.state(globalWindowNamespace, tag).write("first");
+
+ // And this should clean up the global window
+ parDoFn.processTimers();
+
+ assertThat(stateInternals.state(globalWindowNamespace, tag).read(),
nullValue());
+ }
+
@Test
public void testCleanupWorks() throws Exception {
PipelineOptions options = PipelineOptionsFactory.create();