Repository: beam Updated Branches: refs/heads/master 6418bcfcb -> 0637df1bc
Rename Triggers to TriggerTranslation Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4fa38e2d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4fa38e2d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4fa38e2d Branch: refs/heads/master Commit: 4fa38e2d590cc4472c119b57790f7a724a700e43 Parents: c8b2119 Author: Kenneth Knowles <[email protected]> Authored: Tue May 23 15:33:33 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue May 23 15:53:41 2017 -0700 ---------------------------------------------------------------------- .../operators/ApexGroupByKeyOperator.java | 4 +- .../core/construction/TriggerTranslation.java | 336 +++++++++++++++++++ .../runners/core/construction/Triggers.java | 336 ------------------- .../WindowingStrategyTranslation.java | 4 +- .../construction/TriggerTranslationTest.java | 112 +++++++ .../runners/core/construction/TriggersTest.java | 111 ------ .../GroupAlsoByWindowViaWindowSetNewDoFn.java | 4 +- .../beam/runners/core/ReduceFnTester.java | 10 +- .../GroupAlsoByWindowEvaluatorFactory.java | 4 +- .../SparkGroupAlsoByWindowViaWindowSet.java | 4 +- ...SparkGroupAlsoByWindowViaOutputBufferFn.java | 4 +- 11 files changed, 466 insertions(+), 463 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4fa38e2d/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index 1d48e20..39f681f 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -41,7 +41,7 @@ import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternals.TimerData; -import org.apache.beam.runners.core.construction.Triggers; +import org.apache.beam.runners.core.construction.TriggerTranslation; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.sdk.coders.Coder; @@ -163,7 +163,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator, windowingStrategy, ExecutableTriggerStateMachine.create( TriggerStateMachines.stateMachineForTrigger( - Triggers.toProto(windowingStrategy.getTrigger()))), + TriggerTranslation.toProto(windowingStrategy.getTrigger()))), stateInternalsFactory.stateInternalsForKey(key), timerInternals, new OutputWindowedValue<KV<K, Iterable<V>>>() { http://git-wip-us.apache.org/repos/asf/beam/blob/4fa38e2d/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TriggerTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TriggerTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TriggerTranslation.java new file mode 100644 index 0000000..777b165 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TriggerTranslation.java @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.transforms.windowing.AfterAll; +import org.apache.beam.sdk.transforms.windowing.AfterEach; +import org.apache.beam.sdk.transforms.windowing.AfterFirst; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark.AfterWatermarkEarlyAndLate; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark.FromEndOfWindow; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.Never; +import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger; +import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import 
org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger; +import org.apache.beam.sdk.transforms.windowing.TimestampTransform; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** Utilities for working with {@link Trigger Triggers}. */ +@Experimental(Experimental.Kind.TRIGGER) +public class TriggerTranslation implements Serializable { + + @VisibleForTesting static final ProtoConverter CONVERTER = new ProtoConverter(); + + public static RunnerApi.Trigger toProto(Trigger trigger) { + return CONVERTER.convertTrigger(trigger); + } + + @VisibleForTesting + static class ProtoConverter { + + public RunnerApi.Trigger convertTrigger(Trigger trigger) { + Method evaluationMethod = getEvaluationMethod(trigger.getClass()); + return tryConvert(evaluationMethod, trigger); + } + + private RunnerApi.Trigger tryConvert(Method evaluationMethod, Trigger trigger) { + try { + return (RunnerApi.Trigger) evaluationMethod.invoke(this, trigger); + } catch (InvocationTargetException exc) { + if (exc.getCause() instanceof RuntimeException) { + throw (RuntimeException) exc.getCause(); + } else { + throw new RuntimeException(exc.getCause()); + } + } catch (IllegalAccessException exc) { + throw new IllegalStateException( + String.format("Internal error: could not invoke %s", evaluationMethod)); + } + } + + private Method getEvaluationMethod(Class<?> clazz) { + try { + return getClass().getDeclaredMethod("convertSpecific", clazz); + } catch (NoSuchMethodException exc) { + throw new IllegalArgumentException( + String.format( + "Cannot translate trigger class %s to a runner-API proto.", + clazz.getCanonicalName()), + exc); + } + } + + private RunnerApi.Trigger convertSpecific(DefaultTrigger v) { + return RunnerApi.Trigger.newBuilder() + .setDefault(RunnerApi.Trigger.Default.getDefaultInstance()) + .build(); + } + + private RunnerApi.Trigger 
convertSpecific(FromEndOfWindow v) { + return RunnerApi.Trigger.newBuilder() + .setAfterEndOfWindow(RunnerApi.Trigger.AfterEndOfWindow.newBuilder()) + .build(); + } + + private RunnerApi.Trigger convertSpecific(NeverTrigger v) { + return RunnerApi.Trigger.newBuilder() + .setNever(RunnerApi.Trigger.Never.getDefaultInstance()) + .build(); + } + + private RunnerApi.Trigger convertSpecific(ReshuffleTrigger v) { + return RunnerApi.Trigger.newBuilder() + .setAlways(RunnerApi.Trigger.Always.getDefaultInstance()) + .build(); + } + + private RunnerApi.Trigger convertSpecific(AfterSynchronizedProcessingTime v) { + return RunnerApi.Trigger.newBuilder() + .setAfterSynchronizedProcessingTime( + RunnerApi.Trigger.AfterSynchronizedProcessingTime.getDefaultInstance()) + .build(); + } + + private RunnerApi.TimeDomain convertTimeDomain(TimeDomain timeDomain) { + switch (timeDomain) { + case EVENT_TIME: + return RunnerApi.TimeDomain.EVENT_TIME; + case PROCESSING_TIME: + return RunnerApi.TimeDomain.PROCESSING_TIME; + case SYNCHRONIZED_PROCESSING_TIME: + return RunnerApi.TimeDomain.SYNCHRONIZED_PROCESSING_TIME; + default: + throw new IllegalArgumentException(String.format("Unknown time domain: %s", timeDomain)); + } + } + + private RunnerApi.Trigger convertSpecific(AfterFirst v) { + RunnerApi.Trigger.AfterAny.Builder builder = RunnerApi.Trigger.AfterAny.newBuilder(); + + for (Trigger subtrigger : v.subTriggers()) { + builder.addSubtriggers(toProto(subtrigger)); + } + + return RunnerApi.Trigger.newBuilder().setAfterAny(builder).build(); + } + + private RunnerApi.Trigger convertSpecific(AfterAll v) { + RunnerApi.Trigger.AfterAll.Builder builder = RunnerApi.Trigger.AfterAll.newBuilder(); + + for (Trigger subtrigger : v.subTriggers()) { + builder.addSubtriggers(toProto(subtrigger)); + } + + return RunnerApi.Trigger.newBuilder().setAfterAll(builder).build(); + } + + private RunnerApi.Trigger convertSpecific(AfterPane v) { + return RunnerApi.Trigger.newBuilder() + .setElementCount( + 
RunnerApi.Trigger.ElementCount.newBuilder().setElementCount(v.getElementCount())) + .build(); + } + + private RunnerApi.Trigger convertSpecific(AfterWatermarkEarlyAndLate v) { + RunnerApi.Trigger.AfterEndOfWindow.Builder builder = + RunnerApi.Trigger.AfterEndOfWindow.newBuilder(); + + builder.setEarlyFirings(toProto(v.getEarlyTrigger())); + if (v.getLateTrigger() != null) { + builder.setLateFirings(toProto(v.getLateTrigger())); + } + + return RunnerApi.Trigger.newBuilder().setAfterEndOfWindow(builder).build(); + } + + private RunnerApi.Trigger convertSpecific(AfterEach v) { + RunnerApi.Trigger.AfterEach.Builder builder = RunnerApi.Trigger.AfterEach.newBuilder(); + + for (Trigger subtrigger : v.subTriggers()) { + builder.addSubtriggers(toProto(subtrigger)); + } + + return RunnerApi.Trigger.newBuilder().setAfterEach(builder).build(); + } + + private RunnerApi.Trigger convertSpecific(Repeatedly v) { + return RunnerApi.Trigger.newBuilder() + .setRepeat( + RunnerApi.Trigger.Repeat.newBuilder() + .setSubtrigger(toProto(v.getRepeatedTrigger()))) + .build(); + } + + private RunnerApi.Trigger convertSpecific(OrFinallyTrigger v) { + return RunnerApi.Trigger.newBuilder() + .setOrFinally( + RunnerApi.Trigger.OrFinally.newBuilder() + .setMain(toProto(v.getMainTrigger())) + .setFinally(toProto(v.getUntilTrigger()))) + .build(); + } + + private RunnerApi.Trigger convertSpecific(AfterProcessingTime v) { + RunnerApi.Trigger.AfterProcessingTime.Builder builder = + RunnerApi.Trigger.AfterProcessingTime.newBuilder(); + + for (TimestampTransform transform : v.getTimestampTransforms()) { + builder.addTimestampTransforms(convertTimestampTransform(transform)); + } + + return RunnerApi.Trigger.newBuilder().setAfterProcessingTime(builder).build(); + } + + private RunnerApi.TimestampTransform convertTimestampTransform(TimestampTransform transform) { + if (transform instanceof TimestampTransform.Delay) { + return RunnerApi.TimestampTransform.newBuilder() + .setDelay( + 
RunnerApi.TimestampTransform.Delay.newBuilder() + .setDelayMillis(((TimestampTransform.Delay) transform).getDelay().getMillis())) + .build(); + } else if (transform instanceof TimestampTransform.AlignTo) { + TimestampTransform.AlignTo alignTo = (TimestampTransform.AlignTo) transform; + return RunnerApi.TimestampTransform.newBuilder() + .setAlignTo( + RunnerApi.TimestampTransform.AlignTo.newBuilder() + .setPeriod(alignTo.getPeriod().getMillis()) + .setOffset(alignTo.getOffset().getMillis())) + .build(); + + } else { + throw new IllegalArgumentException( + String.format("Unknown %s: %s", TimestampTransform.class.getSimpleName(), transform)); + } + } + } + + public static Trigger fromProto(RunnerApi.Trigger triggerProto) { + switch (triggerProto.getTriggerCase()) { + case AFTER_ALL: + return AfterAll.of(protosToTriggers(triggerProto.getAfterAll().getSubtriggersList())); + case AFTER_ANY: + return AfterFirst.of(protosToTriggers(triggerProto.getAfterAny().getSubtriggersList())); + case AFTER_EACH: + return AfterEach.inOrder( + protosToTriggers(triggerProto.getAfterEach().getSubtriggersList())); + case AFTER_END_OF_WINDOW: + RunnerApi.Trigger.AfterEndOfWindow eowProto = triggerProto.getAfterEndOfWindow(); + + if (!eowProto.hasEarlyFirings() && !eowProto.hasLateFirings()) { + return AfterWatermark.pastEndOfWindow(); + } + + // It either has early or late firings or both; our typing in Java makes this a smidge + // annoying + if (triggerProto.getAfterEndOfWindow().hasEarlyFirings()) { + AfterWatermarkEarlyAndLate trigger = + AfterWatermark.pastEndOfWindow() + .withEarlyFirings( + (OnceTrigger) + fromProto(triggerProto.getAfterEndOfWindow().getEarlyFirings())); + + if (triggerProto.getAfterEndOfWindow().hasLateFirings()) { + trigger = + trigger.withLateFirings( + (OnceTrigger) + fromProto(triggerProto.getAfterEndOfWindow().getLateFirings())); + } + return trigger; + } else { + // only late firings, so return directly + return AfterWatermark.pastEndOfWindow() + 
.withLateFirings((OnceTrigger) fromProto(eowProto.getLateFirings())); + } + case AFTER_PROCESSING_TIME: + AfterProcessingTime trigger = AfterProcessingTime.pastFirstElementInPane(); + for (RunnerApi.TimestampTransform transform : + triggerProto.getAfterProcessingTime().getTimestampTransformsList()) { + switch (transform.getTimestampTransformCase()) { + case ALIGN_TO: + trigger = + trigger.alignedTo( + Duration.millis(transform.getAlignTo().getPeriod()), + new Instant(transform.getAlignTo().getOffset())); + break; + case DELAY: + trigger = trigger.plusDelayOf(Duration.millis(transform.getDelay().getDelayMillis())); + break; + case TIMESTAMPTRANSFORM_NOT_SET: + throw new IllegalArgumentException( + String.format( + "Required field 'timestamp_transform' not set in %s", transform)); + default: + throw new IllegalArgumentException( + String.format( + "Unknown timestamp transform case: %s", + transform.getTimestampTransformCase())); + } + } + return trigger; + case AFTER_SYNCHRONIZED_PROCESSING_TIME: + return AfterSynchronizedProcessingTime.ofFirstElement(); + case ALWAYS: + return new ReshuffleTrigger(); + case ELEMENT_COUNT: + return AfterPane.elementCountAtLeast(triggerProto.getElementCount().getElementCount()); + case NEVER: + return Never.ever(); + case OR_FINALLY: + return fromProto(triggerProto.getOrFinally().getMain()) + .orFinally((OnceTrigger) fromProto(triggerProto.getOrFinally().getFinally())); + case REPEAT: + return Repeatedly.forever(fromProto(triggerProto.getRepeat().getSubtrigger())); + case DEFAULT: + return DefaultTrigger.of(); + case TRIGGER_NOT_SET: + throw new IllegalArgumentException( + String.format("Required field 'trigger' not set in %s", triggerProto)); + default: + throw new IllegalArgumentException( + String.format("Unknown trigger case: %s", triggerProto.getTriggerCase())); + } + } + + private static List<Trigger> protosToTriggers(List<RunnerApi.Trigger> triggers) { + List<Trigger> result = Lists.newArrayList(); + for (RunnerApi.Trigger 
trigger : triggers) { + result.add(fromProto(trigger)); + } + return result; + } + + // Do not instantiate + private TriggerTranslation() {} +} http://git-wip-us.apache.org/repos/asf/beam/blob/4fa38e2d/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java deleted file mode 100644 index df6c9ed..0000000 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java +++ /dev/null @@ -1,336 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.core.construction; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import java.io.Serializable; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.List; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.apache.beam.sdk.state.TimeDomain; -import org.apache.beam.sdk.transforms.windowing.AfterAll; -import org.apache.beam.sdk.transforms.windowing.AfterEach; -import org.apache.beam.sdk.transforms.windowing.AfterFirst; -import org.apache.beam.sdk.transforms.windowing.AfterPane; -import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; -import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark.AfterWatermarkEarlyAndLate; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark.FromEndOfWindow; -import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; -import org.apache.beam.sdk.transforms.windowing.Never; -import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger; -import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger; -import org.apache.beam.sdk.transforms.windowing.Repeatedly; -import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger; -import org.apache.beam.sdk.transforms.windowing.TimestampTransform; -import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** Utilities for working with {@link Triggers Triggers}. 
*/ -@Experimental(Experimental.Kind.TRIGGER) -public class Triggers implements Serializable { - - @VisibleForTesting static final ProtoConverter CONVERTER = new ProtoConverter(); - - public static RunnerApi.Trigger toProto(Trigger trigger) { - return CONVERTER.convertTrigger(trigger); - } - - @VisibleForTesting - static class ProtoConverter { - - public RunnerApi.Trigger convertTrigger(Trigger trigger) { - Method evaluationMethod = getEvaluationMethod(trigger.getClass()); - return tryConvert(evaluationMethod, trigger); - } - - private RunnerApi.Trigger tryConvert(Method evaluationMethod, Trigger trigger) { - try { - return (RunnerApi.Trigger) evaluationMethod.invoke(this, trigger); - } catch (InvocationTargetException exc) { - if (exc.getCause() instanceof RuntimeException) { - throw (RuntimeException) exc.getCause(); - } else { - throw new RuntimeException(exc.getCause()); - } - } catch (IllegalAccessException exc) { - throw new IllegalStateException( - String.format("Internal error: could not invoke %s", evaluationMethod)); - } - } - - private Method getEvaluationMethod(Class<?> clazz) { - try { - return getClass().getDeclaredMethod("convertSpecific", clazz); - } catch (NoSuchMethodException exc) { - throw new IllegalArgumentException( - String.format( - "Cannot translate trigger class %s to a runner-API proto.", - clazz.getCanonicalName()), - exc); - } - } - - private RunnerApi.Trigger convertSpecific(DefaultTrigger v) { - return RunnerApi.Trigger.newBuilder() - .setDefault(RunnerApi.Trigger.Default.getDefaultInstance()) - .build(); - } - - private RunnerApi.Trigger convertSpecific(FromEndOfWindow v) { - return RunnerApi.Trigger.newBuilder() - .setAfterEndOfWindow(RunnerApi.Trigger.AfterEndOfWindow.newBuilder()) - .build(); - } - - private RunnerApi.Trigger convertSpecific(NeverTrigger v) { - return RunnerApi.Trigger.newBuilder() - .setNever(RunnerApi.Trigger.Never.getDefaultInstance()) - .build(); - } - - private RunnerApi.Trigger 
convertSpecific(ReshuffleTrigger v) { - return RunnerApi.Trigger.newBuilder() - .setAlways(RunnerApi.Trigger.Always.getDefaultInstance()) - .build(); - } - - private RunnerApi.Trigger convertSpecific(AfterSynchronizedProcessingTime v) { - return RunnerApi.Trigger.newBuilder() - .setAfterSynchronizedProcessingTime( - RunnerApi.Trigger.AfterSynchronizedProcessingTime.getDefaultInstance()) - .build(); - } - - private RunnerApi.TimeDomain convertTimeDomain(TimeDomain timeDomain) { - switch (timeDomain) { - case EVENT_TIME: - return RunnerApi.TimeDomain.EVENT_TIME; - case PROCESSING_TIME: - return RunnerApi.TimeDomain.PROCESSING_TIME; - case SYNCHRONIZED_PROCESSING_TIME: - return RunnerApi.TimeDomain.SYNCHRONIZED_PROCESSING_TIME; - default: - throw new IllegalArgumentException(String.format("Unknown time domain: %s", timeDomain)); - } - } - - private RunnerApi.Trigger convertSpecific(AfterFirst v) { - RunnerApi.Trigger.AfterAny.Builder builder = RunnerApi.Trigger.AfterAny.newBuilder(); - - for (Trigger subtrigger : v.subTriggers()) { - builder.addSubtriggers(toProto(subtrigger)); - } - - return RunnerApi.Trigger.newBuilder().setAfterAny(builder).build(); - } - - private RunnerApi.Trigger convertSpecific(AfterAll v) { - RunnerApi.Trigger.AfterAll.Builder builder = RunnerApi.Trigger.AfterAll.newBuilder(); - - for (Trigger subtrigger : v.subTriggers()) { - builder.addSubtriggers(toProto(subtrigger)); - } - - return RunnerApi.Trigger.newBuilder().setAfterAll(builder).build(); - } - - private RunnerApi.Trigger convertSpecific(AfterPane v) { - return RunnerApi.Trigger.newBuilder() - .setElementCount( - RunnerApi.Trigger.ElementCount.newBuilder().setElementCount(v.getElementCount())) - .build(); - } - - private RunnerApi.Trigger convertSpecific(AfterWatermarkEarlyAndLate v) { - RunnerApi.Trigger.AfterEndOfWindow.Builder builder = - RunnerApi.Trigger.AfterEndOfWindow.newBuilder(); - - builder.setEarlyFirings(toProto(v.getEarlyTrigger())); - if (v.getLateTrigger() != null) { - 
builder.setLateFirings(toProto(v.getLateTrigger())); - } - - return RunnerApi.Trigger.newBuilder().setAfterEndOfWindow(builder).build(); - } - - private RunnerApi.Trigger convertSpecific(AfterEach v) { - RunnerApi.Trigger.AfterEach.Builder builder = RunnerApi.Trigger.AfterEach.newBuilder(); - - for (Trigger subtrigger : v.subTriggers()) { - builder.addSubtriggers(toProto(subtrigger)); - } - - return RunnerApi.Trigger.newBuilder().setAfterEach(builder).build(); - } - - private RunnerApi.Trigger convertSpecific(Repeatedly v) { - return RunnerApi.Trigger.newBuilder() - .setRepeat( - RunnerApi.Trigger.Repeat.newBuilder() - .setSubtrigger(toProto(v.getRepeatedTrigger()))) - .build(); - } - - private RunnerApi.Trigger convertSpecific(OrFinallyTrigger v) { - return RunnerApi.Trigger.newBuilder() - .setOrFinally( - RunnerApi.Trigger.OrFinally.newBuilder() - .setMain(toProto(v.getMainTrigger())) - .setFinally(toProto(v.getUntilTrigger()))) - .build(); - } - - private RunnerApi.Trigger convertSpecific(AfterProcessingTime v) { - RunnerApi.Trigger.AfterProcessingTime.Builder builder = - RunnerApi.Trigger.AfterProcessingTime.newBuilder(); - - for (TimestampTransform transform : v.getTimestampTransforms()) { - builder.addTimestampTransforms(convertTimestampTransform(transform)); - } - - return RunnerApi.Trigger.newBuilder().setAfterProcessingTime(builder).build(); - } - - private RunnerApi.TimestampTransform convertTimestampTransform(TimestampTransform transform) { - if (transform instanceof TimestampTransform.Delay) { - return RunnerApi.TimestampTransform.newBuilder() - .setDelay( - RunnerApi.TimestampTransform.Delay.newBuilder() - .setDelayMillis(((TimestampTransform.Delay) transform).getDelay().getMillis())) - .build(); - } else if (transform instanceof TimestampTransform.AlignTo) { - TimestampTransform.AlignTo alignTo = (TimestampTransform.AlignTo) transform; - return RunnerApi.TimestampTransform.newBuilder() - .setAlignTo( - RunnerApi.TimestampTransform.AlignTo.newBuilder() 
- .setPeriod(alignTo.getPeriod().getMillis()) - .setOffset(alignTo.getOffset().getMillis())) - .build(); - - } else { - throw new IllegalArgumentException( - String.format("Unknown %s: %s", TimestampTransform.class.getSimpleName(), transform)); - } - } - } - - public static Trigger fromProto(RunnerApi.Trigger triggerProto) { - switch (triggerProto.getTriggerCase()) { - case AFTER_ALL: - return AfterAll.of(protosToTriggers(triggerProto.getAfterAll().getSubtriggersList())); - case AFTER_ANY: - return AfterFirst.of(protosToTriggers(triggerProto.getAfterAny().getSubtriggersList())); - case AFTER_EACH: - return AfterEach.inOrder( - protosToTriggers(triggerProto.getAfterEach().getSubtriggersList())); - case AFTER_END_OF_WINDOW: - RunnerApi.Trigger.AfterEndOfWindow eowProto = triggerProto.getAfterEndOfWindow(); - - if (!eowProto.hasEarlyFirings() && !eowProto.hasLateFirings()) { - return AfterWatermark.pastEndOfWindow(); - } - - // It either has early or late firings or both; our typing in Java makes this a smidge - // annoying - if (triggerProto.getAfterEndOfWindow().hasEarlyFirings()) { - AfterWatermarkEarlyAndLate trigger = - AfterWatermark.pastEndOfWindow() - .withEarlyFirings( - (OnceTrigger) - fromProto(triggerProto.getAfterEndOfWindow().getEarlyFirings())); - - if (triggerProto.getAfterEndOfWindow().hasLateFirings()) { - trigger = - trigger.withLateFirings( - (OnceTrigger) - fromProto(triggerProto.getAfterEndOfWindow().getLateFirings())); - } - return trigger; - } else { - // only late firings, so return directly - return AfterWatermark.pastEndOfWindow() - .withLateFirings((OnceTrigger) fromProto(eowProto.getLateFirings())); - } - case AFTER_PROCESSING_TIME: - AfterProcessingTime trigger = AfterProcessingTime.pastFirstElementInPane(); - for (RunnerApi.TimestampTransform transform : - triggerProto.getAfterProcessingTime().getTimestampTransformsList()) { - switch (transform.getTimestampTransformCase()) { - case ALIGN_TO: - trigger = - trigger.alignedTo( - 
Duration.millis(transform.getAlignTo().getPeriod()), - new Instant(transform.getAlignTo().getOffset())); - break; - case DELAY: - trigger = trigger.plusDelayOf(Duration.millis(transform.getDelay().getDelayMillis())); - break; - case TIMESTAMPTRANSFORM_NOT_SET: - throw new IllegalArgumentException( - String.format( - "Required field 'timestamp_transform' not set in %s", transform)); - default: - throw new IllegalArgumentException( - String.format( - "Unknown timestamp transform case: %s", - transform.getTimestampTransformCase())); - } - } - return trigger; - case AFTER_SYNCHRONIZED_PROCESSING_TIME: - return AfterSynchronizedProcessingTime.ofFirstElement(); - case ALWAYS: - return new ReshuffleTrigger(); - case ELEMENT_COUNT: - return AfterPane.elementCountAtLeast(triggerProto.getElementCount().getElementCount()); - case NEVER: - return Never.ever(); - case OR_FINALLY: - return fromProto(triggerProto.getOrFinally().getMain()) - .orFinally((OnceTrigger) fromProto(triggerProto.getOrFinally().getFinally())); - case REPEAT: - return Repeatedly.forever(fromProto(triggerProto.getRepeat().getSubtrigger())); - case DEFAULT: - return DefaultTrigger.of(); - case TRIGGER_NOT_SET: - throw new IllegalArgumentException( - String.format("Required field 'trigger' not set in %s", triggerProto)); - default: - throw new IllegalArgumentException( - String.format("Unknown trigger case: %s", triggerProto.getTriggerCase())); - } - } - - private static List<Trigger> protosToTriggers(List<RunnerApi.Trigger> triggers) { - List<Trigger> result = Lists.newArrayList(); - for (RunnerApi.Trigger trigger : triggers) { - result.add(fromProto(trigger)); - } - return result; - } - - // Do not instantiate - private Triggers() {} -} http://git-wip-us.apache.org/repos/asf/beam/blob/4fa38e2d/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java ---------------------------------------------------------------------- diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java index 061f309..e92565f 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java @@ -210,7 +210,7 @@ public class WindowingStrategyTranslation implements Serializable { .setAccumulationMode(toProto(windowingStrategy.getMode())) .setClosingBehavior(toProto(windowingStrategy.getClosingBehavior())) .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis()) - .setTrigger(Triggers.toProto(windowingStrategy.getTrigger())) + .setTrigger(TriggerTranslation.toProto(windowingStrategy.getTrigger())) .setWindowFn(windowFnSpec) .setWindowCoderId( components.registerCoder(windowingStrategy.getWindowFn().windowCoder())); @@ -247,7 +247,7 @@ public class WindowingStrategyTranslation implements Serializable { WindowFn<?, ?> windowFn = windowFnFromProto(windowFnSpec); TimestampCombiner timestampCombiner = timestampCombinerFromProto(proto.getOutputTime()); AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode()); - Trigger trigger = Triggers.fromProto(proto.getTrigger()); + Trigger trigger = TriggerTranslation.fromProto(proto.getTrigger()); ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior()); Duration allowedLateness = Duration.millis(proto.getAllowedLateness()); http://git-wip-us.apache.org/repos/asf/beam/blob/4fa38e2d/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggerTranslationTest.java ---------------------------------------------------------------------- diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggerTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggerTranslationTest.java new file mode 100644 index 0000000..55ea87b --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggerTranslationTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core.construction; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.transforms.windowing.AfterAll; +import org.apache.beam.sdk.transforms.windowing.AfterEach; +import org.apache.beam.sdk.transforms.windowing.AfterFirst; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.Never; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** Tests for utilities in {@link TriggerTranslation}. 
*/ +@RunWith(Parameterized.class) +public class TriggerTranslationTest { + + @AutoValue + abstract static class ToProtoAndBackSpec { + abstract Trigger getTrigger(); + } + + private static ToProtoAndBackSpec toProtoAndBackSpec(Trigger trigger) { + return new AutoValue_TriggerTranslationTest_ToProtoAndBackSpec(trigger); + } + + @Parameters(name = "{index}: {0}") + public static Iterable<ToProtoAndBackSpec> data() { + return ImmutableList.of( + // Atomic triggers + toProtoAndBackSpec(AfterWatermark.pastEndOfWindow()), + toProtoAndBackSpec(AfterPane.elementCountAtLeast(73)), + toProtoAndBackSpec(AfterSynchronizedProcessingTime.ofFirstElement()), + toProtoAndBackSpec(Never.ever()), + toProtoAndBackSpec(DefaultTrigger.of()), + toProtoAndBackSpec(AfterProcessingTime.pastFirstElementInPane()), + toProtoAndBackSpec( + AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(23))), + toProtoAndBackSpec( + AfterProcessingTime.pastFirstElementInPane() + .alignedTo(Duration.millis(5), new Instant(27))), + toProtoAndBackSpec( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardSeconds(3)) + .alignedTo(Duration.millis(5), new Instant(27)) + .plusDelayOf(Duration.millis(13))), + + // Composite triggers + + toProtoAndBackSpec( + AfterAll.of(AfterPane.elementCountAtLeast(79), AfterWatermark.pastEndOfWindow())), + toProtoAndBackSpec( + AfterEach.inOrder(AfterPane.elementCountAtLeast(79), AfterPane.elementCountAtLeast(3))), + toProtoAndBackSpec( + AfterFirst.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(3))), + toProtoAndBackSpec( + AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(3))), + toProtoAndBackSpec( + AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(3))), + toProtoAndBackSpec( + AfterWatermark.pastEndOfWindow() + .withEarlyFirings( + AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(42))) + 
.withLateFirings(AfterPane.elementCountAtLeast(3))), + toProtoAndBackSpec(Repeatedly.forever(AfterWatermark.pastEndOfWindow())), + toProtoAndBackSpec( + Repeatedly.forever(AfterPane.elementCountAtLeast(1)) + .orFinally(AfterWatermark.pastEndOfWindow()))); + } + + @Parameter(0) + public ToProtoAndBackSpec toProtoAndBackSpec; + + @Test + public void testToProtoAndBack() throws Exception { + Trigger trigger = toProtoAndBackSpec.getTrigger(); + Trigger toProtoAndBackTrigger = + TriggerTranslation.fromProto(TriggerTranslation.toProto(trigger)); + + assertThat(toProtoAndBackTrigger, equalTo(trigger)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/4fa38e2d/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggersTest.java deleted file mode 100644 index cf9d40c..0000000 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggersTest.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core.construction; - -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; - -import com.google.auto.value.AutoValue; -import com.google.common.collect.ImmutableList; -import org.apache.beam.sdk.transforms.windowing.AfterAll; -import org.apache.beam.sdk.transforms.windowing.AfterEach; -import org.apache.beam.sdk.transforms.windowing.AfterFirst; -import org.apache.beam.sdk.transforms.windowing.AfterPane; -import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; -import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; -import org.apache.beam.sdk.transforms.windowing.Never; -import org.apache.beam.sdk.transforms.windowing.Repeatedly; -import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameter; -import org.junit.runners.Parameterized.Parameters; - -/** Tests for utilities in {@link Triggers}. 
*/ -@RunWith(Parameterized.class) -public class TriggersTest { - - @AutoValue - abstract static class ToProtoAndBackSpec { - abstract Trigger getTrigger(); - } - - private static ToProtoAndBackSpec toProtoAndBackSpec(Trigger trigger) { - return new AutoValue_TriggersTest_ToProtoAndBackSpec(trigger); - } - - @Parameters(name = "{index}: {0}") - public static Iterable<ToProtoAndBackSpec> data() { - return ImmutableList.of( - // Atomic triggers - toProtoAndBackSpec(AfterWatermark.pastEndOfWindow()), - toProtoAndBackSpec(AfterPane.elementCountAtLeast(73)), - toProtoAndBackSpec(AfterSynchronizedProcessingTime.ofFirstElement()), - toProtoAndBackSpec(Never.ever()), - toProtoAndBackSpec(DefaultTrigger.of()), - toProtoAndBackSpec(AfterProcessingTime.pastFirstElementInPane()), - toProtoAndBackSpec( - AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(23))), - toProtoAndBackSpec( - AfterProcessingTime.pastFirstElementInPane() - .alignedTo(Duration.millis(5), new Instant(27))), - toProtoAndBackSpec( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(Duration.standardSeconds(3)) - .alignedTo(Duration.millis(5), new Instant(27)) - .plusDelayOf(Duration.millis(13))), - - // Composite triggers - - toProtoAndBackSpec( - AfterAll.of(AfterPane.elementCountAtLeast(79), AfterWatermark.pastEndOfWindow())), - toProtoAndBackSpec( - AfterEach.inOrder(AfterPane.elementCountAtLeast(79), AfterPane.elementCountAtLeast(3))), - toProtoAndBackSpec( - AfterFirst.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(3))), - toProtoAndBackSpec( - AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(3))), - toProtoAndBackSpec( - AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(3))), - toProtoAndBackSpec( - AfterWatermark.pastEndOfWindow() - .withEarlyFirings( - AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(42))) - .withLateFirings(AfterPane.elementCountAtLeast(3))), - 
toProtoAndBackSpec(Repeatedly.forever(AfterWatermark.pastEndOfWindow())), - toProtoAndBackSpec( - Repeatedly.forever(AfterPane.elementCountAtLeast(1)) - .orFinally(AfterWatermark.pastEndOfWindow()))); - } - - @Parameter(0) - public ToProtoAndBackSpec toProtoAndBackSpec; - - @Test - public void testToProtoAndBack() throws Exception { - Trigger trigger = toProtoAndBackSpec.getTrigger(); - Trigger toProtoAndBackTrigger = Triggers.fromProto(Triggers.toProto(trigger)); - - assertThat(toProtoAndBackTrigger, equalTo(trigger)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/4fa38e2d/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java index 744d162..0a520bd 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.core; import java.util.Collection; -import org.apache.beam.runners.core.construction.Triggers; +import org.apache.beam.runners.core.construction.TriggerTranslation; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.sdk.transforms.DoFn; @@ -122,7 +122,7 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn< windowingStrategy, ExecutableTriggerStateMachine.create( TriggerStateMachines.stateMachineForTrigger( - Triggers.toProto(windowingStrategy.getTrigger()))), + TriggerTranslation.toProto(windowingStrategy.getTrigger()))), stateInternals, timerInternals, outputWindowedValue(), 
http://git-wip-us.apache.org/repos/asf/beam/blob/4fa38e2d/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 7de8f3b..7f83eae 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -38,7 +38,7 @@ import java.util.Map; import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.runners.core.TimerInternals.TimerData; -import org.apache.beam.runners.core.construction.Triggers; +import org.apache.beam.runners.core.construction.TriggerTranslation; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner; @@ -116,7 +116,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { return new ReduceFnTester<Integer, Iterable<Integer>, W>( windowingStrategy, TriggerStateMachines.stateMachineForTrigger( - Triggers.toProto(windowingStrategy.getTrigger())), + TriggerTranslation.toProto(windowingStrategy.getTrigger())), SystemReduceFn.<String, Integer, W>buffering(VarIntCoder.of()), IterableCoder.of(VarIntCoder.of()), PipelineOptionsFactory.create(), @@ -179,7 +179,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { return combining( strategy, - TriggerStateMachines.stateMachineForTrigger(Triggers.toProto(strategy.getTrigger())), + TriggerStateMachines.stateMachineForTrigger( + TriggerTranslation.toProto(strategy.getTrigger())), combineFn, outputCoder); } @@ -227,7 +228,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { return 
combining( strategy, - TriggerStateMachines.stateMachineForTrigger(Triggers.toProto(strategy.getTrigger())), + TriggerStateMachines.stateMachineForTrigger( + TriggerTranslation.toProto(strategy.getTrigger())), combineFn, outputCoder, options, http://git-wip-us.apache.org/repos/asf/beam/blob/4fa38e2d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index 1a588ee..a944e75 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -32,7 +32,7 @@ import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.UnsupportedSideInputReader; -import org.apache.beam.runners.core.construction.Triggers; +import org.apache.beam.runners.core.construction.TriggerTranslation; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; @@ -162,7 +162,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { (CopyOnAccessInMemoryStateInternals) stepContext.stateInternals(); DirectTimerInternals timerInternals = stepContext.timerInternals(); RunnerApi.Trigger runnerApiTrigger = - Triggers.toProto(windowingStrategy.getTrigger()); + TriggerTranslation.toProto(windowingStrategy.getTrigger()); ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner = new 
ReduceFnRunner<>( key, http://git-wip-us.apache.org/repos/asf/beam/blob/4fa38e2d/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index 815b6ba..be4f3f6 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -31,7 +31,7 @@ import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.UnsupportedSideInputReader; -import org.apache.beam.runners.core.construction.Triggers; +import org.apache.beam.runners.core.construction.TriggerTranslation; import org.apache.beam.runners.core.metrics.CounterCell; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; @@ -260,7 +260,7 @@ public class SparkGroupAlsoByWindowViaWindowSet { windowingStrategy, ExecutableTriggerStateMachine.create( TriggerStateMachines.stateMachineForTrigger( - Triggers.toProto(windowingStrategy.getTrigger()))), + TriggerTranslation.toProto(windowingStrategy.getTrigger()))), stateInternals, timerInternals, outputHolder, http://git-wip-us.apache.org/repos/asf/beam/blob/4fa38e2d/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java ---------------------------------------------------------------------- diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java index be02335..d2a3424 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java @@ -30,7 +30,7 @@ import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.UnsupportedSideInputReader; -import org.apache.beam.runners.core.construction.Triggers; +import org.apache.beam.runners.core.construction.TriggerTranslation; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.runners.spark.aggregators.NamedAggregators; @@ -92,7 +92,7 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde windowingStrategy, ExecutableTriggerStateMachine.create( TriggerStateMachines.stateMachineForTrigger( - Triggers.toProto(windowingStrategy.getTrigger()))), + TriggerTranslation.toProto(windowingStrategy.getTrigger()))), stateInternals, timerInternals, outputter,
