This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch revert-15056-timer_clear_and_watermark in repository https://gitbox.apache.org/repos/asf/beam.git
commit 280914177712c1dbf3bb558b248954ab1197c662 Author: reuvenlax <[email protected]> AuthorDate: Wed Jul 7 11:57:46 2021 -0700 Revert "Merge pull request #15056: [BEAM-10887] Timer clear" This reverts commit 2c6f42c596b6356f25454dc3b86055d2f40448d5. --- .../beam/runners/core/InMemoryTimerInternals.java | 3 +- .../apache/beam/runners/core/SimpleDoFnRunner.java | 5 - .../apache/beam/runners/core/TimerInternals.java | 20 +- .../beam/runners/direct/DirectTimerInternals.java | 58 +- .../direct/StatefulParDoEvaluatorFactory.java | 56 +- .../beam/runners/direct/WatermarkManager.java | 89 +-- .../wrappers/streaming/DoFnOperator.java | 4 +- .../streaming/ExecutableStageDoFnOperator.java | 3 +- runners/google-cloud-dataflow-java/build.gradle | 1 + .../worker/StreamingModeExecutionContext.java | 96 +-- .../dataflow/worker/WindmillTimerInternals.java | 39 +- .../beam/runners/samza/runtime/KeyedInternals.java | 5 +- .../samza/runtime/SamzaTimerInternalsFactory.java | 3 +- .../spark/stateful/SparkTimerInternals.java | 2 +- .../main/java/org/apache/beam/sdk/state/Timer.java | 3 - .../org/apache/beam/sdk/transforms/ParDoTest.java | 807 ++++++--------------- .../apache/beam/fn/harness/FnApiDoFnRunner.java | 11 - .../beam/fn/harness/FnApiDoFnRunnerTest.java | 16 - 18 files changed, 308 insertions(+), 913 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java index 863a92b..d0b3bed 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java @@ -153,8 +153,7 @@ public class InMemoryTimerInternals implements TimerInternals { } @Override - public void deleteTimer( - StateNamespace namespace, String timerId, String timerFamilyId, TimeDomain timeDomain) { + public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported."); } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 0676915..b21252e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -1145,11 +1145,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } @Override - public void clear() { - timerInternals.deleteTimer(namespace, timerId, timerFamilyId, spec.getTimeDomain()); - } - - @Override public Timer offset(Duration offset) { this.offset = offset; return this; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java index f93a45a..05d8827 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java @@ -76,8 +76,7 @@ public interface TimerInternals { * manage timers for different time domains in very different ways, thus the {@link TimeDomain} is * a required parameter. */ - void deleteTimer( - StateNamespace namespace, String timerId, String timerFamilyId, TimeDomain timeDomain); + void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain); /** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */ @Deprecated @@ -186,8 +185,6 @@ public interface TimerInternals { public abstract TimeDomain getDomain(); - public abstract boolean getDeleted(); - // When adding a new field, make sure to add it to the compareTo() method. /** Construct a {@link TimerData} for the given parameters. */ @@ -198,7 +195,7 @@ public interface TimerInternals { Instant outputTimestamp, TimeDomain domain) { return new AutoValue_TimerInternals_TimerData( - timerId, "", namespace, timestamp, outputTimestamp, domain, false); + timerId, "", namespace, timestamp, outputTimestamp, domain); } /** @@ -213,7 +210,7 @@ public interface TimerInternals { Instant outputTimestamp, TimeDomain domain) { return new AutoValue_TimerInternals_TimerData( - timerId, timerFamilyId, namespace, timestamp, outputTimestamp, domain, false); + timerId, timerFamilyId, namespace, timestamp, outputTimestamp, domain); } /** @@ -231,17 +228,6 @@ public interface TimerInternals { return of(timerId, namespace, timestamp, outputTimestamp, domain); } - public TimerData deleted() { - return new AutoValue_TimerInternals_TimerData( - getTimerId(), - getTimerFamilyId(), - getNamespace(), - getTimestamp(), - getOutputTimestamp(), - getDomain(), - true); - } - /** * {@inheritDoc}. * diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java index 5a477bb..d240e1b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java @@ -17,13 +17,13 @@ */ package org.apache.beam.runners.direct; +import java.util.stream.StreamSupport; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBuilder; import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks; import org.apache.beam.sdk.state.TimeDomain; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; @@ -71,16 +71,8 @@ class DirectTimerInternals implements TimerInternals { } @Override - public void deleteTimer( - StateNamespace namespace, String timerId, String timerFamilyId, TimeDomain timeDomain) { - deleteTimer( - TimerData.of( - timerId, - timerFamilyId, - namespace, - BoundedWindow.TIMESTAMP_MIN_VALUE, - BoundedWindow.TIMESTAMP_MAX_VALUE, - timeDomain)); + public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { + throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported."); } /** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */ @@ -101,19 +93,10 @@ class DirectTimerInternals implements TimerInternals { return timerUpdateBuilder.build(); } - public boolean containsUpdateForTimeBefore( - Instant maxWatermarkTime, Instant maxProcessingTime, Instant maxSynchronizedProcessingTime) { + public boolean containsUpdateForTimeBefore(Instant time) { TimerUpdate update = timerUpdateBuilder.build(); - return hasTimeBefore( - update.getSetTimers(), - maxWatermarkTime, - maxProcessingTime, - maxSynchronizedProcessingTime) - || hasTimeBefore( - update.getDeletedTimers(), - maxWatermarkTime, - maxProcessingTime, - maxSynchronizedProcessingTime); + return hasTimeBefore(update.getSetTimers(), time) + || hasTimeBefore(update.getDeletedTimers(), time); } @Override @@ -136,31 +119,8 @@ class DirectTimerInternals implements TimerInternals { return watermarks.getOutputWatermark(); } - private boolean hasTimeBefore( - Iterable<? extends TimerData> timers, - Instant maxWatermarkTime, - Instant maxProcessingTime, - Instant maxSynchronizedProcessingTime) { - for (TimerData timerData : timers) { - Instant currentTime; - switch (timerData.getDomain()) { - case EVENT_TIME: - currentTime = maxWatermarkTime; - break; - case PROCESSING_TIME: - currentTime = maxProcessingTime; - break; - case SYNCHRONIZED_PROCESSING_TIME: - currentTime = maxSynchronizedProcessingTime; - break; - default: - throw new RuntimeException("Unexpected timeDomain " + timerData.getDomain()); - } - if (timerData.getTimestamp().isBefore(currentTime) - || timerData.getTimestamp().isEqual(currentTime)) { - return true; - } - } - return false; + private boolean hasTimeBefore(Iterable<? extends TimerData> timers, Instant time) { + return StreamSupport.stream(timers.spliterator(), false) + .anyMatch(td -> td.getTimestamp().isBefore(time)); } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java index ebd305f..4044252 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java @@ -47,7 +47,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering; import org.joda.time.Instant; /** A {@link TransformEvaluatorFactory} for stateful {@link ParDo}. */ @@ -174,31 +173,14 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo for (WindowedValue<KV<K, InputT>> windowedValue : gbkResult.getValue().elementsIterable()) { delegateEvaluator.processElement(windowedValue); } + + final Instant inputWatermarkTime = timerInternals.currentInputWatermarkTime(); PriorityQueue<TimerData> toBeFiredTimers = new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp)); + gbkResult.getValue().timersIterable().forEach(toBeFiredTimers::add); - Instant maxWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; - Instant maxProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; - Instant maxSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; - for (TimerData timerData : gbkResult.getValue().timersIterable()) { - toBeFiredTimers.add(timerData); - switch (timerData.getDomain()) { - case EVENT_TIME: - maxWatermarkTime = Ordering.natural().max(maxWatermarkTime, timerData.getTimestamp()); - break; - case PROCESSING_TIME: - maxProcessingTime = Ordering.natural().max(maxProcessingTime, timerData.getTimestamp()); - break; - case SYNCHRONIZED_PROCESSING_TIME: - maxSynchronizedProcessingTime = - Ordering.natural().max(maxSynchronizedProcessingTime, timerData.getTimestamp()); - } - } - - while (!timerInternals.containsUpdateForTimeBefore( - maxWatermarkTime, maxProcessingTime, maxSynchronizedProcessingTime) + while (!timerInternals.containsUpdateForTimeBefore(inputWatermarkTime) && !toBeFiredTimers.isEmpty()) { - TimerData timer = toBeFiredTimers.poll(); checkState( timer.getNamespace() instanceof WindowNamespace, @@ -210,23 +192,13 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo BoundedWindow timerWindow = windowNamespace.getWindow(); delegateEvaluator.onTimer(timer, gbkResult.getValue().key(), timerWindow); - clearWatermarkHold(timer); - } - pushedBackTimers.addAll(toBeFiredTimers); - } - private void clearWatermarkHold(TimerData timer) { - StateTag<WatermarkHoldState> timerWatermarkHoldTag = setTimerTag(timer); - stepContext.stateInternals().state(timer.getNamespace(), timerWatermarkHoldTag).clear(); - stepContext.stateInternals().commit(); - } + StateTag<WatermarkHoldState> timerWatermarkHoldTag = setTimerTag(timer); - private void setWatermarkHold(TimerData timer) { - StateTag<WatermarkHoldState> timerWatermarkHoldTag = setTimerTag(timer); - stepContext - .stateInternals() - .state(timer.getNamespace(), timerWatermarkHoldTag) - .add(timer.getOutputTimestamp()); + stepContext.stateInternals().state(timer.getNamespace(), timerWatermarkHoldTag).clear(); + stepContext.stateInternals().commit(); + } + pushedBackTimers.addAll(toBeFiredTimers); } @Override @@ -235,12 +207,14 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo TransformResult<KV<K, InputT>> delegateResult = delegateEvaluator.finishBundle(); boolean isTimerDeclared = false; for (TimerData timerData : delegateResult.getTimerUpdate().getSetTimers()) { - setWatermarkHold(timerData); + StateTag<WatermarkHoldState> timerWatermarkHoldTag = setTimerTag(timerData); + + stepContext + .stateInternals() + .state(timerData.getNamespace(), timerWatermarkHoldTag) + .add(timerData.getOutputTimestamp()); isTimerDeclared = true; } - for (TimerData timerData : delegateResult.getTimerUpdate().getDeletedTimers()) { - clearWatermarkHold(timerData); - } CopyOnAccessInMemoryStateInternals state; Instant watermarkHold; diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index a043256..dbc0a96 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.NavigableSet; @@ -45,6 +46,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; import org.apache.beam.runners.core.StateNamespace; @@ -64,10 +66,8 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ComparisonChain; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBasedTable; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.SortedMultiset; @@ -635,7 +635,7 @@ class WatermarkManager<ExecutableT, CollectionT> { Table<StateNamespace, String, TimerData> existingTimersForKey = existingTimers.computeIfAbsent(update.key, k -> HashBasedTable.create()); - for (TimerData addedTimer : update.setTimers.values()) { + for (TimerData addedTimer : update.setTimers) { NavigableSet<TimerData> timerQueue = timerMap.get(addedTimer.getDomain()); if (timerQueue == null) { continue; @@ -659,7 +659,7 @@ class WatermarkManager<ExecutableT, CollectionT> { addedTimer); } - for (TimerData deletedTimer : update.deletedTimers.values()) { + for (TimerData deletedTimer : update.deletedTimers) { NavigableSet<TimerData> timerQueue = timerMap.get(deletedTimer.getDomain()); if (timerQueue == null) { continue; @@ -670,6 +670,7 @@ class WatermarkManager<ExecutableT, CollectionT> { existingTimersForKey.get( deletedTimer.getNamespace(), deletedTimer.getTimerId() + '+' + deletedTimer.getTimerFamilyId()); + if (existingTimer != null) { pendingTimers.remove(deletedTimer); timerQueue.remove(deletedTimer); @@ -1547,25 +1548,6 @@ class WatermarkManager<ExecutableT, CollectionT> { } } - @AutoValue - public abstract static class TimerKey { - abstract TimeDomain getDomain(); - - abstract String getId(); - - abstract String getFamily(); - - abstract Object getNamespace(); - - static TimerKey of(TimerData timerData) { - return new AutoValue_WatermarkManager_TimerKey( - timerData.getDomain(), - timerData.getTimerId(), - timerData.getTimerFamilyId(), - timerData.getNamespace().getCacheKey()); - } - } - /** * A collection of newly set, deleted, and completed timers. * @@ -1577,8 +1559,8 @@ class WatermarkManager<ExecutableT, CollectionT> { public static class TimerUpdate { private final StructuralKey<?> key; private final Iterable<? extends TimerData> completedTimers; - private final Map<TimerKey, ? extends TimerData> setTimers; - private final Map<TimerKey, ? extends TimerData> deletedTimers; + private final Iterable<? extends TimerData> setTimers; + private final Iterable<? extends TimerData> deletedTimers; private final Iterable<? extends TimerData> pushedBackTimers; /** Returns a TimerUpdate for a null key with no timers. */ @@ -1586,8 +1568,8 @@ class WatermarkManager<ExecutableT, CollectionT> { return new TimerUpdate( null, Collections.emptyList(), - Collections.emptyMap(), - Collections.emptyMap(), + Collections.emptyList(), + Collections.emptyList(), Collections.emptyList()); } @@ -1605,14 +1587,14 @@ class WatermarkManager<ExecutableT, CollectionT> { public static final class TimerUpdateBuilder { private final StructuralKey<?> key; private final Collection<TimerData> completedTimers; - private final Map<TimerKey, TimerData> setTimers; - private final Map<TimerKey, TimerData> deletedTimers; + private final Collection<TimerData> setTimers; + private final Collection<TimerData> deletedTimers; private TimerUpdateBuilder(StructuralKey<?> key) { this.key = key; - this.completedTimers = Sets.newLinkedHashSet(); - this.setTimers = Maps.newLinkedHashMap(); - this.deletedTimers = Maps.newLinkedHashMap(); + this.completedTimers = new LinkedHashSet<>(); + this.setTimers = new LinkedHashSet<>(); + this.deletedTimers = new LinkedHashSet<>(); } /** @@ -1634,8 +1616,8 @@ class WatermarkManager<ExecutableT, CollectionT> { "Got a timer for after the end of time (%s), got %s", BoundedWindow.TIMESTAMP_MAX_VALUE, setTimer.getTimestamp()); - deletedTimers.remove(TimerKey.of(setTimer)); - setTimers.put(TimerKey.of(setTimer), setTimer); + deletedTimers.remove(setTimer); + setTimers.add(setTimer); return this; } @@ -1644,9 +1626,8 @@ class WatermarkManager<ExecutableT, CollectionT> { * it has previously been set. Returns this {@link TimerUpdateBuilder}. */ public TimerUpdateBuilder deletedTimer(TimerData deletedTimer) { - TimerKey key = TimerKey.of(deletedTimer); - deletedTimers.put(key, deletedTimer); - setTimers.remove(key); + deletedTimers.add(deletedTimer); + setTimers.remove(deletedTimer); return this; } @@ -1658,12 +1639,19 @@ class WatermarkManager<ExecutableT, CollectionT> { return new TimerUpdate( key, ImmutableList.copyOf(completedTimers), - ImmutableMap.copyOf(setTimers), - ImmutableMap.copyOf(deletedTimers), + ImmutableList.copyOf(setTimers), + ImmutableList.copyOf(deletedTimers), Collections.emptyList()); } } + private static Map<String, TimerData> indexTimerData(Iterable<? extends TimerData> timerData) { + return StreamSupport.stream(timerData.spliterator(), false) + .collect( + Collectors.toMap( + TimerUpdate::getTimerIdAndTimerFamilyIdWithNamespace, e -> e, (a, b) -> b)); + } + private static String getTimerIdAndTimerFamilyIdWithNamespace(TimerData td) { return td.getNamespace() + td.getTimerId() + td.getTimerFamilyId(); } @@ -1671,8 +1659,8 @@ class WatermarkManager<ExecutableT, CollectionT> { private TimerUpdate( StructuralKey<?> key, Iterable<? extends TimerData> completedTimers, - Map<TimerKey, ? extends TimerData> setTimers, - Map<TimerKey, ? extends TimerData> deletedTimers, + Iterable<? extends TimerData> setTimers, + Iterable<? extends TimerData> deletedTimers, Iterable<? extends TimerData> pushedBackTimers) { this.key = key; this.completedTimers = completedTimers; @@ -1693,12 +1681,12 @@ class WatermarkManager<ExecutableT, CollectionT> { @VisibleForTesting public Iterable<? extends TimerData> getSetTimers() { - return setTimers.values(); + return setTimers; } @VisibleForTesting public Iterable<? extends TimerData> getDeletedTimers() { - return deletedTimers.values(); + return deletedTimers; } Iterable<? extends TimerData> getPushedBackTimers() { @@ -1707,8 +1695,8 @@ class WatermarkManager<ExecutableT, CollectionT> { boolean isEmpty() { return Iterables.isEmpty(completedTimers) - && setTimers.isEmpty() - && deletedTimers.isEmpty() + && Iterables.isEmpty(setTimers) + && Iterables.isEmpty(deletedTimers) && Iterables.isEmpty(pushedBackTimers); } @@ -1720,18 +1708,17 @@ class WatermarkManager<ExecutableT, CollectionT> { public TimerUpdate withCompletedTimers(Iterable<TimerData> completedTimers) { List<TimerData> timersToComplete = new ArrayList<>(); Set<TimerData> pushedBack = Sets.newHashSet(pushedBackTimers); - Map<TimerKey, TimerData> newSetTimers = Maps.newLinkedHashMap(); - newSetTimers.putAll(setTimers); + Map<String, TimerData> newSetTimers = indexTimerData(setTimers); for (TimerData td : completedTimers) { - TimerKey timerKey = TimerKey.of(td); + String timerIdWithNs = getTimerIdAndTimerFamilyIdWithNamespace(td); if (!pushedBack.contains(td)) { timersToComplete.add(td); - } else if (!newSetTimers.containsKey(timerKey)) { - newSetTimers.put(timerKey, td); + } else if (!newSetTimers.containsKey(timerIdWithNs)) { + newSetTimers.put(timerIdWithNs, td); } } return new TimerUpdate( - key, timersToComplete, newSetTimers, deletedTimers, Collections.emptyList()); + key, timersToComplete, newSetTimers.values(), deletedTimers, Collections.emptyList()); } /** diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 60abddd..7f3dc0b 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -1530,8 +1530,7 @@ public class DoFnOperator<InputT, OutputT> } @Override - public void deleteTimer( - StateNamespace namespace, String timerId, String timerFamilyId, TimeDomain timeDomain) { + public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { try { cancelPendingTimerById(getContextTimerId(timerId, namespace)); } catch (Exception e) { @@ -1546,7 +1545,6 @@ public class DoFnOperator<InputT, OutputT> deleteTimer( timer.getNamespace(), constructTimerId(timer.getTimerFamilyId(), timer.getTimerId()), - timer.getTimerFamilyId(), timer.getDomain()); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java index 0ce771b..47b8aec 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java @@ -596,8 +596,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I } @Override - public void deleteTimer( - StateNamespace namespace, String timerId, String timerFamilyId, TimeDomain timeDomain) { + public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { throw new UnsupportedOperationException( "It is not expected to use SdfFlinkTimerInternals to delete a timer"); } diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index d03ff2b..e01e441 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -319,6 +319,7 @@ task validatesRunnerStreaming { 'org.apache.beam.sdk.testing.UsesMapState', 'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput', 'org.apache.beam.sdk.testing.UsesSetState', + 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering', ], )) } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index 2350b81..1aed4bb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -29,13 +29,11 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.NavigableSet; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StateInternals; -import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternals.TimerData; @@ -63,9 +61,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table.Cell; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -545,7 +540,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step synchronizedProcessingTime); this.cachedFiredTimers = null; - this.orderedUserTimers = null; + this.cachedFiredUserTimers = null; } public void flushState() { @@ -586,81 +581,30 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step } // Lazily initialized - private NavigableSet<TimerData> orderedUserTimers = null; - private Set<String> deletedTimers = null; + private Iterator<TimerData> cachedFiredUserTimers = null; public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> windowCoder) { - if (orderedUserTimers == null) { - orderedUserTimers = Sets.newTreeSet(); - deletedTimers = Sets.newHashSet(); - FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers()) - .filter( - timer -> - WindmillTimerInternals.isUserTimer(timer) - && timer.getStateFamily().equals(stateFamily)) - .transform( - timer -> - WindmillTimerInternals.windmillTimerToTimerData( - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder)) - .iterator() - .forEachRemaining( - timer -> { - orderedUserTimers.add(timer); - }); - } - - // Extract recently set or deleted timers. This operation is destructive, meaning that each - // call returns the modifications since the last call. - Table<String, StateNamespace, TimerData> justModifiedTimers = - userTimerInternals.extractJustModifiedTimers(); - for (Cell<String, StateNamespace, TimerData> cell : justModifiedTimers.cellSet()) { - Instant currentMaxTime; - switch (cell.getValue().getDomain()) { - case EVENT_TIME: - currentMaxTime = userTimerInternals.currentInputWatermarkTime(); - break; - case PROCESSING_TIME: - currentMaxTime = userTimerInternals.currentProcessingTime(); - break; - case SYNCHRONIZED_PROCESSING_TIME: - currentMaxTime = userTimerInternals.currentSynchronizedProcessingTime(); - break; - default: - throw new RuntimeException("Unexpected domain " + cell.getValue().getDomain()); - } - // If the the modified timer falls within the range of timers eligible to fire, insert - // it into the priority queue. If it falls outside the range, then don't: If it's a - // brand-new timer, it won't - // affect order in the current bundle. If it's a reset or clear of an existing timer, we - // will detect this below - // before we fire the timer. - if (cell.getValue().getTimestamp().isBefore(currentMaxTime) - || cell.getValue().getTimestamp().isEqual(currentMaxTime)) { - if (!cell.getValue().getDeleted()) { - orderedUserTimers.add(cell.getValue()); - } - } + if (cachedFiredUserTimers == null) { + cachedFiredUserTimers = + FluentIterable.<Timer>from(StreamingModeExecutionContext.this.getFiredTimers()) + .filter( + timer -> + WindmillTimerInternals.isUserTimer(timer) + && timer.getStateFamily().equals(stateFamily)) + .transform( + timer -> + WindmillTimerInternals.windmillTimerToTimerData( + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder)) + .iterator(); } - while (!orderedUserTimers.isEmpty()) { - TimerData nextTimer = orderedUserTimers.pollFirst(); - // If the timer for this key is in justModifiedTimers, ignore the old value. The new value - // for this timer will be elsewhere in the priority queue. - @Nullable - TimerData updatedTimer = - justModifiedTimers.get( - WindmillTimerInternals.getTimerDataKey(nextTimer), nextTimer.getNamespace()); - if (updatedTimer == null || updatedTimer.equals(nextTimer)) { - // User timers must be explicitly deleted when delivered, to release the implied hold. - // This will also add the deletion to the next call to - // WindmillTimerInternals.extractJustModifiedTimers, - // which will prevent the timer from firing if an old value for the timer was in the input - // bundle. - userTimerInternals.deleteTimer(nextTimer); - return nextTimer; - } + if (!cachedFiredUserTimers.hasNext()) { + return null; } - return null; + TimerData nextTimer = cachedFiredUserTimers.next(); + // User timers must be explicitly deleted when delivered, to release the implied hold + userTimerInternals.deleteTimer(nextTimer); + return nextTimer; } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java index 015fa6d..3308f9f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java @@ -59,8 +59,6 @@ class WindmillTimerInternals implements TimerInternals { // across namespaces. private Table<String, StateNamespace, TimerData> timers = HashBasedTable.create(); - private Table<String, StateNamespace, TimerData> recentlyModifiedTimers = HashBasedTable.create(); - // Map from timer id to whether it is to be deleted or set private Table<String, StateNamespace, Boolean> timerStillPresent = HashBasedTable.create(); @@ -96,22 +94,12 @@ class WindmillTimerInternals implements TimerInternals { synchronizedProcessingTime); } - public Table<String, StateNamespace, TimerData> extractJustModifiedTimers() { - Table<String, StateNamespace, TimerData> justModified = recentlyModifiedTimers; - recentlyModifiedTimers = HashBasedTable.create(); - return justModified; - } - @Override public void setTimer(TimerData timerKey) { timers.put( getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()), timerKey.getNamespace(), timerKey); - recentlyModifiedTimers.put( - getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()), - timerKey.getNamespace(), - timerKey); timerStillPresent.put( getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()), timerKey.getNamespace(), @@ -130,18 +118,10 @@ class WindmillTimerInternals implements TimerInternals { getTimerDataKey(timerId, timerFamilyId), namespace, TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain)); - recentlyModifiedTimers.put( - getTimerDataKey(timerId, timerFamilyId), - namespace, - TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain)); timerStillPresent.put(getTimerDataKey(timerId, timerFamilyId), namespace, true); } - public static String getTimerDataKey(TimerData timerData) { - return getTimerDataKey(timerData.getTimerId(), timerData.getTimerFamilyId()); - } - - private static String getTimerDataKey(String timerId, String timerFamilyId) { + private String getTimerDataKey(String timerId, String timerFamilyId) { // Identifies timer uniquely with timerFamilyId return timerId + '+' + timerFamilyId; } @@ -152,11 +132,6 @@ class WindmillTimerInternals implements TimerInternals { getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()), timerKey.getNamespace(), timerKey); - recentlyModifiedTimers.put( - getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()), - timerKey.getNamespace(), - timerKey.deleted()); - timerStillPresent.put( getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()), timerKey.getNamespace(), @@ -169,16 +144,8 @@ class WindmillTimerInternals implements TimerInternals { } @Override - public void deleteTimer( - StateNamespace namespace, String timerId, String timerFamilyId, TimeDomain timeDomain) { - deleteTimer( - TimerData.of( - timerId, - timerFamilyId, - namespace, - BoundedWindow.TIMESTAMP_MIN_VALUE, - BoundedWindow.TIMESTAMP_MAX_VALUE, - timeDomain)); + public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { + throw new UnsupportedOperationException("Deletion of timers by ID is not supported."); } @Override diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java index 9a8d852..2ac97d3 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java @@ -138,9 +138,8 @@ class KeyedInternals<K> { } @Override - public void deleteTimer( - StateNamespace namespace, String timerId, String timerFamilyId, TimeDomain timeDomain) { - getInternals().deleteTimer(namespace, timerId, timerFamilyId, timeDomain); + public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { + getInternals().deleteTimer(namespace, timerId, timeDomain); } @Override diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java index 4b34a25..d692851 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java @@ -318,8 +318,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> { } @Override - public void deleteTimer( - StateNamespace namespace, String timerId, String timerFamilyId, TimeDomain timeDomain) { + public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { Instant now = Instant.now(); deleteTimer(TimerData.of(timerId, namespace, now, now, timeDomain)); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java index d22fe8e..b726c23 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java @@ -120,7 +120,7 @@ public class SparkTimerInternals implements TimerInternals { } @Override - public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId, TimeDomain timeDomain) { + public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { throw new UnsupportedOperationException("Deleting a timer by ID is not yet supported."); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java index 5f1c047..437df4d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java @@ -81,9 +81,6 @@ public interface Timer { */ void setRelative(); - /** Clears a timer. Previous set timers will become unset. */ - void clear(); - /** Offsets the target timestamp used by {@link #setRelative()} by the given duration. */ Timer offset(Duration offset); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index e0ed7ad..e022b14 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -50,6 +50,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -58,7 +59,9 @@ import java.util.Map.Entry; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.IntFunction; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.LongStream; import java.util.stream.StreamSupport; import org.apache.beam.sdk.coders.AtomicCoder; @@ -137,6 +140,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; @@ -3451,7 +3455,6 @@ public class ParDoTest implements Serializable { /** Tests to validate ParDo timers. */ @RunWith(JUnit4.class) public static class TimerTests extends SharedTestBase implements Serializable { - @Test public void testTimerNotKeyed() { final String timerId = "foo"; @@ -4277,217 +4280,206 @@ public class ParDoTest implements Serializable { } /** A test makes sure that an event time timers are correctly ordered. */ - // @Test - // @Category({ - // ValidatesRunner.class, - // UsesTimersInParDo.class, - // UsesTestStream.class, - // UsesStatefulParDo.class, - // UsesStrictTimerOrdering.class - // }) - // public void testEventTimeTimerOrdering() throws Exception { - // final int numTestElements = 100; - // final Instant now = new Instant(1500000000000L); - // TestStream.Builder<KV<String, String>> builder = - // TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())) - // .advanceWatermarkTo(new Instant(0)); - // - // for (int i = 0; i < numTestElements; i++) { - // builder = - // builder.addElements(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i * - // 1000))); - // if ((i + 1) % 10 == 0) { - // builder = builder.advanceWatermarkTo(now.plus((i + 1) * 1000)); - // } - // } - // - // testEventTimeTimerOrderingWithInputPTransform( - // now, numTestElements, builder.advanceWatermarkToInfinity()); - // } - // - // /** A test makes sure that an event time timers are correctly ordered using Create - // transform. */ - // @Test - // @Category({ - // ValidatesRunner.class, - // UsesTimersInParDo.class, - // UsesStatefulParDo.class, - // UsesStrictTimerOrdering.class - // }) - // public void testEventTimeTimerOrderingWithCreate() throws Exception { - // final int numTestElements = 100; - // final Instant now = new Instant(1500000000000L); - // - // List<TimestampedValue<KV<String, String>>> elements = new ArrayList<>(); - // for (int i = 0; i < numTestElements; i++) { - // elements.add(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i * 1000))); - // } - // - // testEventTimeTimerOrderingWithInputPTransform( - // now, numTestElements, Create.timestamped(elements)); - // } - // - // private void testEventTimeTimerOrderingWithInputPTransform( - // Instant now, - // int numTestElements, - // PTransform<PBegin, PCollection<KV<String, String>>> transform) - // throws Exception { - // - // final String timerIdBagAppend = "append"; - // final String timerIdGc = "gc"; - // final String bag = "bag"; - // final String minTimestamp = "minTs"; - // final Instant gcTimerStamp = now.plus((numTestElements + 1) * 1000); - // - // DoFn<KV<String, String>, String> fn = - // new DoFn<KV<String, String>, String>() { - // - // @TimerId(timerIdBagAppend) - // private final TimerSpec appendSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); - // - // @TimerId(timerIdGc) - // private final TimerSpec gcSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); - // - // @StateId(bag) - // private final StateSpec<BagState<TimestampedValue<String>>> bagStateSpec = - // StateSpecs.bag(); - // - // @StateId(minTimestamp) - // private final StateSpec<ValueState<Instant>> minTimestampSpec = - // StateSpecs.value(); - // - // @ProcessElement - // public void processElement( - // ProcessContext context, - // @TimerId(timerIdBagAppend) Timer bagTimer, - // @TimerId(timerIdGc) Timer gcTimer, - // @StateId(bag) BagState<TimestampedValue<String>> bagState, - // @StateId(minTimestamp) ValueState<Instant> minStampState) { - // - // Instant currentMinStamp = - // MoreObjects.firstNonNull(minStampState.read(), - // BoundedWindow.TIMESTAMP_MAX_VALUE); - // if (currentMinStamp.equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) { - // gcTimer.set(gcTimerStamp); - // } - // if (currentMinStamp.isAfter(context.timestamp())) { - // minStampState.write(context.timestamp()); - // bagTimer.set(context.timestamp()); - // } - // bagState.add(TimestampedValue.of(context.element().getValue(), - // context.timestamp())); - // } - // - // @OnTimer(timerIdBagAppend) - // public void onTimer( - // OnTimerContext context, - // @TimerId(timerIdBagAppend) Timer timer, - // @StateId(bag) BagState<TimestampedValue<String>> bagState) { - // - // List<TimestampedValue<String>> flush = new ArrayList<>(); - // Instant flushTime = context.timestamp(); - // for (TimestampedValue<String> val : bagState.read()) { - // if (!val.getTimestamp().isAfter(flushTime)) { - // flush.add(val); - // } - // } - // flush.sort(Comparator.comparing(TimestampedValue::getTimestamp)); - // context.output( - // - // Joiner.on(":").join(flush.stream().map(TimestampedValue::getValue).iterator())); - // Instant newMinStamp = flushTime.plus(1000); - // if (flush.size() < numTestElements) { - // timer.set(newMinStamp); - // } - // } - // - // @OnTimer(timerIdGc) - // public void onTimer( - // OnTimerContext context, @StateId(bag) BagState<TimestampedValue<String>> - // bagState) { - // - // String output = - // Joiner.on(":") - // .join( - // StreamSupport.stream(bagState.read().spliterator(), false) - // - // .sorted(Comparator.comparing(TimestampedValue::getTimestamp)) - // .map(TimestampedValue::getValue) - // .iterator()) - // + ":cleanup"; - // context.output(output); - // bagState.clear(); - // } - // }; - // - // PCollection<String> output = pipeline.apply(transform).apply(ParDo.of(fn)); - // List<String> expected = - // IntStream.rangeClosed(0, numTestElements) - // .mapToObj(expandFn(numTestElements)) - // .collect(Collectors.toList()); - // PAssert.that(output).containsInAnyOrder(expected); - // pipeline.run(); - // } - // - // private IntFunction<String> expandFn(int numTestElements) { - // return i -> - // Joiner.on(":") - // .join( - // IntStream.rangeClosed(0, Math.min(numTestElements - 1, i)) - // .mapToObj(String::valueOf) - // .iterator()) - // + (i == numTestElements ? ":cleanup" : ""); - // } - - // @Test - // @Category({ValidatesRunner.class, UsesTimersInParDo.class}) - // public void testPipelineOptionsParameterOnTimer() { - // final String timerId = "thisTimer"; - // - // PCollection<String> results = - // pipeline - // .apply(Create.of(KV.of(0L, 0L))) - // .apply( - // ParDo.of( - // new DoFn<KV<Long, Long>, String>() { - // @TimerId(timerId) - // private final TimerSpec spec = - // TimerSpecs.timer(TimeDomain.EVENT_TIME); - // - // @ProcessElement - // public void process( - // ProcessContext c, BoundedWindow w, @TimerId(timerId) Timer timer) - // { - // timer.set(w.maxTimestamp()); - // } - // - // @OnTimer(timerId) - // public void onTimer(OutputReceiver<String> r, PipelineOptions options) - // { - // r.output(options.as(MyOptions.class).getFakeOption()); - // } - // })); - // - // String testOptionValue = "not fake anymore"; - // pipeline.getOptions().as(MyOptions.class).setFakeOption(testOptionValue); - // PAssert.that(results).containsInAnyOrder("not fake anymore"); - // - // pipeline.run(); - // } - // - // @Test - // @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) - // public void duplicateTimerSetting() { - // TestStream<KV<String, String>> stream = - // TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())) - // .addElements(KV.of("key1", "v1")) - // .advanceWatermarkToInfinity(); - // - // PCollection<String> result = pipeline.apply(stream).apply(ParDo.of(new TwoTimerDoFn())); - // PAssert.that(result).containsInAnyOrder("It works"); - // - // pipeline.run().waitUntilFinish(); - // } + @Test + @Category({ + ValidatesRunner.class, + UsesTimersInParDo.class, + UsesTestStream.class, + UsesStatefulParDo.class, + UsesStrictTimerOrdering.class + }) + public void testEventTimeTimerOrdering() throws Exception { + final int numTestElements = 100; + final Instant now = new Instant(1500000000000L); + TestStream.Builder<KV<String, String>> builder = + TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())) + .advanceWatermarkTo(new Instant(0)); + + for (int i = 0; i < numTestElements; i++) { + builder = + builder.addElements(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i * 1000))); + if ((i + 1) % 10 == 0) { + builder = builder.advanceWatermarkTo(now.plus((i + 1) * 1000)); + } + } + + testEventTimeTimerOrderingWithInputPTransform( + now, numTestElements, builder.advanceWatermarkToInfinity()); + } + + /** A test makes sure that an event time timers are correctly ordered using Create transform. */ + @Test + @Category({ + ValidatesRunner.class, + UsesTimersInParDo.class, + UsesStatefulParDo.class, + UsesStrictTimerOrdering.class + }) + public void testEventTimeTimerOrderingWithCreate() throws Exception { + final int numTestElements = 100; + final Instant now = new Instant(1500000000000L); + + List<TimestampedValue<KV<String, String>>> elements = new ArrayList<>(); + for (int i = 0; i < numTestElements; i++) { + elements.add(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i * 1000))); + } + + testEventTimeTimerOrderingWithInputPTransform( + now, numTestElements, Create.timestamped(elements)); + } + + private void testEventTimeTimerOrderingWithInputPTransform( + Instant now, + int numTestElements, + PTransform<PBegin, PCollection<KV<String, String>>> transform) + throws Exception { + + final String timerIdBagAppend = "append"; + final String timerIdGc = "gc"; + final String bag = "bag"; + final String minTimestamp = "minTs"; + final Instant gcTimerStamp = now.plus((numTestElements + 1) * 1000); + + DoFn<KV<String, String>, String> fn = + new DoFn<KV<String, String>, String>() { + + @TimerId(timerIdBagAppend) + private final TimerSpec appendSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @TimerId(timerIdGc) + private final TimerSpec gcSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @StateId(bag) + private final StateSpec<BagState<TimestampedValue<String>>> bagStateSpec = + StateSpecs.bag(); + + @StateId(minTimestamp) + private final StateSpec<ValueState<Instant>> minTimestampSpec = StateSpecs.value(); + + @ProcessElement + public void processElement( + ProcessContext context, + @TimerId(timerIdBagAppend) Timer bagTimer, + @TimerId(timerIdGc) Timer gcTimer, + @StateId(bag) BagState<TimestampedValue<String>> bagState, + @StateId(minTimestamp) ValueState<Instant> minStampState) { + + Instant currentMinStamp = + MoreObjects.firstNonNull(minStampState.read(), BoundedWindow.TIMESTAMP_MAX_VALUE); + if (currentMinStamp.equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) { + gcTimer.set(gcTimerStamp); + } + if (currentMinStamp.isAfter(context.timestamp())) { + minStampState.write(context.timestamp()); + bagTimer.set(context.timestamp()); + } + bagState.add(TimestampedValue.of(context.element().getValue(), context.timestamp())); + } + + @OnTimer(timerIdBagAppend) + public void onTimer( + OnTimerContext context, + @TimerId(timerIdBagAppend) Timer timer, + @StateId(bag) BagState<TimestampedValue<String>> bagState) { + + List<TimestampedValue<String>> flush = new ArrayList<>(); + Instant flushTime = context.timestamp(); + for (TimestampedValue<String> val : bagState.read()) { + if (!val.getTimestamp().isAfter(flushTime)) { + flush.add(val); + } + } + flush.sort(Comparator.comparing(TimestampedValue::getTimestamp)); + context.output( + Joiner.on(":").join(flush.stream().map(TimestampedValue::getValue).iterator())); + Instant newMinStamp = flushTime.plus(1000); + if (flush.size() < numTestElements) { + timer.set(newMinStamp); + } + } + + @OnTimer(timerIdGc) + public void onTimer( + OnTimerContext context, @StateId(bag) BagState<TimestampedValue<String>> bagState) { + + String output = + Joiner.on(":") + .join( + StreamSupport.stream(bagState.read().spliterator(), false) + .sorted(Comparator.comparing(TimestampedValue::getTimestamp)) + .map(TimestampedValue::getValue) + .iterator()) + + ":cleanup"; + context.output(output); + bagState.clear(); + } + }; + + PCollection<String> output = pipeline.apply(transform).apply(ParDo.of(fn)); + List<String> expected = + IntStream.rangeClosed(0, numTestElements) + .mapToObj(expandFn(numTestElements)) + .collect(Collectors.toList()); + PAssert.that(output).containsInAnyOrder(expected); + pipeline.run(); + } + + private IntFunction<String> expandFn(int numTestElements) { + return i -> + Joiner.on(":") + .join( + IntStream.rangeClosed(0, Math.min(numTestElements - 1, i)) + .mapToObj(String::valueOf) + .iterator()) + + (i == numTestElements ? ":cleanup" : ""); + } + + @Test + @Category({ValidatesRunner.class, UsesTimersInParDo.class}) + public void testPipelineOptionsParameterOnTimer() { + final String timerId = "thisTimer"; + + PCollection<String> results = + pipeline + .apply(Create.of(KV.of(0L, 0L))) + .apply( + ParDo.of( + new DoFn<KV<Long, Long>, String>() { + @TimerId(timerId) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void process( + ProcessContext c, BoundedWindow w, @TimerId(timerId) Timer timer) { + timer.set(w.maxTimestamp()); + } + + @OnTimer(timerId) + public void onTimer(OutputReceiver<String> r, PipelineOptions options) { + r.output(options.as(MyOptions.class).getFakeOption()); + } + })); + + String testOptionValue = "not fake anymore"; + pipeline.getOptions().as(MyOptions.class).setFakeOption(testOptionValue); + PAssert.that(results).containsInAnyOrder("not fake anymore"); + + pipeline.run(); + } + + @Test + @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) + public void duplicateTimerSetting() { + TestStream<KV<String, String>> stream = + TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())) + .addElements(KV.of("key1", "v1")) + .advanceWatermarkToInfinity(); + + PCollection<String> result = pipeline.apply(stream).apply(ParDo.of(new TwoTimerDoFn())); + PAssert.that(result).containsInAnyOrder("It works"); + + pipeline.run().waitUntilFinish(); + } @Test @Category({ @@ -4496,48 +4488,23 @@ public class ParDoTest implements Serializable { UsesTestStream.class, UsesStrictTimerOrdering.class }) - public void testTwoTimersSettingEachOtherBounded() { - testTwoTimersSettingEachOther(IsBounded.BOUNDED); - } - - // @Test - // @Category({ - // ValidatesRunner.class, - // UsesTimersInParDo.class, - // UsesTestStream.class, - // UsesStrictTimerOrdering.class - // }) - // public void testTwoTimersSettingEachOtherUnbounded() { - // testTwoTimersSettingEachOther(IsBounded.UNBOUNDED); - // } - - private void testTwoTimersSettingEachOther(IsBounded isBounded) { + public void testTwoTimersSettingEachOther() { Instant now = new Instant(1500000000000L); Instant end = now.plus(100); TestStream<KV<Void, Void>> input = TestStream.create(KvCoder.of(VoidCoder.of(), VoidCoder.of())) .addElements(KV.of(null, null)) .advanceWatermarkToInfinity(); - pipeline.apply(TwoTimerTest.of(now, end, input, isBounded)); + pipeline.apply(TwoTimerTest.of(now, end, input)); pipeline.run(); } - // @Test - // @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesStrictTimerOrdering.class}) - // public void testTwoTimersSettingEachOtherWithCreateAsInputBounded() { - // testTwoTimersSettingEachOtherWithCreateAsInput(IsBounded.BOUNDED);; - // } - @Test @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesStrictTimerOrdering.class}) - public void testTwoTimersSettingEachOtherWithCreateAsInputUnbounded() { - testTwoTimersSettingEachOtherWithCreateAsInput(IsBounded.UNBOUNDED); - } - - private void testTwoTimersSettingEachOtherWithCreateAsInput(IsBounded isBounded) { + public void testTwoTimersSettingEachOtherWithCreateAsInput() { Instant now = new Instant(1500000000000L); Instant end = now.plus(100); - pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)), isBounded)); + pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)))); pipeline.run(); } @@ -4787,27 +4754,19 @@ public class ParDoTest implements Serializable { private static class TwoTimerTest extends PTransform<PBegin, PDone> { private static PTransform<PBegin, PDone> of( - Instant start, - Instant end, - PTransform<PBegin, PCollection<KV<Void, Void>>> input, - IsBounded isBounded) { - return new TwoTimerTest(start, end, input, isBounded); + Instant start, Instant end, PTransform<PBegin, PCollection<KV<Void, Void>>> input) { + return new TwoTimerTest(start, end, input); } private final Instant start; private final Instant end; private final transient PTransform<PBegin, PCollection<KV<Void, Void>>> inputPTransform; - private IsBounded isBounded; public TwoTimerTest( - Instant start, - Instant end, - PTransform<PBegin, PCollection<KV<Void, Void>>> input, - IsBounded isBounded) { + Instant start, Instant end, PTransform<PBegin, PCollection<KV<Void, Void>>> input) { this.start = start; this.end = end; this.inputPTransform = input; - this.isBounded = isBounded; } @Override @@ -4819,7 +4778,6 @@ public class ParDoTest implements Serializable { PCollection<String> result = input .apply(inputPTransform) - .setIsBoundedInternal(isBounded) .apply( ParDo.of( new DoFn<KV<Void, Void>, String>() { @@ -4839,6 +4797,7 @@ public class ParDoTest implements Serializable { @TimerId(timerName1) Timer t1, @TimerId(timerName2) Timer t2, @StateId(countStateName) ValueState<Integer> state) { + state.write(0); t1.set(start); // set the t2 timer after end, so that we test that @@ -4892,348 +4851,6 @@ public class ParDoTest implements Serializable { return PDone.in(input.getPipeline()); } } - - // @Test - // @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) - // public void testSetAndClearProcessingTimeTimer() { - // - // final String timerId = "processing-timer"; - // - // DoFn<KV<String, Integer>, Integer> fn = - // new DoFn<KV<String, Integer>, Integer>() { - // - // @TimerId(timerId) - // private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); - // - // @ProcessElement - // public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer> - // r) { - // timer.offset(Duration.standardSeconds(1)).setRelative(); - // timer.clear(); - // r.output(3); - // } - // - // @OnTimer(timerId) - // public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> r) { - // r.output(42); - // } - // }; - // - // TestStream<KV<String, Integer>> stream = - // TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) - // .addElements(KV.of("hello", 37)) - // .advanceProcessingTime( - // Duration.millis( - // DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds - // .plus(Duration.standardMinutes(2))) - // .advanceWatermarkToInfinity(); - // - // PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn)); - // PAssert.that(output).containsInAnyOrder(3); - // pipeline.run(); - // } - // - // @Test - // @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) - // public void testSetAndClearEventTimeTimer() { - // final String timerId = "event-timer"; - // - // DoFn<KV<String, Integer>, Integer> fn = - // new DoFn<KV<String, Integer>, Integer>() { - // - // @TimerId(timerId) - // private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); - // - // @ProcessElement - // public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer> - // r) { - // timer.offset(Duration.standardSeconds(1)).setRelative(); - // timer.clear(); - // r.output(3); - // } - // - // @OnTimer(timerId) - // public void onTimer(OutputReceiver<Integer> r) { - // r.output(42); - // } - // }; - // - // TestStream<KV<String, Integer>> stream = - // TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) - // .advanceWatermarkTo(new Instant(0)) - // .addElements(KV.of("hello", 37)) - // .advanceWatermarkTo(new Instant(0).plus(Duration.standardSeconds(1))) - // .advanceWatermarkToInfinity(); - // - // PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn)); - // PAssert.that(output).containsInAnyOrder(3); - // pipeline.run(); - // } - // - // @Test - // @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) - // public void testClearUnsetProcessingTimeTimer() { - // final String timerId = "processing-timer"; - // - // DoFn<KV<String, Integer>, Integer> fn = - // new DoFn<KV<String, Integer>, Integer>() { - // - // @TimerId(timerId) - // private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); - // - // @ProcessElement - // public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer> - // r) { - // timer.clear(); - // r.output(3); - // } - // - // @OnTimer(timerId) - // public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> r) { - // r.output(42); - // } - // }; - // - // TestStream<KV<String, Integer>> stream = - // TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) - // .addElements(KV.of("hello", 37)) - // .advanceProcessingTime( - // Duration.millis( - // DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds - // .plus(Duration.standardMinutes(4))) - // .advanceWatermarkToInfinity(); - // - // PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn)); - // PAssert.that(output).containsInAnyOrder(3); - // pipeline.run(); - // } - // - // @Test - // @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) - // public void testClearUnsetEventTimeTimer() { - // final String timerId = "event-timer"; - // - // DoFn<KV<String, Integer>, Integer> fn = - // new DoFn<KV<String, Integer>, Integer>() { - // - // @TimerId(timerId) - // private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); - // - // @ProcessElement - // public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer> - // r) { - // timer.clear(); - // r.output(3); - // } - // - // @OnTimer(timerId) - // public void onTimer(OutputReceiver<Integer> r) { - // r.output(42); - // } - // }; - // - // TestStream<KV<String, Integer>> stream = - // TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) - // .advanceWatermarkTo(new Instant(0)) - // .addElements(KV.of("hello", 37)) - // .advanceWatermarkTo(new Instant(0).plus(Duration.standardSeconds(1))) - // .advanceWatermarkToInfinity(); - // - // PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn)); - // PAssert.that(output).containsInAnyOrder(3); - // pipeline.run(); - // } - // - // @Test - // @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) - // public void testClearProcessingTimeTimer() { - // final String timerId = "processing-timer"; - // final String clearTimerId = "clear-timer"; - // - // DoFn<KV<String, Integer>, Integer> fn = - // new DoFn<KV<String, Integer>, Integer>() { - // - // @TimerId(timerId) - // private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); - // - // @TimerId(clearTimerId) - // private final TimerSpec clearTimerSpec = - // TimerSpecs.timer(TimeDomain.PROCESSING_TIME); - // - // @ProcessElement - // public void processElement( - // @TimerId(timerId) Timer timer, - // @TimerId(clearTimerId) Timer clearTimer, - // OutputReceiver<Integer> r) { - // timer.offset(Duration.standardSeconds(1)).setRelative(); - // clearTimer.offset(Duration.standardSeconds(2)).setRelative(); - // - // r.output(3); - // } - // - // @OnTimer(timerId) - // public void onTimer( - // OutputReceiver<Integer> r, @TimerId(clearTimerId) Timer clearTimer) { - // System.err.println("onTimer"); - // r.output(42); - // clearTimer.clear(); - // } - // - // // This should never fire since we clear the timer in the earlier timer. - // @OnTimer(clearTimerId) - // public void clearTimer(OutputReceiver<Integer> r) { - // System.err.println("clearTimer"); - // r.output(43); - // } - // }; - // - // TestStream<KV<String, Integer>> stream = - // TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) - // .addElements(KV.of("hello", 37)) - // .advanceProcessingTime( - // Duration.millis( - // DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds - // .plus(Duration.standardMinutes(2))) - // .advanceProcessingTime( - // Duration.millis( - // DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds - // .plus(Duration.standardMinutes(4))) - // .advanceWatermarkToInfinity(); - // - // PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn)); - // PAssert.that(output).containsInAnyOrder(3, 42); - // pipeline.run(); - // } - - // @Test - // @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) - // public void testClearEventTimeTimer() { - // final String timerId = "event-timer"; - // final String clearTimerId = "clear-timer"; - // - // DoFn<KV<String, Integer>, Integer> fn = - // new DoFn<KV<String, Integer>, Integer>() { - // - // @TimerId(timerId) - // private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); - // - // @TimerId(clearTimerId) - // private final TimerSpec clearSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); - // - // @ProcessElement - // public void processElement( - // @TimerId(timerId) Timer timer, - // @TimerId(clearTimerId) Timer clearTimer, - // OutputReceiver<Integer> r) { - // timer.offset(Duration.standardSeconds(1)).setRelative(); - // clearTimer.offset(Duration.standardSeconds(2)).setRelative(); - // - // r.output(3); - // } - // - // @OnTimer(timerId) - // public void onTimer( - // OutputReceiver<Integer> r, @TimerId(clearTimerId) Timer clearTimer) { - // r.output(42); - // clearTimer.clear(); - // } - // - // // This should never fire since we clear the timer in the earlier timer. - // @OnTimer(clearTimerId) - // public void clearTimer(OutputReceiver<Integer> r) { - // r.output(43); - // } - // }; - // - // TestStream<KV<String, Integer>> stream = - // TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) - // .advanceWatermarkTo(new Instant(0)) - // .addElements(KV.of("hello", 37)) - // .advanceWatermarkTo(new Instant(0).plus(Duration.standardSeconds(1))) - // .advanceWatermarkToInfinity(); - // - // PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn)); - // PAssert.that(output).containsInAnyOrder(3, 42); - // pipeline.run(); - // } - // - // @Test - // @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) - // public void testSetProcessingTimerAfterClear() { - // final String timerId = "processing-timer"; - // - // DoFn<KV<String, Integer>, Integer> fn = - // new DoFn<KV<String, Integer>, Integer>() { - // - // @TimerId(timerId) - // private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); - // - // @ProcessElement - // public void processElement( - // @Element KV<String, Integer> e, - // @TimerId(timerId) Timer timer, - // OutputReceiver<Integer> r) { - // timer.clear(); - // timer.offset(Duration.standardSeconds(1)).setRelative(); - // r.output(3); - // } - // - // @OnTimer(timerId) - // public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> r) { - // r.output(42); - // } - // }; - // - // TestStream<KV<String, Integer>> stream = - // TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) - // .addElements(KV.of("hello", 37), KV.of("hello", 38)) - // .advanceProcessingTime( - // Duration.millis( - // DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds - // .plus(Duration.standardMinutes(2))) - // .advanceWatermarkToInfinity(); - // - // PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn)); - // PAssert.that(output).containsInAnyOrder(3, 3, 42); - // pipeline.run(); - // } - // - // @Test - // @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) - // public void testSetEventTimerAfterClear() { - // final String timerId = "event-timer"; - // - // DoFn<KV<String, Integer>, Integer> fn = - // new DoFn<KV<String, Integer>, Integer>() { - // - // @TimerId(timerId) - // private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); - // - // @ProcessElement - // public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer> - // r) { - // timer.clear(); - // timer.offset(Duration.standardSeconds(1)).setRelative(); - // r.output(3); - // } - // - // @OnTimer(timerId) - // public void onTimer(OutputReceiver<Integer> r) { - // r.output(42); - // } - // }; - // - // TestStream<KV<String, Integer>> stream = - // TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) - // .advanceWatermarkTo(new Instant(0)) - // .addElements(KV.of("hello", 37), KV.of("hello", 38)) - // .advanceWatermarkTo(new Instant(0).plus(Duration.standardSeconds(1))) - // .advanceWatermarkToInfinity(); - // - // PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn)); - // PAssert.that(output).containsInAnyOrder(3, 3, 42); - // pipeline.run(); - // } } /** Tests validating Timer coder inference behaviors. */ diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 52485db..4711c4c 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -1781,17 +1781,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator } @Override - public void clear() { - TimerHandler<K> consumer = (TimerHandler) timerHandlers.get(timerId); - try { - consumer.accept( - Timer.cleared(userKey, dynamicTimerTag, Collections.singletonList(boundedWindow))); - } catch (Throwable t) { - throw UserCodeException.wrap(t); - } - } - - @Override public org.apache.beam.sdk.state.Timer offset(Duration offset) { this.offset = offset; return this; diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java index 0d58619..1f2d839 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java @@ -1098,13 +1098,9 @@ public class FnApiDoFnRunnerTest implements Serializable { fakeTimerClient.getTimers(eventTimer), contains( timerInGlobalWindow("X", new Instant(1000L), new Instant(1001L)), - clearedTimerInGlobalWindow("X"), timerInGlobalWindow("Y", new Instant(1100L), new Instant(1101L)), - clearedTimerInGlobalWindow("Y"), timerInGlobalWindow("X", new Instant(1200L), new Instant(1201L)), - clearedTimerInGlobalWindow("X"), timerInGlobalWindow("Y", new Instant(1300L), new Instant(1301L)), - clearedTimerInGlobalWindow("Y"), timerInGlobalWindow("A", new Instant(1400L), new Instant(2411L)), timerInGlobalWindow("B", new Instant(1500L), new Instant(2511L)), timerInGlobalWindow("A", new Instant(1600L), new Instant(2611L)), @@ -1117,13 +1113,9 @@ public class FnApiDoFnRunnerTest implements Serializable { fakeTimerClient.getTimers(processingTimer), contains( timerInGlobalWindow("X", new Instant(1000L), new Instant(10002L)), - clearedTimerInGlobalWindow("X"), timerInGlobalWindow("Y", new Instant(1100L), new Instant(10002L)), - clearedTimerInGlobalWindow("Y"), timerInGlobalWindow("X", new Instant(1200L), new Instant(10002L)), - clearedTimerInGlobalWindow("X"), timerInGlobalWindow("Y", new Instant(1300L), new Instant(10002L)), - clearedTimerInGlobalWindow("Y"), timerInGlobalWindow("A", new Instant(1400L), new Instant(10012L)), timerInGlobalWindow("B", new Instant(1500L), new Instant(10012L)), timerInGlobalWindow("A", new Instant(1600L), new Instant(10012L)), @@ -1224,12 +1216,6 @@ public class FnApiDoFnRunnerTest implements Serializable { return dynamicTimerInGlobalWindow(userKey, "", holdTimestamp, fireTimestamp); } - private <K> org.apache.beam.runners.core.construction.Timer<K> clearedTimerInGlobalWindow( - K userKey) { - return org.apache.beam.runners.core.construction.Timer.cleared( - userKey, "", Collections.singletonList(GlobalWindow.INSTANCE)); - } - private <K> org.apache.beam.runners.core.construction.Timer<K> dynamicTimerInGlobalWindow( K userKey, String dynamicTimerTag, Instant holdTimestamp, Instant fireTimestamp) { return org.apache.beam.runners.core.construction.Timer.of( @@ -1279,10 +1265,8 @@ public class FnApiDoFnRunnerTest implements Serializable { bagState.add(context.element().getValue()); eventTimeTimer.withOutputTimestamp(context.timestamp()).set(context.timestamp().plus(1L)); - eventTimeTimer.clear(); processingTimeTimer.offset(Duration.millis(2L)); processingTimeTimer.setRelative(); - processingTimeTimer.clear(); eventTimerFamily .get("event-timer1") .withOutputTimestamp(context.timestamp())
