Repository: beam Updated Branches: refs/heads/master 8479094c2 -> dc672f420
Move Triggers from sdk-core to runners-core-construction Converting to/from Runner API is not the SDK API surface. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/facdc108 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/facdc108 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/facdc108 Branch: refs/heads/master Commit: facdc108bbd7bd00a4e7d7aacf447e77785529a0 Parents: 8479094 Author: Dan Halperin <[email protected]> Authored: Wed Apr 12 10:00:45 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Wed Apr 12 16:08:32 2017 -0700 ---------------------------------------------------------------------- .../runners/core/construction/Triggers.java | 336 +++++++++++++++++++ .../core/construction/WindowingStrategies.java | 1 - .../runners/core/construction/TriggersTest.java | 111 ++++++ runners/core-java/pom.xml | 5 + .../GroupAlsoByWindowViaOutputBufferDoFn.java | 2 +- .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 2 +- .../GroupAlsoByWindowViaWindowSetNewDoFn.java | 2 +- .../beam/runners/core/ReduceFnTester.java | 2 +- .../GroupAlsoByWindowEvaluatorFactory.java | 2 +- .../SparkGroupAlsoByWindowViaWindowSet.java | 3 +- ...SparkGroupAlsoByWindowViaOutputBufferFn.java | 2 +- .../beam/sdk/transforms/windowing/Triggers.java | 1 + .../sdk/transforms/windowing/TriggersTest.java | 100 ------ 13 files changed, 460 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java new file mode 100644 index 0000000..81f738d --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.transforms.windowing.AfterAll; +import org.apache.beam.sdk.transforms.windowing.AfterEach; +import org.apache.beam.sdk.transforms.windowing.AfterFirst; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark.AfterWatermarkEarlyAndLate; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark.FromEndOfWindow; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.Never; +import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger; +import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.TimestampTransform; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.ReshuffleTrigger; +import org.apache.beam.sdk.util.TimeDomain; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** Utilities for working with {@link Triggers Triggers}. */ +@Experimental(Experimental.Kind.TRIGGER) +public class Triggers implements Serializable { + + @VisibleForTesting static final ProtoConverter CONVERTER = new ProtoConverter(); + + public static RunnerApi.Trigger toProto(Trigger trigger) { + return CONVERTER.convertTrigger(trigger); + } + + @VisibleForTesting + static class ProtoConverter { + + public RunnerApi.Trigger convertTrigger(Trigger trigger) { + Method evaluationMethod = getEvaluationMethod(trigger.getClass()); + return tryConvert(evaluationMethod, trigger); + } + + private RunnerApi.Trigger tryConvert(Method evaluationMethod, Trigger trigger) { + try { + return (RunnerApi.Trigger) evaluationMethod.invoke(this, trigger); + } catch (InvocationTargetException exc) { + if (exc.getCause() instanceof RuntimeException) { + throw (RuntimeException) exc.getCause(); + } else { + throw new RuntimeException(exc.getCause()); + } + } catch (IllegalAccessException exc) { + throw new IllegalStateException( + String.format("Internal error: could not invoke %s", evaluationMethod)); + } + } + + private Method getEvaluationMethod(Class<?> clazz) { + try { + return getClass().getDeclaredMethod("convertSpecific", clazz); + } catch (NoSuchMethodException exc) { + throw new IllegalArgumentException( + String.format( + "Cannot translate trigger class %s to a runner-API proto.", + clazz.getCanonicalName()), + exc); + } + } + + private RunnerApi.Trigger convertSpecific(DefaultTrigger v) { + return RunnerApi.Trigger.newBuilder() + .setDefault(RunnerApi.Trigger.Default.getDefaultInstance()) + .build(); + } + + private RunnerApi.Trigger convertSpecific(FromEndOfWindow v) { + return RunnerApi.Trigger.newBuilder() + .setAfterEndOfWindow(RunnerApi.Trigger.AfterEndOfWindow.newBuilder()) + .build(); + } + + private RunnerApi.Trigger convertSpecific(NeverTrigger v) { + return RunnerApi.Trigger.newBuilder() + .setNever(RunnerApi.Trigger.Never.getDefaultInstance()) + .build(); + } + + private RunnerApi.Trigger convertSpecific(ReshuffleTrigger v) { + return RunnerApi.Trigger.newBuilder() + .setAlways(RunnerApi.Trigger.Always.getDefaultInstance()) + .build(); + } + + private RunnerApi.Trigger convertSpecific(AfterSynchronizedProcessingTime v) { + return RunnerApi.Trigger.newBuilder() + .setAfterSynchronizedProcessingTime( + RunnerApi.Trigger.AfterSynchronizedProcessingTime.getDefaultInstance()) + .build(); + } + + private RunnerApi.TimeDomain convertTimeDomain(TimeDomain timeDomain) { + switch (timeDomain) { + case EVENT_TIME: + return RunnerApi.TimeDomain.EVENT_TIME; + case PROCESSING_TIME: + return RunnerApi.TimeDomain.PROCESSING_TIME; + case SYNCHRONIZED_PROCESSING_TIME: + return RunnerApi.TimeDomain.SYNCHRONIZED_PROCESSING_TIME; + default: + throw new IllegalArgumentException(String.format("Unknown time domain: %s", timeDomain)); + } + } + + private RunnerApi.Trigger convertSpecific(AfterFirst v) { + RunnerApi.Trigger.AfterAny.Builder builder = RunnerApi.Trigger.AfterAny.newBuilder(); + + for (Trigger subtrigger : v.subTriggers()) { + builder.addSubtriggers(toProto(subtrigger)); + } + + return RunnerApi.Trigger.newBuilder().setAfterAny(builder).build(); + } + + private RunnerApi.Trigger convertSpecific(AfterAll v) { + RunnerApi.Trigger.AfterAll.Builder builder = RunnerApi.Trigger.AfterAll.newBuilder(); + + for (Trigger subtrigger : v.subTriggers()) { + builder.addSubtriggers(toProto(subtrigger)); + } + + return RunnerApi.Trigger.newBuilder().setAfterAll(builder).build(); + } + + private RunnerApi.Trigger convertSpecific(AfterPane v) { + return RunnerApi.Trigger.newBuilder() + .setElementCount( + RunnerApi.Trigger.ElementCount.newBuilder().setElementCount(v.getElementCount())) + .build(); + } + + private RunnerApi.Trigger convertSpecific(AfterWatermarkEarlyAndLate v) { + RunnerApi.Trigger.AfterEndOfWindow.Builder builder = + RunnerApi.Trigger.AfterEndOfWindow.newBuilder(); + + builder.setEarlyFirings(toProto(v.getEarlyTrigger())); + if (v.getLateTrigger() != null) { + builder.setLateFirings(toProto(v.getLateTrigger())); + } + + return RunnerApi.Trigger.newBuilder().setAfterEndOfWindow(builder).build(); + } + + private RunnerApi.Trigger convertSpecific(AfterEach v) { + RunnerApi.Trigger.AfterEach.Builder builder = RunnerApi.Trigger.AfterEach.newBuilder(); + + for (Trigger subtrigger : v.subTriggers()) { + builder.addSubtriggers(toProto(subtrigger)); + } + + return RunnerApi.Trigger.newBuilder().setAfterEach(builder).build(); + } + + private RunnerApi.Trigger convertSpecific(Repeatedly v) { + return RunnerApi.Trigger.newBuilder() + .setRepeat( + RunnerApi.Trigger.Repeat.newBuilder() + .setSubtrigger(toProto(v.getRepeatedTrigger()))) + .build(); + } + + private RunnerApi.Trigger convertSpecific(OrFinallyTrigger v) { + return RunnerApi.Trigger.newBuilder() + .setOrFinally( + RunnerApi.Trigger.OrFinally.newBuilder() + .setMain(toProto(v.getMainTrigger())) + .setFinally(toProto(v.getUntilTrigger()))) + .build(); + } + + private RunnerApi.Trigger convertSpecific(AfterProcessingTime v) { + RunnerApi.Trigger.AfterProcessingTime.Builder builder = + RunnerApi.Trigger.AfterProcessingTime.newBuilder(); + + for (TimestampTransform transform : v.getTimestampTransforms()) { + builder.addTimestampTransforms(convertTimestampTransform(transform)); + } + + return RunnerApi.Trigger.newBuilder().setAfterProcessingTime(builder).build(); + } + + private RunnerApi.TimestampTransform convertTimestampTransform(TimestampTransform transform) { + if (transform instanceof TimestampTransform.Delay) { + return RunnerApi.TimestampTransform.newBuilder() + .setDelay( + RunnerApi.TimestampTransform.Delay.newBuilder() + .setDelayMillis(((TimestampTransform.Delay) transform).getDelay().getMillis())) + .build(); + } else if (transform instanceof TimestampTransform.AlignTo) { + TimestampTransform.AlignTo alignTo = (TimestampTransform.AlignTo) transform; + return RunnerApi.TimestampTransform.newBuilder() + .setAlignTo( + RunnerApi.TimestampTransform.AlignTo.newBuilder() + .setPeriod(alignTo.getPeriod().getMillis()) + .setOffset(alignTo.getOffset().getMillis())) + .build(); + + } else { + throw new IllegalArgumentException( + String.format("Unknown %s: %s", TimestampTransform.class.getSimpleName(), transform)); + } + } + } + + public static Trigger fromProto(RunnerApi.Trigger triggerProto) { + switch (triggerProto.getTriggerCase()) { + case AFTER_ALL: + return AfterAll.of(protosToTriggers(triggerProto.getAfterAll().getSubtriggersList())); + case AFTER_ANY: + return AfterFirst.of(protosToTriggers(triggerProto.getAfterAny().getSubtriggersList())); + case AFTER_EACH: + return AfterEach.inOrder( + protosToTriggers(triggerProto.getAfterEach().getSubtriggersList())); + case AFTER_END_OF_WINDOW: + RunnerApi.Trigger.AfterEndOfWindow eowProto = triggerProto.getAfterEndOfWindow(); + + if (!eowProto.hasEarlyFirings() && !eowProto.hasLateFirings()) { + return AfterWatermark.pastEndOfWindow(); + } + + // It either has early or late firings or both; our typing in Java makes this a smidge + // annoying + if (triggerProto.getAfterEndOfWindow().hasEarlyFirings()) { + AfterWatermarkEarlyAndLate trigger = + AfterWatermark.pastEndOfWindow() + .withEarlyFirings( + (OnceTrigger) + fromProto(triggerProto.getAfterEndOfWindow().getEarlyFirings())); + + if (triggerProto.getAfterEndOfWindow().hasLateFirings()) { + trigger = + trigger.withLateFirings( + (OnceTrigger) + fromProto(triggerProto.getAfterEndOfWindow().getLateFirings())); + } + return trigger; + } else { + // only late firings, so return directly + return AfterWatermark.pastEndOfWindow() + .withLateFirings((OnceTrigger) fromProto(eowProto.getLateFirings())); + } + case AFTER_PROCESSING_TIME: + AfterProcessingTime trigger = AfterProcessingTime.pastFirstElementInPane(); + for (RunnerApi.TimestampTransform transform : + triggerProto.getAfterProcessingTime().getTimestampTransformsList()) { + switch (transform.getTimestampTransformCase()) { + case ALIGN_TO: + trigger = + trigger.alignedTo( + Duration.millis(transform.getAlignTo().getPeriod()), + new Instant(transform.getAlignTo().getOffset())); + break; + case DELAY: + trigger = trigger.plusDelayOf(Duration.millis(transform.getDelay().getDelayMillis())); + break; + case TIMESTAMPTRANSFORM_NOT_SET: + throw new IllegalArgumentException( + String.format( + "Required field 'timestamp_transform' not set in %s", transform)); + default: + throw new IllegalArgumentException( + String.format( + "Unknown timestamp transform case: %s", + transform.getTimestampTransformCase())); + } + } + return trigger; + case AFTER_SYNCHRONIZED_PROCESSING_TIME: + return AfterSynchronizedProcessingTime.ofFirstElement(); + case ALWAYS: + return new ReshuffleTrigger(); + case ELEMENT_COUNT: + return AfterPane.elementCountAtLeast(triggerProto.getElementCount().getElementCount()); + case NEVER: + return Never.ever(); + case OR_FINALLY: + return fromProto(triggerProto.getOrFinally().getMain()) + .orFinally((OnceTrigger) fromProto(triggerProto.getOrFinally().getFinally())); + case REPEAT: + return Repeatedly.forever(fromProto(triggerProto.getRepeat().getSubtrigger())); + case DEFAULT: + return DefaultTrigger.of(); + case TRIGGER_NOT_SET: + throw new IllegalArgumentException( + String.format("Required field 'trigger' not set in %s", triggerProto)); + default: + throw new IllegalArgumentException( + String.format("Unknown trigger case: %s", triggerProto.getTriggerCase())); + } + } + + private static List<Trigger> protosToTriggers(List<RunnerApi.Trigger> triggers) { + List<Trigger> result = Lists.newArrayList(); + for (RunnerApi.Trigger trigger : triggers) { + result.add(fromProto(trigger)); + } + return result; + } + + // Do not instantiate + private Triggers() {} +} http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java index 6d721b0..3d7deef 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java @@ -32,7 +32,6 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.apache.beam.sdk.transforms.windowing.Triggers; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.SerializableUtils; http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggersTest.java new file mode 100644 index 0000000..cf9d40c --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggersTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.transforms.windowing.AfterAll; +import org.apache.beam.sdk.transforms.windowing.AfterEach; +import org.apache.beam.sdk.transforms.windowing.AfterFirst; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.Never; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** Tests for utilities in {@link Triggers}. */ +@RunWith(Parameterized.class) +public class TriggersTest { + + @AutoValue + abstract static class ToProtoAndBackSpec { + abstract Trigger getTrigger(); + } + + private static ToProtoAndBackSpec toProtoAndBackSpec(Trigger trigger) { + return new AutoValue_TriggersTest_ToProtoAndBackSpec(trigger); + } + + @Parameters(name = "{index}: {0}") + public static Iterable<ToProtoAndBackSpec> data() { + return ImmutableList.of( + // Atomic triggers + toProtoAndBackSpec(AfterWatermark.pastEndOfWindow()), + toProtoAndBackSpec(AfterPane.elementCountAtLeast(73)), + toProtoAndBackSpec(AfterSynchronizedProcessingTime.ofFirstElement()), + toProtoAndBackSpec(Never.ever()), + toProtoAndBackSpec(DefaultTrigger.of()), + toProtoAndBackSpec(AfterProcessingTime.pastFirstElementInPane()), + toProtoAndBackSpec( + AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(23))), + toProtoAndBackSpec( + AfterProcessingTime.pastFirstElementInPane() + .alignedTo(Duration.millis(5), new Instant(27))), + toProtoAndBackSpec( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardSeconds(3)) + .alignedTo(Duration.millis(5), new Instant(27)) + .plusDelayOf(Duration.millis(13))), + + // Composite triggers + + toProtoAndBackSpec( + AfterAll.of(AfterPane.elementCountAtLeast(79), AfterWatermark.pastEndOfWindow())), + toProtoAndBackSpec( + AfterEach.inOrder(AfterPane.elementCountAtLeast(79), AfterPane.elementCountAtLeast(3))), + toProtoAndBackSpec( + AfterFirst.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(3))), + toProtoAndBackSpec( + AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(3))), + toProtoAndBackSpec( + AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(3))), + toProtoAndBackSpec( + AfterWatermark.pastEndOfWindow() + .withEarlyFirings( + AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(42))) + .withLateFirings(AfterPane.elementCountAtLeast(3))), + toProtoAndBackSpec(Repeatedly.forever(AfterWatermark.pastEndOfWindow())), + toProtoAndBackSpec( + Repeatedly.forever(AfterPane.elementCountAtLeast(1)) + .orFinally(AfterWatermark.pastEndOfWindow()))); + } + + @Parameter(0) + public ToProtoAndBackSpec toProtoAndBackSpec; + + @Test + public void testToProtoAndBack() throws Exception { + Trigger trigger = toProtoAndBackSpec.getTrigger(); + Trigger toProtoAndBackTrigger = Triggers.fromProto(Triggers.toProto(trigger)); + + assertThat(toProtoAndBackTrigger, equalTo(trigger)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml index affd1a9..f066abf 100644 --- a/runners/core-java/pom.xml +++ b/runners/core-java/pom.xml @@ -77,6 +77,11 @@ <artifactId>beam-sdks-common-runner-api</artifactId> </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-core-construction-java</artifactId> + </dependency> + <!-- build dependencies --> <dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java index e3ce1ef..5508b2e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java @@ -19,10 +19,10 @@ package org.apache.beam.runners.core; import java.util.ArrayList; import java.util.List; +import org.apache.beam.runners.core.construction.Triggers; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.Triggers; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.WindowingStrategy; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index 8dc1502..bf48df1 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -17,12 +17,12 @@ */ package org.apache.beam.runners.core; +import org.apache.beam.runners.core.construction.Triggers; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.Triggers; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java index 444f8fe..8fff0e4 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.core; import java.util.Collection; +import org.apache.beam.runners.core.construction.Triggers; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.sdk.transforms.Aggregator; @@ -25,7 +26,6 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.Triggers; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index d18a1c3..512420f 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -38,6 +38,7 @@ import java.util.Map; import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.core.construction.Triggers; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner; @@ -60,7 +61,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.apache.beam.sdk.transforms.windowing.Triggers; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.AppliedCombineFn; http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index f7fd4cf..b4ca998 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -34,6 +34,7 @@ import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.UnsupportedSideInputReader; +import org.apache.beam.runners.core.construction.Triggers; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; @@ -47,7 +48,6 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.Triggers; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index 1f2fcb6..0e74fa2 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -30,6 +30,7 @@ import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.UnsupportedSideInputReader; +import org.apache.beam.runners.core.construction.Triggers; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.runners.spark.SparkPipelineOptions; @@ -47,7 +48,6 @@ import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.Triggers; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.apache.beam.sdk.util.WindowingStrategy; @@ -67,7 +67,6 @@ import org.apache.spark.streaming.dstream.PairDStreamFunctions; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.Function1; import scala.Option; import scala.Tuple2; http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java index 2b16c60..d19c4a9 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java @@ -31,6 +31,7 @@ import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.UnsupportedSideInputReader; +import org.apache.beam.runners.core.construction.Triggers; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.runners.spark.aggregators.NamedAggregators; @@ -38,7 +39,6 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.Triggers; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java index 591af37..47f05e4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java @@ -35,6 +35,7 @@ import org.joda.time.Instant; /** Utilities for working with {@link Triggers Triggers}. */ @Experimental(Experimental.Kind.TRIGGER) +@Deprecated public class Triggers implements Serializable { @VisibleForTesting static final ProtoConverter CONVERTER = new ProtoConverter(); http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java deleted file mode 100644 index 0ac5966..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; - -import com.google.auto.value.AutoValue; -import com.google.common.collect.ImmutableList; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameter; -import org.junit.runners.Parameterized.Parameters; - -/** Tests for utilities in {@link Triggers}. */ -@RunWith(Parameterized.class) -public class TriggersTest { - - @AutoValue - abstract static class ToProtoAndBackSpec { - abstract Trigger getTrigger(); - } - - private static ToProtoAndBackSpec toProtoAndBackSpec(Trigger trigger) { - return new AutoValue_TriggersTest_ToProtoAndBackSpec(trigger); - } - - @Parameters(name = "{index}: {0}") - public static Iterable<ToProtoAndBackSpec> data() { - return ImmutableList.of( - // Atomic triggers - toProtoAndBackSpec(AfterWatermark.pastEndOfWindow()), - toProtoAndBackSpec(AfterPane.elementCountAtLeast(73)), - toProtoAndBackSpec(AfterSynchronizedProcessingTime.ofFirstElement()), - toProtoAndBackSpec(Never.ever()), - toProtoAndBackSpec(DefaultTrigger.of()), - toProtoAndBackSpec(AfterProcessingTime.pastFirstElementInPane()), - toProtoAndBackSpec( - AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(23))), - toProtoAndBackSpec( - AfterProcessingTime.pastFirstElementInPane() - .alignedTo(Duration.millis(5), new Instant(27))), - toProtoAndBackSpec( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(Duration.standardSeconds(3)) - .alignedTo(Duration.millis(5), new Instant(27)) - .plusDelayOf(Duration.millis(13))), - - // Composite triggers - - toProtoAndBackSpec( - AfterAll.of(AfterPane.elementCountAtLeast(79), AfterWatermark.pastEndOfWindow())), - toProtoAndBackSpec( - AfterEach.inOrder(AfterPane.elementCountAtLeast(79), AfterPane.elementCountAtLeast(3))), - toProtoAndBackSpec( - AfterFirst.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(3))), - toProtoAndBackSpec( - AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(3))), - toProtoAndBackSpec( - AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(3))), - toProtoAndBackSpec( - AfterWatermark.pastEndOfWindow() - .withEarlyFirings( - AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(42))) - .withLateFirings(AfterPane.elementCountAtLeast(3))), - toProtoAndBackSpec(Repeatedly.forever(AfterWatermark.pastEndOfWindow())), - toProtoAndBackSpec( - Repeatedly.forever(AfterPane.elementCountAtLeast(1)) - .orFinally(AfterWatermark.pastEndOfWindow()))); - } - - @Parameter(0) - public ToProtoAndBackSpec toProtoAndBackSpec; - - @Test - public void testToProtoAndBack() throws Exception { - Trigger trigger = toProtoAndBackSpec.getTrigger(); - Trigger toProtoAndBackTrigger = Triggers.fromProto(Triggers.toProto(trigger)); - - assertThat(toProtoAndBackTrigger, equalTo(trigger)); - } -}
