This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new fa01615e520 Merge pull request #17359: [BEAM-14303] Add a way to
exclude output timestamp watermark holds
fa01615e520 is described below
commit fa01615e5207b53a7f73ff6a5ff55c336717c88f
Author: Reuven Lax <[email protected]>
AuthorDate: Thu May 5 14:23:48 2022 -0700
Merge pull request #17359: [BEAM-14303] Add a way to exclude output
timestamp watermark holds
---
.../apache/beam/runners/core/SimpleDoFnRunner.java | 170 +++++++++------------
.../beam/runners/core/SimpleDoFnRunnerTest.java | 10 +-
.../dataflow/worker/WindmillTimerInternals.java | 10 +-
.../main/java/org/apache/beam/sdk/state/Timer.java | 6 +
.../apache/beam/sdk/transforms/Deduplicate.java | 8 +-
.../org/apache/beam/sdk/transforms/ParDoTest.java | 50 ++++++
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 65 +++++---
.../bigquery/StorageApiWritesShardedRecords.java | 11 +-
8 files changed, 186 insertions(+), 144 deletions(-)
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index a73dd521f86..a7d67bf7526 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -246,6 +246,30 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
return sideInputReader.get(view, sideInputWindow);
}
+ @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users,
but must be respected
+ private void checkTimestamp(Instant elemTimestamp, Instant timestamp) {
+ Instant lowerBound;
+ try {
+ lowerBound = elemTimestamp.minus(fn.getAllowedTimestampSkew());
+ } catch (ArithmeticException e) {
+ lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
+ if (timestamp.isBefore(lowerBound) ||
timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot output with timestamp %s. Output timestamps must be no
earlier than the "
+ + "timestamp of the current input or timer (%s) minus the
allowed skew (%s) and no "
+ + "later than %s. See the DoFn#getAllowedTimestampSkew()
Javadoc for details "
+ + "on changing the allowed skew.",
+ timestamp,
+ elemTimestamp,
+ fn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE
+ ? fn.getAllowedTimestampSkew()
+ :
PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()),
+ BoundedWindow.TIMESTAMP_MAX_VALUE));
+ }
+ }
+
private <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T>
windowedElem) {
checkArgument(outputTags.contains(tag), "Unknown output tag %s", tag);
outputManager.output(tag, windowedElem);
@@ -389,7 +413,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
@Override
public void outputWithTimestamp(OutputT output, Instant timestamp) {
- checkTimestamp(timestamp);
+ checkTimestamp(elem.getTimestamp(), timestamp);
outputWithTimestamp(mainOutputTag, output, timestamp);
}
@@ -402,7 +426,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
@Override
public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant
timestamp) {
checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null");
- checkTimestamp(timestamp);
+ checkTimestamp(elem.getTimestamp(), timestamp);
outputWindowedValue(
tag, WindowedValue.of(output, timestamp, elem.getWindows(),
elem.getPane()));
}
@@ -416,30 +440,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
return elem.getWindows();
}
- @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users,
but must be respected
- private void checkTimestamp(Instant timestamp) {
- Instant lowerBound;
- try {
- lowerBound = elem.getTimestamp().minus(fn.getAllowedTimestampSkew());
- } catch (ArithmeticException e) {
- lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
- }
- if (timestamp.isBefore(lowerBound) ||
timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
- throw new IllegalArgumentException(
- String.format(
- "Cannot output with timestamp %s. Output timestamps must be no
earlier than the "
- + "timestamp of the current input (%s) minus the allowed
skew (%s) and no "
- + "later than %s. See the DoFn#getAllowedTimestampSkew()
Javadoc for details "
- + "on changing the allowed skew.",
- timestamp,
- elem.getTimestamp(),
- fn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE
- ? fn.getAllowedTimestampSkew()
- :
PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()),
- BoundedWindow.TIMESTAMP_MAX_VALUE));
- }
- }
-
@Override
public BoundedWindow window() {
return Iterables.getOnlyElement(elem.getWindows());
@@ -834,18 +834,19 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
@Override
public void outputWithTimestamp(OutputT output, Instant timestamp) {
- checkTimestamp(timestamp);
+ checkTimestamp(timestamp(), timestamp);
outputWithTimestamp(mainOutputTag, output, timestamp);
}
@Override
public <T> void output(TupleTag<T> tag, T output) {
+ checkTimestamp(timestamp(), timestamp);
outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(),
PaneInfo.NO_FIRING));
}
@Override
public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant
timestamp) {
- checkTimestamp(timestamp);
+ checkTimestamp(timestamp(), timestamp);
outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(),
PaneInfo.NO_FIRING));
}
@@ -854,30 +855,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
throw new UnsupportedOperationException(
"Bundle finalization is not supported in non-portable pipelines.");
}
-
- @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users,
but must be respected
- private void checkTimestamp(Instant timestamp) {
- Instant lowerBound;
- try {
- lowerBound = timestamp().minus(fn.getAllowedTimestampSkew());
- } catch (ArithmeticException e) {
- lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
- }
- if (timestamp.isBefore(lowerBound) ||
timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
- throw new IllegalArgumentException(
- String.format(
- "Cannot output with timestamp %s. Output timestamps must be no
earlier than the "
- + "output timestamp of the timer (%s) minus the allowed
skew (%s) and no "
- + "later than %s. See the DoFn#getAllowedTimestampSkew()
Javadoc for details "
- + "on changing the allowed skew.",
- timestamp,
- timestamp(),
- fn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE
- ? fn.getAllowedTimestampSkew()
- :
PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()),
- BoundedWindow.TIMESTAMP_MAX_VALUE));
- }
- }
}
/**
@@ -1064,17 +1041,19 @@ public class SimpleDoFnRunner<InputT, OutputT>
implements DoFnRunner<InputT, Out
@Override
public void outputWithTimestamp(OutputT output, Instant timestamp) {
+ checkTimestamp(this.timestamp, timestamp);
outputWithTimestamp(mainOutputTag, output, timestamp);
}
@Override
public <T> void output(TupleTag<T> tag, T output) {
+ checkTimestamp(this.timestamp, timestamp);
outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(),
PaneInfo.NO_FIRING));
}
@Override
public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant
timestamp) {
- checkTimestamp(timestamp);
+ checkTimestamp(this.timestamp, timestamp);
outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(),
PaneInfo.NO_FIRING));
}
@@ -1083,30 +1062,6 @@ public class SimpleDoFnRunner<InputT, OutputT>
implements DoFnRunner<InputT, Out
throw new UnsupportedOperationException(
"Bundle finalization is not supported in non-portable pipelines.");
}
-
- @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users,
but must be respected
- private void checkTimestamp(Instant timestamp) {
- Instant lowerBound;
- try {
- lowerBound = this.timestamp.minus(fn.getAllowedTimestampSkew());
- } catch (ArithmeticException e) {
- lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
- }
- if (timestamp.isBefore(lowerBound) ||
timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
- throw new IllegalArgumentException(
- String.format(
- "Cannot output with timestamp %s. Output timestamps must be no
earlier than the "
- + "output timestamp of the window (%s) minus the allowed
skew (%s) and no "
- + "later than %s. See the DoFn#getAllowedTimestampSkew()
Javadoc for details "
- + "on changing the allowed skew.",
- timestamp,
- this.timestamp,
- fn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE
- ? fn.getAllowedTimestampSkew()
- :
PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()),
- BoundedWindow.TIMESTAMP_MAX_VALUE));
- }
- }
}
private class TimerInternalsTimer implements Timer {
@@ -1121,7 +1076,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
private final String timerFamilyId;
private final TimerSpec spec;
private Instant target;
- private Instant outputTimestamp;
+ private @Nullable Instant outputTimestamp;
+ private boolean noOutputTimestamp;
private final Instant elementInputTimestamp;
private Duration period = Duration.ZERO;
private Duration offset = Duration.ZERO;
@@ -1138,6 +1094,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
this.timerId = timerId;
this.timerFamilyId = "";
this.spec = spec;
+ this.noOutputTimestamp = false;
this.elementInputTimestamp = elementInputTimestamp;
this.timerInternals = timerInternals;
}
@@ -1216,6 +1173,14 @@ public class SimpleDoFnRunner<InputT, OutputT>
implements DoFnRunner<InputT, Out
@Override
public Timer withOutputTimestamp(Instant outputTimestamp) {
this.outputTimestamp = outputTimestamp;
+ this.noOutputTimestamp = false;
+ return this;
+ }
+
+ @Override
+ public Timer withNoOutputTimestamp() {
+ this.outputTimestamp = null;
+ this.noOutputTimestamp = true;
return this;
}
@@ -1251,38 +1216,41 @@ public class SimpleDoFnRunner<InputT, OutputT>
implements DoFnRunner<InputT, Out
:
PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()),
BoundedWindow.TIMESTAMP_MAX_VALUE));
}
- } else if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
+ } else if (!noOutputTimestamp &&
TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
// The outputTimestamp was unset and this is a timer in the EVENT_TIME
domain. The output
// timestamp will be the firing timestamp.
outputTimestamp = target;
- } else {
+ } else if (!noOutputTimestamp) {
// The outputTimestamp was unset and this is a timer in the
PROCESSING_TIME
// (or SYNCHRONIZED_PROCESSING_TIME) domain. The output timestamp will
be the timestamp of
// the element (or timer) setting this timer.
outputTimestamp = elementInputTimestamp;
}
-
- Instant windowExpiry = LateDataUtils.garbageCollectionTime(window,
allowedLateness);
- if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
- checkArgument(
- !outputTimestamp.isAfter(windowExpiry),
- "Attempted to set an event-time timer with an output timestamp of
%s that is"
- + " after the expiration of window %s",
- outputTimestamp,
- windowExpiry);
- checkArgument(
- !target.isAfter(windowExpiry),
- "Attempted to set an event-time timer with a firing timestamp of
%s that is"
- + " after the expiration of window %s",
- target,
- windowExpiry);
+ Instant windowExpiry = LateDataUtils.garbageCollectionTime(window, allowedLateness);
+ if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
+ // The firing timestamp must be validated even when withNoOutputTimestamp() was used.
+ checkArgument(
+ !target.isAfter(windowExpiry),
+ "Attempted to set an event-time timer with a firing timestamp of %s that is"
+ + " after the expiration of window %s",
+ target,
+ windowExpiry);
+ }
+ if (outputTimestamp != null) {
+ if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
+ checkArgument(
+ !outputTimestamp.isAfter(windowExpiry),
+ "Attempted to set an event-time timer with an output timestamp of %s that is"
+ + " after the expiration of window %s",
+ outputTimestamp,
+ windowExpiry);
+ } else {
+ checkArgument(
+ !outputTimestamp.isAfter(windowExpiry),
+ "Attempted to set a processing-time timer with an output timestamp of %s that is"
+ + " after the expiration of window %s",
+ outputTimestamp,
+ windowExpiry);
+ }
} else {
- checkArgument(
- !outputTimestamp.isAfter(windowExpiry),
- "Attempted to set a processing-time timer with an output timestamp
of %s that is"
- + " after the expiration of window %s",
- outputTimestamp,
- windowExpiry);
+ outputTimestamp =
BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.millis(1));
}
}
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index 0cb53ed7b2f..51c6e1d83cf 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -318,7 +318,8 @@ public class SimpleDoFnRunnerTest {
allOf(
containsString("must be no earlier"),
containsString(
- String.format("timestamp of the current input (%s)", new
Instant(0).toString())),
+ String.format(
+ "timestamp of the current input or timer (%s)", new
Instant(0).toString())),
containsString(
String.format(
"the allowed skew (%s)",
@@ -369,7 +370,8 @@ public class SimpleDoFnRunnerTest {
allOf(
containsString("must be no earlier"),
containsString(
- String.format("timestamp of the current input (%s)", new
Instant(0).toString())),
+ String.format(
+ "timestamp of the current input or timer (%s)", new
Instant(0).toString())),
containsString(
String.format(
"the allowed skew (%s)",
@@ -626,7 +628,9 @@ public class SimpleDoFnRunnerTest {
exception.getMessage(),
allOf(
containsString("must be no earlier"),
- containsString(String.format("timestamp of the timer (%s)", new
Instant(0).toString())),
+ containsString(
+ String.format(
+ "timestamp of the current input or timer (%s)", new
Instant(0).toString())),
containsString(
String.format(
"the allowed skew (%s)",
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index c732bdb1060..b0eb8674d99 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -31,6 +31,7 @@ import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.ExposedByteArrayInputStream;
import org.apache.beam.sdk.util.ExposedByteArrayOutputStream;
import org.apache.beam.sdk.util.VarInt;
@@ -40,6 +41,7 @@ import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBase
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table.Cell;
import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
import org.joda.time.Instant;
/**
@@ -207,7 +209,11 @@ class WindmillTimerInternals implements TimerInternals {
if (cell.getValue()) {
// Setting the timer. If it is a user timer, set a hold.
- if (needsWatermarkHold(timerData)) {
+ // Only set a hold if it's needed and if the hold is before the end of
the global window.
+ if (needsWatermarkHold(timerData)
+ && timerData
+ .getOutputTimestamp()
+
.isBefore(GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1)))) {
// Setting a timer, clear any prior hold and set to the new value
outputBuilder
.addWatermarkHoldsBuilder()
@@ -220,6 +226,8 @@ class WindmillTimerInternals implements TimerInternals {
} else {
// Deleting a timer. If it is a user timer, clear the hold
timer.clearTimestamp();
+ // Clear the hold even if it's the end of the global window in order
to maintain update
+ // compatibility.
if (needsWatermarkHold(timerData)) {
// We are deleting timer; clear the hold
outputBuilder
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
index 78453ee7c1b..efaf0154ef3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
@@ -99,6 +99,12 @@ public interface Timer {
*/
Timer withOutputTimestamp(Instant outputTime);
+ /**
+ * Asserts that there is no output timestamp. The output watermark will not
be held up, and it is
+ * illegal to output messages from this timer using the default output
timestamp.
+ */
+ Timer withNoOutputTimestamp();
+
/**
* Returns the current relative time used by {@link #setRelative()} and
{@link #offset}. This can
* be used by a client that self-manages relative timers (e.g. one that
stores the current timer
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Deduplicate.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Deduplicate.java
index e9fa0e2995f..bbbcb859aef 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Deduplicate.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Deduplicate.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -307,17 +306,14 @@ public final class Deduplicate {
@ProcessElement
public void processElement(
@Element KV<K, V> element,
- BoundedWindow window,
OutputReceiver<KV<K, V>> receiver,
@StateId(SEEN_STATE) ValueState<Boolean> seenState,
@TimerId(EXPIRY_TIMER) Timer expiryTimer) {
Boolean seen = seenState.read();
// Seen state is either set or not set so if it has been set then it
must be true.
if (seen == null) {
- // We don't want the expiry timer to hold up watermarks, so we set its
output timestamp to
- // the end of the
- // window.
-
expiryTimer.offset(duration).withOutputTimestamp(window.maxTimestamp()).setRelative();
+ // We don't want the expiry timer to hold up watermarks.
+ expiryTimer.offset(duration).withNoOutputTimestamp().setRelative();
seenState.write(true);
receiver.output(element);
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 3207295a755..6cc943e1d88 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -4336,6 +4336,56 @@ public class ParDoTest implements Serializable {
pipeline.run();
}
+ @Test
+ @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+ public void testNoOutputTimestampDefaultBounded() throws Exception {
+ runTestNoOutputTimestampDefault(false);
+ }
+
+ @Test
+ @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+ public void testNoOutputTimestampDefaultStreaming() throws Exception {
+ runTestNoOutputTimestampDefault(true);
+ }
+
+ public void runTestNoOutputTimestampDefault(boolean useStreaming) throws Exception {
+ final String timerId = "foo";
+ DoFn<KV<String, Long>, Long> fn1 =
+ new DoFn<KV<String, Long>, Long>() {
+
+ @TimerId(timerId)
+ private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void processElement(
+ @TimerId(timerId) Timer timer, @Timestamp Instant timestamp) {
+ timer.withNoOutputTimestamp().set(timestamp.plus(Duration.millis(10)));
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(@Timestamp Instant timestamp, OutputReceiver<Long> o) {
+ try {
+ o.output(timestamp.getMillis());
+ fail("Should have failed due to outputting when noOutputTimestamp was set.");
+ } catch (IllegalArgumentException e) {
+ // Expected: outputting at the default timestamp is rejected because the timer was
+ // set with withNoOutputTimestamp(); only the error message needs to be verified.
+ Preconditions.checkState(e.getMessage().contains("Cannot output with timestamp"));
+ }
+ }
+ };
+
+ if (useStreaming) {
+ pipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
+ }
+ PCollection<Long> output =
+ pipeline
+ .apply(Create.timestamped(TimestampedValue.of(KV.of("hello", 1L), new Instant(3))))
+ .apply("first", ParDo.of(fn1));
+
+ pipeline.run();
+ }
+
@Test
@Category({ValidatesRunner.class, UsesTimersInParDo.class})
public void testOutOfBoundsEventTimeTimerHold() throws Exception {
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index cf84027a7e9..2d3462eb269 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -1774,7 +1774,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
private final BoundedWindow boundedWindow;
private final PaneInfo paneInfo;
- private Instant outputTimestamp;
+ private @Nullable Instant outputTimestamp;
+ private boolean noOutputTimestamp;
private Duration period = Duration.ZERO;
private Duration offset = Duration.ZERO;
@@ -1793,6 +1794,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
this.elementTimestampOrTimerHoldTimestamp =
elementTimestampOrTimerHoldTimestamp;
this.boundedWindow = boundedWindow;
this.paneInfo = paneInfo;
+ this.noOutputTimestamp = false;
this.timeDomain = timeDomain;
switch (timeDomain) {
@@ -1861,6 +1863,14 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
@Override
public org.apache.beam.sdk.state.Timer withOutputTimestamp(Instant
outputTime) {
this.outputTimestamp = outputTime;
+ this.noOutputTimestamp = false;
+ return this;
+ }
+
+ @Override
+ public org.apache.beam.sdk.state.Timer withNoOutputTimestamp() {
+ this.outputTimestamp = null;
+ this.noOutputTimestamp = true;
return this;
}
@@ -1914,40 +1924,45 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
}
// Output timestamp is set to the delivery time if not initialized by an
user.
- if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(timeDomain))
{
+ if (!noOutputTimestamp
+ && outputTimestamp == null
+ && TimeDomain.EVENT_TIME.equals(timeDomain)) {
outputTimestamp = scheduledTime;
}
// For processing timers
- if (outputTimestamp == null) {
+ if (!noOutputTimestamp && outputTimestamp == null) {
// For processing timers output timestamp will be:
// 1) timestamp of input element
// OR
// 2) hold timestamp of firing timer.
outputTimestamp = elementTimestampOrTimerHoldTimestamp;
}
-
- Instant windowExpiry =
LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness);
- if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
- checkArgument(
- !outputTimestamp.isAfter(scheduledTime),
- "Attempted to set an event-time timer with an output timestamp of
%s that is"
- + " after the timer firing timestamp %s",
- outputTimestamp,
- scheduledTime);
- checkArgument(
- !scheduledTime.isAfter(windowExpiry),
- "Attempted to set an event-time timer with a firing timestamp of
%s that is"
- + " after the expiration of window %s",
- scheduledTime,
- windowExpiry);
+ Instant windowExpiry = LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness);
+ if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
+ // The firing timestamp must be validated even when withNoOutputTimestamp() was used.
+ checkArgument(
+ !scheduledTime.isAfter(windowExpiry),
+ "Attempted to set an event-time timer with a firing timestamp of %s that is"
+ + " after the expiration of window %s",
+ scheduledTime,
+ windowExpiry);
+ }
+ if (outputTimestamp != null) {
+ if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
+ checkArgument(
+ !outputTimestamp.isAfter(scheduledTime),
+ "Attempted to set an event-time timer with an output timestamp of %s that is"
+ + " after the timer firing timestamp %s",
+ outputTimestamp,
+ scheduledTime);
+ } else {
+ checkArgument(
+ !outputTimestamp.isAfter(windowExpiry),
+ "Attempted to set a processing-time timer with an output timestamp of %s that is"
+ + " after the expiration of window %s",
+ outputTimestamp,
+ windowExpiry);
+ }
} else {
- checkArgument(
- !outputTimestamp.isAfter(windowExpiry),
- "Attempted to set a processing-time timer with an output timestamp
of %s that is"
- + " after the expiration of window %s",
- outputTimestamp,
- windowExpiry);
+ outputTimestamp =
BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.millis(1));
}
return Timer.of(
userKey,
@@ -2687,6 +2702,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
@Override
public void output(OutputT output) {
+ checkTimerTimestamp(currentTimer.getHoldTimestamp());
outputTo(
mainOutputConsumers,
WindowedValue.of(
@@ -2703,6 +2719,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
@Override
public <T> void output(TupleTag<T> tag, T output) {
+ checkTimerTimestamp(currentTimer.getHoldTimestamp());
Collection<FnDataReceiver<WindowedValue<T>>> consumers =
(Collection) localNameToConsumer.get(tag.getId());
if (consumers == null) {
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index c0f60f34c61..c9a070fbd8f 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -68,7 +68,6 @@ import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.ShardedKey;
@@ -262,10 +261,7 @@ public class StorageApiWritesShardedRecords<DestinationT,
ElementT>
streamsCreated.inc();
}
// Reset the idle timer.
- streamIdleTimer
- .offset(streamIdleTime)
- .withOutputTimestamp(GlobalWindow.INSTANCE.maxTimestamp())
- .setRelative();
+
streamIdleTimer.offset(streamIdleTime).withNoOutputTimestamp().setRelative();
return stream;
}
@@ -524,10 +520,7 @@ public class StorageApiWritesShardedRecords<DestinationT,
ElementT>
java.time.Duration timeElapsed = java.time.Duration.between(now,
Instant.now());
appendLatencyDistribution.update(timeElapsed.toMillis());
- idleTimer
- .offset(streamIdleTime)
- .withOutputTimestamp(GlobalWindow.INSTANCE.maxTimestamp())
- .setRelative();
+ idleTimer.offset(streamIdleTime).withNoOutputTimestamp().setRelative();
}
// called by the idleTimer and window-expiration handlers.