http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java index a87a16d..a8490bf 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java @@ -23,9 +23,9 @@ import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertThat; import com.google.common.collect.ImmutableList; +import java.util.Collections; import java.util.HashMap; import java.util.Map; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; @@ -143,30 +143,22 @@ public class ParDoTranslationTest { inputs.putAll(parDo.getAdditionalInputs()); PCollectionTuple output = mainInput.apply(parDo); - SdkComponents sdkComponents = SdkComponents.create(); - - // Encode - RunnerApi.PTransform protoTransform = - PTransformTranslation.toProto( + SdkComponents components = SdkComponents.create(); + String transformId = + components.registerPTransform( AppliedPTransform.<PCollection<KV<Long, String>>, PCollection<Void>, MultiOutput>of( "foo", inputs, output.expand(), parDo, p), - sdkComponents); - Components protoComponents = sdkComponents.toComponents(); - - // Decode - Pipeline rehydratedPipeline = Pipeline.create(); + Collections.<AppliedPTransform<?, ?, ?>>emptyList()); + Components protoComponents = components.toComponents(); + RunnerApi.PTransform protoTransform = 
protoComponents.getTransformsOrThrow(transformId); ParDoPayload parDoPayload = protoTransform.getSpec().getParameter().unpack(ParDoPayload.class); for (PCollectionView<?> view : parDo.getSideInputs()) { SideInput sideInput = parDoPayload.getSideInputsOrThrow(view.getTagInternal().getId()); PCollectionView<?> restoredView = - ParDoTranslation.viewFromProto( - sideInput, - view.getTagInternal().getId(), - view.getPCollection(), - protoTransform, - protoComponents); + ParDoTranslation.fromProto( + sideInput, view.getTagInternal().getId(), protoTransform, protoComponents); assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal())); assertThat(restoredView.getViewFn(), instanceOf(view.getViewFn().getClass())); assertThat( @@ -177,7 +169,7 @@ public class ParDoTranslationTest { view.getWindowingStrategyInternal().fixDefaults())); assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal())); } - String mainInputId = sdkComponents.registerPCollection(mainInput); + String mainInputId = components.registerPCollection(mainInput); assertThat( ParDoTranslation.getMainInput(protoTransform, protoComponents), equalTo(protoComponents.getPcollectionsOrThrow(mainInputId)));
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java index 267232c..6e4d6c4 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.core.construction; -import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop; import static org.junit.Assert.assertEquals; import java.io.Serializable; @@ -25,6 +24,8 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement; +import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; @@ -69,6 +70,7 @@ public class SplittableParDoTest { public void checkDone() {} } + @BoundedPerElement private static class BoundedFakeFn extends DoFn<Integer, String> { @ProcessElement public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {} @@ -79,12 +81,10 @@ public class SplittableParDoTest { } } + @UnboundedPerElement private static class UnboundedFakeFn extends DoFn<Integer, String> { @ProcessElement - public ProcessContinuation processElement( - ProcessContext context, 
SomeRestrictionTracker tracker) { - return stop(); - } + public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {} @GetInitialRestriction public SomeRestriction getInitialRestriction(Integer element) { @@ -122,14 +122,14 @@ public class SplittableParDoTest { "Applying a bounded SDF to a bounded collection produces a bounded collection", PCollection.IsBounded.BOUNDED, makeBoundedCollection(pipeline) - .apply("bounded to bounded", SplittableParDo.forJavaParDo(makeParDo(boundedFn))) + .apply("bounded to bounded", new SplittableParDo<>(makeParDo(boundedFn))) .get(MAIN_OUTPUT_TAG) .isBounded()); assertEquals( "Applying a bounded SDF to an unbounded collection produces an unbounded collection", PCollection.IsBounded.UNBOUNDED, makeUnboundedCollection(pipeline) - .apply("bounded to unbounded", SplittableParDo.forJavaParDo(makeParDo(boundedFn))) + .apply("bounded to unbounded", new SplittableParDo<>(makeParDo(boundedFn))) .get(MAIN_OUTPUT_TAG) .isBounded()); } @@ -143,14 +143,14 @@ public class SplittableParDoTest { "Applying an unbounded SDF to a bounded collection produces a bounded collection", PCollection.IsBounded.UNBOUNDED, makeBoundedCollection(pipeline) - .apply("unbounded to bounded", SplittableParDo.forJavaParDo(makeParDo(unboundedFn))) + .apply("unbounded to bounded", new SplittableParDo<>(makeParDo(unboundedFn))) .get(MAIN_OUTPUT_TAG) .isBounded()); assertEquals( "Applying an unbounded SDF to an unbounded collection produces an unbounded collection", PCollection.IsBounded.UNBOUNDED, makeUnboundedCollection(pipeline) - .apply("unbounded to unbounded", SplittableParDo.forJavaParDo(makeParDo(unboundedFn))) + .apply("unbounded to unbounded", new SplittableParDo<>(makeParDo(unboundedFn))) .get(MAIN_OUTPUT_TAG) .isBounded()); } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformInputsTest.java 
---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformInputsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformInputsTest.java deleted file mode 100644 index f5b2c11..0000000 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformInputsTest.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.runners.core.construction; - -import static org.junit.Assert.assertThat; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.GenerateSequence; -import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TupleTag; -import org.hamcrest.Matchers; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link TransformInputs}. */ -@RunWith(JUnit4.class) -public class TransformInputsTest { - @Rule public TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false); - @Rule public ExpectedException thrown = ExpectedException.none(); - - @Test - public void nonAdditionalInputsWithNoInputSucceeds() { - AppliedPTransform<PInput, POutput, TestTransform> transform = - AppliedPTransform.of( - "input-free", - Collections.<TupleTag<?>, PValue>emptyMap(), - Collections.<TupleTag<?>, PValue>emptyMap(), - new TestTransform(), - pipeline); - - assertThat(TransformInputs.nonAdditionalInputs(transform), Matchers.<PValue>empty()); - } - - @Test - public void nonAdditionalInputsWithOneMainInputSucceeds() { - PCollection<Long> input = pipeline.apply(GenerateSequence.from(1L)); - AppliedPTransform<PInput, POutput, TestTransform> transform = - AppliedPTransform.of( - "input-single", - Collections.<TupleTag<?>, PValue>singletonMap(new TupleTag<Long>() {}, input), - Collections.<TupleTag<?>, PValue>emptyMap(), - new 
TestTransform(), - pipeline); - - assertThat( - TransformInputs.nonAdditionalInputs(transform), Matchers.<PValue>containsInAnyOrder(input)); - } - - @Test - public void nonAdditionalInputsWithMultipleNonAdditionalInputsSucceeds() { - Map<TupleTag<?>, PValue> allInputs = new HashMap<>(); - PCollection<Integer> mainInts = pipeline.apply("MainInput", Create.of(12, 3)); - allInputs.put(new TupleTag<Integer>() {}, mainInts); - PCollection<Void> voids = pipeline.apply("VoidInput", Create.empty(VoidCoder.of())); - allInputs.put(new TupleTag<Void>() {}, voids); - AppliedPTransform<PInput, POutput, TestTransform> transform = - AppliedPTransform.of( - "additional-free", - allInputs, - Collections.<TupleTag<?>, PValue>emptyMap(), - new TestTransform(), - pipeline); - - assertThat( - TransformInputs.nonAdditionalInputs(transform), - Matchers.<PValue>containsInAnyOrder(voids, mainInts)); - } - - @Test - public void nonAdditionalInputsWithAdditionalInputsSucceeds() { - Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>(); - additionalInputs.put(new TupleTag<String>() {}, pipeline.apply(Create.of("1, 2", "3"))); - additionalInputs.put(new TupleTag<Long>() {}, pipeline.apply(GenerateSequence.from(3L))); - - Map<TupleTag<?>, PValue> allInputs = new HashMap<>(); - PCollection<Integer> mainInts = pipeline.apply("MainInput", Create.of(12, 3)); - allInputs.put(new TupleTag<Integer>() {}, mainInts); - PCollection<Void> voids = pipeline.apply("VoidInput", Create.empty(VoidCoder.of())); - allInputs.put( - new TupleTag<Void>() {}, voids); - allInputs.putAll(additionalInputs); - - AppliedPTransform<PInput, POutput, TestTransform> transform = - AppliedPTransform.of( - "additional", - allInputs, - Collections.<TupleTag<?>, PValue>emptyMap(), - new TestTransform(additionalInputs), - pipeline); - - assertThat( - TransformInputs.nonAdditionalInputs(transform), - Matchers.<PValue>containsInAnyOrder(mainInts, voids)); - } - - @Test - public void 
nonAdditionalInputsWithOnlyAdditionalInputsThrows() { - Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>(); - additionalInputs.put(new TupleTag<String>() {}, pipeline.apply(Create.of("1, 2", "3"))); - additionalInputs.put(new TupleTag<Long>() {}, pipeline.apply(GenerateSequence.from(3L))); - - AppliedPTransform<PInput, POutput, TestTransform> transform = - AppliedPTransform.of( - "additional-only", - additionalInputs, - Collections.<TupleTag<?>, PValue>emptyMap(), - new TestTransform(additionalInputs), - pipeline); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("at least one"); - TransformInputs.nonAdditionalInputs(transform); - } - - private static class TestTransform extends PTransform<PInput, POutput> { - private final Map<TupleTag<?>, PValue> additionalInputs; - - private TestTransform() { - this(Collections.<TupleTag<?>, PValue>emptyMap()); - } - - private TestTransform(Map<TupleTag<?>, PValue> additionalInputs) { - this.additionalInputs = additionalInputs; - } - - @Override - public POutput expand(PInput input) { - return PDone.in(input.getPipeline()); - } - - @Override - public Map<TupleTag<?>, PValue> getAdditionalInputs() { - return additionalInputs; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java index 7a57fd7..e406545 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java +++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java @@ -116,8 +116,5 @@ public class WindowingStrategyTranslationTest { protoComponents.getCodersOrThrow( components.registerCoder(windowingStrategy.getWindowFn().windowCoder())); - assertThat( - proto.getAssignsToOneWindow(), - equalTo(windowingStrategy.getWindowFn().assignsToOneWindow())); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java index 283df16..739034c 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java @@ -26,10 +26,8 @@ import java.util.Objects; import javax.annotation.Nullable; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload; -import org.apache.beam.sdk.io.DynamicFileDestinations; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; -import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.WriteFiles; import org.apache.beam.sdk.io.fs.ResourceId; @@ -38,8 +36,6 @@ import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.ParDo; -import 
org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.junit.Test; @@ -60,17 +56,16 @@ public class WriteFilesTranslationTest { @RunWith(Parameterized.class) public static class TestWriteFilesPayloadTranslation { @Parameters(name = "{index}: {0}") - public static Iterable<WriteFiles<Object, Void, Object>> data() { - SerializableFunction<Object, Object> format = SerializableFunctions.constant(null); - return ImmutableList.of( - WriteFiles.to(new DummySink(), format), - WriteFiles.to(new DummySink(), format).withWindowedWrites(), - WriteFiles.to(new DummySink(), format).withNumShards(17), - WriteFiles.to(new DummySink(), format).withWindowedWrites().withNumShards(42)); + public static Iterable<WriteFiles<?>> data() { + return ImmutableList.<WriteFiles<?>>of( + WriteFiles.to(new DummySink()), + WriteFiles.to(new DummySink()).withWindowedWrites(), + WriteFiles.to(new DummySink()).withNumShards(17), + WriteFiles.to(new DummySink()).withWindowedWrites().withNumShards(42)); } @Parameter(0) - public WriteFiles<String, Void, String> writeFiles; + public WriteFiles<String> writeFiles; public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); @@ -85,7 +80,7 @@ public class WriteFilesTranslationTest { assertThat(payload.getWindowedWrites(), equalTo(writeFiles.isWindowedWrites())); assertThat( - (FileBasedSink<String, Void>) WriteFilesTranslation.sinkFromProto(payload.getSink()), + (FileBasedSink<String>) WriteFilesTranslation.sinkFromProto(payload.getSink()), equalTo(writeFiles.getSink())); } @@ -94,9 +89,9 @@ public class WriteFilesTranslationTest { PCollection<String> input = p.apply(Create.of("hello")); PDone output = input.apply(writeFiles); - AppliedPTransform<PCollection<String>, PDone, WriteFiles<String, Void, String>> - appliedPTransform = - AppliedPTransform.of("foo", 
input.expand(), output.expand(), writeFiles, p); + AppliedPTransform<PCollection<String>, PDone, WriteFiles<String>> appliedPTransform = + AppliedPTransform.<PCollection<String>, PDone, WriteFiles<String>>of( + "foo", input.expand(), output.expand(), writeFiles, p); assertThat( WriteFilesTranslation.isRunnerDeterminedSharding(appliedPTransform), @@ -106,9 +101,7 @@ public class WriteFilesTranslationTest { WriteFilesTranslation.isWindowedWrites(appliedPTransform), equalTo(writeFiles.isWindowedWrites())); - assertThat( - WriteFilesTranslation.<String, Void, String>getSink(appliedPTransform), - equalTo(writeFiles.getSink())); + assertThat(WriteFilesTranslation.getSink(appliedPTransform), equalTo(writeFiles.getSink())); } } @@ -116,16 +109,16 @@ public class WriteFilesTranslationTest { * A simple {@link FileBasedSink} for testing serialization/deserialization. Not mocked to avoid * any issues serializing mocks. */ - private static class DummySink extends FileBasedSink<Object, Void> { + private static class DummySink extends FileBasedSink<String> { DummySink() { super( StaticValueProvider.of(FileSystems.matchNewResource("nowhere", false)), - DynamicFileDestinations.constant(new DummyFilenamePolicy())); + new DummyFilenamePolicy()); } @Override - public WriteOperation<Object, Void> createWriteOperation() { + public WriteOperation<String> createWriteOperation() { return new DummyWriteOperation(this); } @@ -137,39 +130,46 @@ public class WriteFilesTranslationTest { DummySink that = (DummySink) other; - return getTempDirectoryProvider().isAccessible() - && that.getTempDirectoryProvider().isAccessible() - && getTempDirectoryProvider().get().equals(that.getTempDirectoryProvider().get()); + return getFilenamePolicy().equals(((DummySink) other).getFilenamePolicy()) + && getBaseOutputDirectoryProvider().isAccessible() + && that.getBaseOutputDirectoryProvider().isAccessible() + && getBaseOutputDirectoryProvider() + .get() + .equals(that.getBaseOutputDirectoryProvider().get()); } 
@Override public int hashCode() { return Objects.hash( DummySink.class, - getTempDirectoryProvider().isAccessible() ? getTempDirectoryProvider().get() : null); + getFilenamePolicy(), + getBaseOutputDirectoryProvider().isAccessible() + ? getBaseOutputDirectoryProvider().get() + : null); } } - private static class DummyWriteOperation extends FileBasedSink.WriteOperation<Object, Void> { - public DummyWriteOperation(FileBasedSink<Object, Void> sink) { + private static class DummyWriteOperation extends FileBasedSink.WriteOperation<String> { + public DummyWriteOperation(FileBasedSink<String> sink) { super(sink); } @Override - public FileBasedSink.Writer<Object, Void> createWriter() throws Exception { + public FileBasedSink.Writer<String> createWriter() throws Exception { throw new UnsupportedOperationException("Should never be called."); } } private static class DummyFilenamePolicy extends FilenamePolicy { @Override - public ResourceId windowedFilename(WindowedContext c, OutputFileHints outputFileHints) { + public ResourceId windowedFilename( + ResourceId outputDirectory, WindowedContext c, String extension) { throw new UnsupportedOperationException("Should never be called."); } @Nullable @Override - public ResourceId unwindowedFilename(Context c, OutputFileHints outputFileHints) { + public ResourceId unwindowedFilename(ResourceId outputDirectory, Context c, String extension) { throw new UnsupportedOperationException("Should never be called."); } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml index 8c8e599..c3a8d25 100644 --- a/runners/core-java/pom.xml +++ b/runners/core-java/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> 
</parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java index 28938c1..1cf1509 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java @@ -134,27 +134,26 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWin // The element is too late for this window. droppedDueToLateness.inc(); WindowTracing.debug( - "{}: Dropping element at {} for key:{}; window:{} " - + "since too far behind inputWatermark:{}; outputWatermark:{}", - LateDataFilter.class.getSimpleName(), - input.getTimestamp(), - key, - window, - timerInternals.currentInputWatermarkTime(), + "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} " + + "since too far behind inputWatermark:{}; outputWatermark:{}", + input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(), timerInternals.currentOutputWatermarkTime()); } } - Iterable<WindowedValue<InputT>> nonLateElements = - Iterables.filter( - concatElements, - new Predicate<WindowedValue<InputT>>() { - @Override - public boolean apply(WindowedValue<InputT> input) { - BoundedWindow window = Iterables.getOnlyElement(input.getWindows()); - return !canDropDueToExpiredWindow(window); - } - }); + Iterable<WindowedValue<InputT>> nonLateElements = Iterables.filter( + concatElements, + new Predicate<WindowedValue<InputT>>() { + @Override + public boolean apply(WindowedValue<InputT> input) { + BoundedWindow window = Iterables.getOnlyElement(input.getWindows()); + if 
(canDropDueToExpiredWindow(window)) { + return false; + } else { + return true; + } + } + }); return nonLateElements; } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index 0c956d5..2db6531 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -96,7 +96,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker< final WindowedValue<InputT> element, final TrackerT tracker) { final ProcessContext processContext = new ProcessContext(element, tracker); - DoFn.ProcessContinuation cont = invoker.invokeProcessElement( + invoker.invokeProcessElement( new DoFnInvoker.ArgumentProvider<InputT, OutputT>() { @Override public DoFn<InputT, OutputT>.ProcessContext processContext( @@ -118,11 +118,6 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker< } @Override - public PipelineOptions pipelineOptions() { - return pipelineOptions; - } - - @Override public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { throw new IllegalStateException( "Should not access startBundleContext() from @" @@ -155,37 +150,10 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker< "Access to timers not supported in Splittable DoFn"); } }); - // TODO: verify that if there was a failed tryClaim() call, then cont.shouldResume() is false. 
- // Currently we can't verify this because there are no hooks into tryClaim(). - // See https://issues.apache.org/jira/browse/BEAM-2607 - RestrictionT residual = processContext.extractCheckpoint(); - if (cont.shouldResume()) { - if (residual == null) { - // No checkpoint had been taken by the runner while the ProcessElement call ran, however - // the call says that not the whole restriction has been processed. So we need to take - // a checkpoint now: checkpoint() guarantees that the primary restriction describes exactly - // the work that was done in the current ProcessElement call, and returns a residual - // restriction that describes exactly the work that wasn't done in the current call. - residual = tracker.checkpoint(); - } else { - // A checkpoint was taken by the runner, and then the ProcessElement call returned resume() - // without making more tryClaim() calls (since no tryClaim() calls can succeed after - // checkpoint(), and since if it had made a failed tryClaim() call, it should have returned - // stop()). - // This means that the resulting primary restriction and the taken checkpoint already - // accurately describe respectively the work that was and wasn't done in the current - // ProcessElement call. - // In other words, if we took a checkpoint *after* ProcessElement completed (like in the - // branch above), it would have been equivalent to this one. - } - } else { - // The ProcessElement call returned stop() - that means the tracker's current restriction - // has been fully processed by the call. A checkpoint may or may not have been taken in - // "residual"; if it was, then we'll need to process it; if no, then we don't - nothing - // special needs to be done. 
- } + tracker.checkDone(); - return new Result(residual, cont, processContext.getLastReportedWatermark()); + return new Result( + processContext.extractCheckpoint(), processContext.getLastReportedWatermark()); } private class ProcessContext extends DoFn<InputT, OutputT>.ProcessContext { http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java index 88275d6..31e86bd 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java @@ -24,11 +24,11 @@ import java.util.Collection; import java.util.Collections; import org.apache.beam.runners.core.StateNamespaces.WindowNamespace; import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.core.construction.ElementAndRestriction; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.joda.time.Instant; @@ -38,13 +38,16 @@ import org.joda.time.Instant; */ public class ProcessFnRunner<InputT, OutputT, RestrictionT> implements PushbackSideInputDoFnRunner< - KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> { - private final DoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> underlying; + KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> { + private final DoFnRunner< + KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, 
OutputT> + underlying; private final Collection<PCollectionView<?>> views; private final ReadyCheckingSideInputReader sideInputReader; ProcessFnRunner( - DoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> underlying, + DoFnRunner<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> + underlying, Collection<PCollectionView<?>> views, ReadyCheckingSideInputReader sideInputReader) { this.underlying = underlying; @@ -58,9 +61,10 @@ public class ProcessFnRunner<InputT, OutputT, RestrictionT> } @Override - public Iterable<WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>>> + public Iterable<WindowedValue<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>> processElementInReadyWindows( - WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>> windowedKWI) { + WindowedValue<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> + windowedKWI) { checkTrivialOuterWindows(windowedKWI); BoundedWindow window = getUnderlyingWindow(windowedKWI.getValue()); if (!isReady(window)) { http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 634a2d1..62d519f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -29,6 +29,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -50,7 +51,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import 
org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowTracing; @@ -637,9 +637,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { } /** - * A descriptor of the activation for a window based on a timer. + * Enriches TimerData with state necessary for processing a timer as well as + * common queries about a timer. */ - private class WindowActivation { + private class EnrichedTimerData { + public final Instant timestamp; public final ReduceFn<K, InputT, OutputT, W>.Context directContext; public final ReduceFn<K, InputT, OutputT, W>.Context renamedContext; // If this is an end-of-window timer then we may need to set a garbage collection timer @@ -650,34 +652,18 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { // end-of-window time to be a signal to garbage collect. public final boolean isGarbageCollection; - WindowActivation( + EnrichedTimerData( + TimerData timer, ReduceFn<K, InputT, OutputT, W>.Context directContext, ReduceFn<K, InputT, OutputT, W>.Context renamedContext) { + this.timestamp = timer.getTimestamp(); this.directContext = directContext; this.renamedContext = renamedContext; W window = directContext.window(); - - // The output watermark is before the end of the window if it is either unknown - // or it is known to be before it. If it is unknown, that means that there hasn't been - // enough data to advance it. 
- boolean outputWatermarkBeforeEOW = - timerInternals.currentOutputWatermarkTime() == null - || !timerInternals.currentOutputWatermarkTime().isAfter(window.maxTimestamp()); - - // The "end of the window" is reached when the local input watermark (for this key) surpasses - // it but the local output watermark (also for this key) has not. After data is emitted and - // the output watermark hold is released, the output watermark on this key will immediately - // exceed the end of the window (otherwise we could see multiple ON_TIME outputs) - this.isEndOfWindow = - timerInternals.currentInputWatermarkTime().isAfter(window.maxTimestamp()) - && outputWatermarkBeforeEOW; - - // The "GC time" is reached when the input watermark surpasses the end of the window - // plus allowed lateness. After this, the window is expired and expunged. - this.isGarbageCollection = - timerInternals - .currentInputWatermarkTime() - .isAfter(LateDataUtils.garbageCollectionTime(window, windowingStrategy)); + this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() + && timer.getTimestamp().equals(window.maxTimestamp()); + Instant cleanupTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy); + this.isGarbageCollection = !timer.getTimestamp().isBefore(cleanupTime); } // Has this window had its trigger finish? @@ -696,47 +682,24 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { return; } - // Create a reusable context for each window and begin prefetching necessary + // Create a reusable context for each timer and begin prefetching necessary // state. 
- Map<BoundedWindow, WindowActivation> windowActivations = new HashMap(); - + List<EnrichedTimerData> enrichedTimers = new LinkedList(); for (TimerData timer : timers) { checkArgument(timer.getNamespace() instanceof WindowNamespace, "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace()); @SuppressWarnings("unchecked") WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace(); W window = windowNamespace.getWindow(); - - WindowTracing.debug("{}: Received timer key:{}; window:{}; data:{} with " - + "inputWatermark:{}; outputWatermark:{}", - ReduceFnRunner.class.getSimpleName(), - key, window, timer, - timerInternals.currentInputWatermarkTime(), - timerInternals.currentOutputWatermarkTime()); - - // Processing time timers for an expired window are ignored, just like elements - // that show up too late. Window GC is management by an event time timer - if (TimeDomain.EVENT_TIME != timer.getDomain() && windowIsExpired(window)) { - continue; - } - - // How a window is processed is a function only of the current state, not the details - // of the timer. This makes us robust to large leaps in processing time and watermark - // time, where both EOW and GC timers come in together and we need to GC and emit - // the final pane. - if (windowActivations.containsKey(window)) { - continue; - } - ReduceFn<K, InputT, OutputT, W>.Context directContext = contextFactory.base(window, StateStyle.DIRECT); ReduceFn<K, InputT, OutputT, W>.Context renamedContext = contextFactory.base(window, StateStyle.RENAMED); - WindowActivation windowActivation = new WindowActivation(directContext, renamedContext); - windowActivations.put(window, windowActivation); + EnrichedTimerData enrichedTimer = new EnrichedTimerData(timer, directContext, renamedContext); + enrichedTimers.add(enrichedTimer); // Perform prefetching of state to determine if the trigger should fire. 
- if (windowActivation.isGarbageCollection) { + if (enrichedTimer.isGarbageCollection) { triggerRunner.prefetchIsClosed(directContext.state()); } else { triggerRunner.prefetchShouldFire(directContext.window(), directContext.state()); @@ -744,7 +707,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { } // For those windows that are active and open, prefetch the triggering or emitting state. - for (WindowActivation timer : windowActivations.values()) { + for (EnrichedTimerData timer : enrichedTimers) { if (timer.windowIsActiveAndOpen()) { ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext; if (timer.isGarbageCollection) { @@ -757,27 +720,25 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { } // Perform processing now that everything is prefetched. - for (WindowActivation windowActivation : windowActivations.values()) { - ReduceFn<K, InputT, OutputT, W>.Context directContext = windowActivation.directContext; - ReduceFn<K, InputT, OutputT, W>.Context renamedContext = windowActivation.renamedContext; + for (EnrichedTimerData timer : enrichedTimers) { + ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext; + ReduceFn<K, InputT, OutputT, W>.Context renamedContext = timer.renamedContext; - if (windowActivation.isGarbageCollection) { - WindowTracing.debug( - "{}: Cleaning up for key:{}; window:{} with inputWatermark:{}; outputWatermark:{}", - ReduceFnRunner.class.getSimpleName(), - key, - directContext.window(), + if (timer.isGarbageCollection) { + WindowTracing.debug("ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with " + + "inputWatermark:{}; outputWatermark:{}", + key, directContext.window(), timer.timestamp, timerInternals.currentInputWatermarkTime(), timerInternals.currentOutputWatermarkTime()); - boolean windowIsActiveAndOpen = windowActivation.windowIsActiveAndOpen(); + boolean windowIsActiveAndOpen = timer.windowIsActiveAndOpen(); if 
(windowIsActiveAndOpen) { // We need to call onTrigger to emit the final pane if required. // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted, // and the watermark has passed the end of the window. @Nullable Instant newHold = onTrigger( - directContext, renamedContext, true /* isFinished */, windowActivation.isEndOfWindow); + directContext, renamedContext, true /* isFinished */, timer.isEndOfWindow); checkState(newHold == null, "Hold placed at %s despite isFinished being true.", newHold); } @@ -785,20 +746,18 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { // see elements for it again. clearAllState(directContext, renamedContext, windowIsActiveAndOpen); } else { - WindowTracing.debug( - "{}.onTimers: Triggering for key:{}; window:{} at {} with " + WindowTracing.debug("ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with " + "inputWatermark:{}; outputWatermark:{}", - key, - directContext.window(), + key, directContext.window(), timer.timestamp, timerInternals.currentInputWatermarkTime(), timerInternals.currentOutputWatermarkTime()); - if (windowActivation.windowIsActiveAndOpen() + if (timer.windowIsActiveAndOpen() && triggerRunner.shouldFire( directContext.window(), directContext.timers(), directContext.state())) { emit(directContext, renamedContext); } - if (windowActivation.isEndOfWindow) { + if (timer.isEndOfWindow) { // If the window strategy trigger includes a watermark trigger then at this point // there should be no data holds, either because we'd already cleared them on an // earlier onTrigger, or because we just cleared them on the above emit. @@ -960,9 +919,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { // The pane has elements. return true; } - if (timing == Timing.ON_TIME - && windowingStrategy.getOnTimeBehavior() == Window.OnTimeBehavior.FIRE_ALWAYS) { - // This is an empty ON_TIME pane. 
+ if (timing == Timing.ON_TIME) { + // This is the unique ON_TIME pane. return true; } if (isFinished && windowingStrategy.getClosingBehavior() == ClosingBehavior.FIRE_ALWAYS) { @@ -990,8 +948,13 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { private Instant onTrigger( final ReduceFn<K, InputT, OutputT, W>.Context directContext, ReduceFn<K, InputT, OutputT, W>.Context renamedContext, - final boolean isFinished, boolean isEndOfWindow) + boolean isFinished, boolean isEndOfWindow) throws Exception { + Instant inputWM = timerInternals.currentInputWatermarkTime(); + + // Calculate the pane info. + final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, isFinished).read(); + // Extract the window hold, and as a side effect clear it. final WatermarkHold.OldAndNewHolds pair = watermarkHold.extractAndRelease(renamedContext, isFinished).read(); @@ -1000,13 +963,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { @Nullable Instant newHold = pair.newHold; final boolean isEmpty = nonEmptyPanes.isEmpty(renamedContext.state()).read(); - if (isEmpty - && windowingStrategy.getClosingBehavior() == ClosingBehavior.FIRE_IF_NON_EMPTY - && windowingStrategy.getOnTimeBehavior() == Window.OnTimeBehavior.FIRE_IF_NON_EMPTY) { - return newHold; - } - Instant inputWM = timerInternals.currentInputWatermarkTime(); if (newHold != null) { // We can't be finished yet. checkState( @@ -1038,9 +995,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { } } - // Calculate the pane info. - final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, isFinished).read(); - // Only emit a pane if it has data or empty panes are observable. if (needToEmit(isEmpty, isFinished, pane.getTiming())) { // Run reduceFn.onTrigger method. 
@@ -1051,11 +1005,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { @Override public void output(OutputT toOutput) { // We're going to output panes, so commit the (now used) PaneInfo. - // This is unnecessary if the trigger isFinished since the saved + // TODO: This is unnecessary if the trigger isFinished since the saved // state will be immediately deleted. - if (!isFinished) { - paneInfoTracker.storeCurrentPaneInfo(directContext, pane); - } + paneInfoTracker.storeCurrentPaneInfo(directContext, pane); // Output the actual value. outputter.outputWindowedValue( @@ -1129,9 +1081,4 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { } } - private boolean windowIsExpired(BoundedWindow w) { - return timerInternals - .currentInputWatermarkTime() - .isAfter(w.maxTimestamp().plus(windowingStrategy.getAllowedLateness())); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index c3bfef6..7d7babd 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -233,11 +233,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } @Override - public PipelineOptions pipelineOptions() { - return getPipelineOptions(); - } - - @Override public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { return this; } @@ -303,11 +298,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } @Override - public PipelineOptions pipelineOptions() { - return getPipelineOptions(); 
- } - - @Override public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { throw new UnsupportedOperationException( "Cannot access StartBundleContext outside of @StartBundle method."); @@ -477,11 +467,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } @Override - public PipelineOptions pipelineOptions() { - return getPipelineOptions(); - } - - @Override public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { throw new UnsupportedOperationException("StartBundleContext parameters are not supported."); } @@ -583,11 +568,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } @Override - public PipelineOptions pipelineOptions() { - return getPipelineOptions(); - } - - @Override public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { throw new UnsupportedOperationException("StartBundleContext parameters are not supported."); } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java index 6e97645..c4b086a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.construction.ElementAndRestriction; import 
org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform; import org.apache.beam.runners.core.construction.ReplacementOutputs; @@ -85,15 +86,15 @@ public class SplittableParDoViaKeyedWorkItems { /** Overrides a {@link ProcessKeyedElements} into {@link SplittableProcessViaKeyedWorkItems}. */ public static class OverrideFactory<InputT, OutputT, RestrictionT> implements PTransformOverrideFactory< - PCollection<KV<String, KV<InputT, RestrictionT>>>, PCollectionTuple, - ProcessKeyedElements<InputT, OutputT, RestrictionT>> { + PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>>, PCollectionTuple, + ProcessKeyedElements<InputT, OutputT, RestrictionT>> { @Override public PTransformReplacement< - PCollection<KV<String, KV<InputT, RestrictionT>>>, PCollectionTuple> + PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>>, PCollectionTuple> getReplacementTransform( AppliedPTransform< - PCollection<KV<String, KV<InputT, RestrictionT>>>, PCollectionTuple, - ProcessKeyedElements<InputT, OutputT, RestrictionT>> + PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>>, + PCollectionTuple, ProcessKeyedElements<InputT, OutputT, RestrictionT>> transform) { return PTransformReplacement.of( PTransformReplacements.getSingletonMainInput(transform), @@ -112,7 +113,8 @@ public class SplittableParDoViaKeyedWorkItems { * method for a splittable {@link DoFn}. 
*/ public static class SplittableProcessViaKeyedWorkItems<InputT, OutputT, RestrictionT> - extends PTransform<PCollection<KV<String, KV<InputT, RestrictionT>>>, PCollectionTuple> { + extends PTransform< + PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>>, PCollectionTuple> { private final ProcessKeyedElements<InputT, OutputT, RestrictionT> original; public SplittableProcessViaKeyedWorkItems( @@ -121,13 +123,15 @@ public class SplittableParDoViaKeyedWorkItems { } @Override - public PCollectionTuple expand(PCollection<KV<String, KV<InputT, RestrictionT>>> input) { + public PCollectionTuple expand( + PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>> input) { return input - .apply(new GBKIntoKeyedWorkItems<String, KV<InputT, RestrictionT>>()) + .apply(new GBKIntoKeyedWorkItems<String, ElementAndRestriction<InputT, RestrictionT>>()) .setCoder( KeyedWorkItemCoder.of( StringUtf8Coder.of(), - ((KvCoder<String, KV<InputT, RestrictionT>>) input.getCoder()).getValueCoder(), + ((KvCoder<String, ElementAndRestriction<InputT, RestrictionT>>) input.getCoder()) + .getValueCoder(), input.getWindowingStrategy().getWindowFn().windowCoder())) .apply(new ProcessElements<>(original)); } @@ -137,7 +141,8 @@ public class SplittableParDoViaKeyedWorkItems { public static class ProcessElements< InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> extends PTransform< - PCollection<KeyedWorkItem<String, KV<InputT, RestrictionT>>>, PCollectionTuple> { + PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>, + PCollectionTuple> { private final ProcessKeyedElements<InputT, OutputT, RestrictionT> original; public ProcessElements(ProcessKeyedElements<InputT, OutputT, RestrictionT> original) { @@ -171,7 +176,7 @@ public class SplittableParDoViaKeyedWorkItems { @Override public PCollectionTuple expand( - PCollection<KeyedWorkItem<String, KV<InputT, RestrictionT>>> input) { + PCollection<KeyedWorkItem<String, 
ElementAndRestriction<InputT, RestrictionT>>> input) { return ProcessKeyedElements.createPrimitiveOutputFor( input, original.getFn(), @@ -196,12 +201,12 @@ public class SplittableParDoViaKeyedWorkItems { @VisibleForTesting public static class ProcessFn< InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> - extends DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> { + extends DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> { /** * The state cell containing a watermark hold for the output of this {@link DoFn}. The hold is * acquired during the first {@link DoFn.ProcessElement} call for each element and restriction, - * and is released when the {@link DoFn.ProcessElement} call returns {@link - * ProcessContinuation#stop()}. + * and is released when the {@link DoFn.ProcessElement} call returns and there is no residual + * restriction captured by the {@link SplittableProcessElementInvoker}. * * <p>A hold is needed to avoid letting the output watermark immediately progress together with * the input watermark when the first {@link DoFn.ProcessElement} call for this element @@ -316,7 +321,7 @@ public class SplittableParDoViaKeyedWorkItems { boolean isSeedCall = (timer == null); StateNamespace stateNamespace; if (isSeedCall) { - WindowedValue<KV<InputT, RestrictionT>> windowedValue = + WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue = Iterables.getOnlyElement(c.element().elementsIterable()); BoundedWindow window = Iterables.getOnlyElement(windowedValue.getWindows()); stateNamespace = @@ -332,25 +337,27 @@ public class SplittableParDoViaKeyedWorkItems { stateInternals.state(stateNamespace, restrictionTag); WatermarkHoldState holdState = stateInternals.state(stateNamespace, watermarkHoldTag); - KV<WindowedValue<InputT>, RestrictionT> elementAndRestriction; + ElementAndRestriction<WindowedValue<InputT>, RestrictionT> elementAndRestriction; if (isSeedCall) { - 
WindowedValue<KV<InputT, RestrictionT>> windowedValue = + WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue = Iterables.getOnlyElement(c.element().elementsIterable()); - WindowedValue<InputT> element = windowedValue.withValue(windowedValue.getValue().getKey()); + WindowedValue<InputT> element = windowedValue.withValue(windowedValue.getValue().element()); elementState.write(element); - elementAndRestriction = KV.of(element, windowedValue.getValue().getValue()); + elementAndRestriction = + ElementAndRestriction.of(element, windowedValue.getValue().restriction()); } else { // This is not the first ProcessElement call for this element/restriction - rather, // this is a timer firing, so we need to fetch the element and restriction from state. elementState.readLater(); restrictionState.readLater(); - elementAndRestriction = KV.of(elementState.read(), restrictionState.read()); + elementAndRestriction = + ElementAndRestriction.of(elementState.read(), restrictionState.read()); } - final TrackerT tracker = invoker.invokeNewTracker(elementAndRestriction.getValue()); + final TrackerT tracker = invoker.invokeNewTracker(elementAndRestriction.restriction()); SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT>.Result result = processElementInvoker.invokeProcessElement( - invoker, elementAndRestriction.getKey(), tracker); + invoker, elementAndRestriction.element(), tracker); // Save state for resuming. 
if (result.getResidualRestriction() == null) { @@ -363,14 +370,13 @@ public class SplittableParDoViaKeyedWorkItems { restrictionState.write(result.getResidualRestriction()); Instant futureOutputWatermark = result.getFutureOutputWatermark(); if (futureOutputWatermark == null) { - futureOutputWatermark = elementAndRestriction.getKey().getTimestamp(); + futureOutputWatermark = elementAndRestriction.element().getTimestamp(); } - Instant wakeupTime = - timerInternals.currentProcessingTime().plus(result.getContinuation().resumeDelay()); holdState.add(futureOutputWatermark); // Set a timer to continue processing this element. timerInternals.setTimer( - TimerInternals.TimerData.of(stateNamespace, wakeupTime, TimeDomain.PROCESSING_TIME)); + TimerInternals.TimerData.of( + stateNamespace, timerInternals.currentProcessingTime(), TimeDomain.PROCESSING_TIME)); } private DoFn<InputT, OutputT>.StartBundleContext wrapContextAsStartBundle( http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java index 7732df3..ced6c01 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java @@ -17,8 +17,6 @@ */ package org.apache.beam.runners.core; -import static com.google.common.base.Preconditions.checkNotNull; - import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; @@ -36,35 +34,20 @@ public abstract class SplittableProcessElementInvoker< public class Result { @Nullable private final 
RestrictionT residualRestriction; - private final DoFn.ProcessContinuation continuation; private final Instant futureOutputWatermark; public Result( - @Nullable RestrictionT residualRestriction, - DoFn.ProcessContinuation continuation, - Instant futureOutputWatermark) { - this.continuation = checkNotNull(continuation); - if (continuation.shouldResume()) { - checkNotNull(residualRestriction); - } + @Nullable RestrictionT residualRestriction, Instant futureOutputWatermark) { this.residualRestriction = residualRestriction; this.futureOutputWatermark = futureOutputWatermark; } - /** - * Can be {@code null} only if {@link #getContinuation} specifies the call should not resume. - * However, the converse is not true: this can be non-null even if {@link #getContinuation} - * is {@link DoFn.ProcessContinuation#stop()}. - */ + /** If {@code null}, means the call should not resume. */ @Nullable public RestrictionT getResidualRestriction() { return residualRestriction; } - public DoFn.ProcessContinuation getContinuation() { - return continuation; - } - public Instant getFutureOutputWatermark() { return futureOutputWatermark; } @@ -74,8 +57,8 @@ public abstract class SplittableProcessElementInvoker< * Invokes the {@link DoFn.ProcessElement} method using the given {@link DoFnInvoker} for the * original {@link DoFn}, on the given element and with the given {@link RestrictionTracker}. * - * @return Information on how to resume the call: residual restriction, a {@link - * DoFn.ProcessContinuation}, and a future output watermark. + * @return Information on how to resume the call: residual restriction and a + * future output watermark. 
*/ public abstract Result invokeProcessElement( DoFnInvoker<InputT, OutputT> invoker, WindowedValue<InputT> element, TrackerT tracker); http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java index 3144bd6..c189b0d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.core; -import com.google.common.annotations.VisibleForTesting; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.state.BagState; import org.apache.beam.sdk.state.CombiningState; @@ -104,11 +103,6 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound this.bufferTag = bufferTag; } - @VisibleForTesting - StateTag<? 
extends GroupingState<InputT, OutputT>> getBufferTag() { - return bufferTag; - } - @Override public void processValue(ProcessValueContext c) throws Exception { c.state().access(bufferTag).add(c.value()); http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java index 3530ed1..0f0c17c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java @@ -23,6 +23,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import java.util.Arrays; import java.util.List; +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.sdk.annotations.Experimental; /** @@ -30,7 +31,7 @@ import org.apache.beam.sdk.annotations.Experimental; * have fired. */ @Experimental(Experimental.Kind.TRIGGER) -public class AfterAllStateMachine extends TriggerStateMachine { +public class AfterAllStateMachine extends OnceTriggerStateMachine { private AfterAllStateMachine(List<TriggerStateMachine> subTriggers) { super(subTriggers); @@ -41,11 +42,11 @@ public class AfterAllStateMachine extends TriggerStateMachine { * Returns an {@code AfterAll} {@code Trigger} with the given subtriggers. */ @SafeVarargs - public static TriggerStateMachine of(TriggerStateMachine... triggers) { + public static OnceTriggerStateMachine of(TriggerStateMachine... triggers) { return new AfterAllStateMachine(Arrays.<TriggerStateMachine>asList(triggers)); } - public static TriggerStateMachine of(Iterable<? 
extends TriggerStateMachine> triggers) { + public static OnceTriggerStateMachine of(Iterable<? extends TriggerStateMachine> triggers) { return new AfterAllStateMachine(ImmutableList.copyOf(triggers)); } @@ -77,21 +78,24 @@ public class AfterAllStateMachine extends TriggerStateMachine { */ @Override public boolean shouldFire(TriggerContext context) throws Exception { - for (ExecutableTriggerStateMachine subTrigger : context.trigger().subTriggers()) { - if (!context.forTrigger(subTrigger).trigger().isFinished() - && !subTrigger.invokeShouldFire(context)) { + for (ExecutableTriggerStateMachine subtrigger : context.trigger().subTriggers()) { + if (!context.forTrigger(subtrigger).trigger().isFinished() + && !subtrigger.invokeShouldFire(context)) { return false; } } return true; } + /** + * Invokes {@link #onFire} for all subtriggers, eliding redundant calls to {@link #shouldFire} + * because they all must be ready to fire. + */ @Override - public void onFire(TriggerContext context) throws Exception { - for (ExecutableTriggerStateMachine subTrigger : context.trigger().subTriggers()) { - subTrigger.invokeOnFire(context); + public void onOnlyFiring(TriggerContext context) throws Exception { + for (ExecutableTriggerStateMachine subtrigger : context.trigger().subTriggers()) { + subtrigger.invokeOnFire(context); } - context.trigger().setFinished(true); } @Override @@ -99,6 +103,7 @@ public class AfterAllStateMachine extends TriggerStateMachine { StringBuilder builder = new StringBuilder("AfterAll.of("); Joiner.on(", ").appendTo(builder, subTriggers); builder.append(")"); + return builder.toString(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java index 06c2066..8d8d0de 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java @@ -27,6 +27,7 @@ import org.apache.beam.runners.core.StateAccessor; import org.apache.beam.runners.core.StateMerging; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.state.CombiningState; @@ -49,7 +50,7 @@ import org.joda.time.format.PeriodFormatter; // This class should be inlined to subclasses and deleted, simplifying them too // https://issues.apache.org/jira/browse/BEAM-1486 @Experimental(Experimental.Kind.TRIGGER) -public abstract class AfterDelayFromFirstElementStateMachine extends TriggerStateMachine { +public abstract class AfterDelayFromFirstElementStateMachine extends OnceTriggerStateMachine { protected static final List<SerializableFunction<Instant, Instant>> IDENTITY = ImmutableList.<SerializableFunction<Instant, Instant>>of(); @@ -236,9 +237,8 @@ public abstract class AfterDelayFromFirstElementStateMachine extends TriggerStat } @Override - public final void onFire(TriggerContext context) throws Exception { + protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) throws Exception { clear(context); - context.trigger().setFinished(true); } protected Instant computeTargetTimestamp(Instant time) { http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java 
---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java index 58c24c5..840a65c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java @@ -23,6 +23,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import java.util.Arrays; import java.util.List; +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.sdk.annotations.Experimental; /** @@ -30,7 +31,7 @@ import org.apache.beam.sdk.annotations.Experimental; * sub-triggers have fired. */ @Experimental(Experimental.Kind.TRIGGER) -public class AfterFirstStateMachine extends TriggerStateMachine { +public class AfterFirstStateMachine extends OnceTriggerStateMachine { AfterFirstStateMachine(List<TriggerStateMachine> subTriggers) { super(subTriggers); @@ -41,12 +42,12 @@ public class AfterFirstStateMachine extends TriggerStateMachine { * Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers. */ @SafeVarargs - public static TriggerStateMachine of( + public static OnceTriggerStateMachine of( TriggerStateMachine... triggers) { return new AfterFirstStateMachine(Arrays.<TriggerStateMachine>asList(triggers)); } - public static TriggerStateMachine of( + public static OnceTriggerStateMachine of( Iterable<? 
extends TriggerStateMachine> triggers) { return new AfterFirstStateMachine(ImmutableList.copyOf(triggers)); } @@ -78,19 +79,18 @@ public class AfterFirstStateMachine extends TriggerStateMachine { } @Override - public void onFire(TriggerContext context) throws Exception { - for (ExecutableTriggerStateMachine subTrigger : context.trigger().subTriggers()) { - TriggerContext subContext = context.forTrigger(subTrigger); - if (subTrigger.invokeShouldFire(subContext)) { + protected void onOnlyFiring(TriggerContext context) throws Exception { + for (ExecutableTriggerStateMachine subtrigger : context.trigger().subTriggers()) { + TriggerContext subContext = context.forTrigger(subtrigger); + if (subtrigger.invokeShouldFire(subContext)) { // If the trigger is ready to fire, then do whatever it needs to do. - subTrigger.invokeOnFire(subContext); + subtrigger.invokeOnFire(subContext); } else { // If the trigger is not ready to fire, it is nonetheless true that whatever // pending pane it was tracking is now gone. 
- subTrigger.invokeClear(subContext); + subtrigger.invokeClear(subContext); } } - context.trigger().setFinished(true); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java index 1ce035a..b9fbac3 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java @@ -23,6 +23,7 @@ import org.apache.beam.runners.core.StateAccessor; import org.apache.beam.runners.core.StateMerging; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.state.CombiningState; @@ -32,7 +33,7 @@ import org.apache.beam.sdk.transforms.Sum; * {@link TriggerStateMachine}s that fire based on properties of the elements in the current pane. 
*/ @Experimental(Experimental.Kind.TRIGGER) -public class AfterPaneStateMachine extends TriggerStateMachine { +public class AfterPaneStateMachine extends OnceTriggerStateMachine { private static final StateTag<CombiningState<Long, long[], Long>> ELEMENTS_IN_PANE_TAG = @@ -129,8 +130,7 @@ private static final StateTag<CombiningState<Long, long[], Long>> } @Override - public void onFire(TriggerStateMachine.TriggerContext context) throws Exception { + protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) throws Exception { clear(context); - context.trigger().setFinished(true); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java index 509c96b..c9eee15 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.collect.ImmutableList; import java.util.Objects; import javax.annotation.Nullable; +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.state.TimeDomain; @@ -241,7 +242,7 @@ public class AfterWatermarkStateMachine { /** * A watermark trigger targeted relative to the end of the window. 
*/ - public static class FromEndOfWindow extends TriggerStateMachine { + public static class FromEndOfWindow extends OnceTriggerStateMachine { private FromEndOfWindow() { super(null); @@ -318,8 +319,6 @@ public class AfterWatermarkStateMachine { } @Override - public void onFire(TriggerStateMachine.TriggerContext context) throws Exception { - context.trigger().setFinished(true); - } + protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) throws Exception { } } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java index cdcff64..c4d89c2 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java @@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkState; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; /** @@ -45,14 +46,17 @@ public class ExecutableTriggerStateMachine implements Serializable { private static <W extends BoundedWindow> ExecutableTriggerStateMachine create( TriggerStateMachine trigger, int nextUnusedIndex) { - + if (trigger instanceof OnceTriggerStateMachine) { + return new ExecutableOnceTriggerStateMachine( + (OnceTriggerStateMachine) trigger, nextUnusedIndex); + } else { return new ExecutableTriggerStateMachine(trigger, nextUnusedIndex); - + } } public static <W 
extends BoundedWindow> ExecutableTriggerStateMachine createForOnceTrigger( - TriggerStateMachine trigger, int nextUnusedIndex) { - return new ExecutableTriggerStateMachine(trigger, nextUnusedIndex); + OnceTriggerStateMachine trigger, int nextUnusedIndex) { + return new ExecutableOnceTriggerStateMachine(trigger, nextUnusedIndex); } private ExecutableTriggerStateMachine(TriggerStateMachine trigger, int nextUnusedIndex) { @@ -142,4 +146,15 @@ public class ExecutableTriggerStateMachine implements Serializable { public void invokeClear(TriggerStateMachine.TriggerContext c) throws Exception { trigger.clear(c.forTrigger(this)); } + + /** + * {@link ExecutableTriggerStateMachine} that enforces the fact that the trigger should always + * FIRE_AND_FINISH and never just FIRE. + */ + private static class ExecutableOnceTriggerStateMachine extends ExecutableTriggerStateMachine { + + public ExecutableOnceTriggerStateMachine(OnceTriggerStateMachine trigger, int nextUnusedIndex) { + super(trigger, nextUnusedIndex); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java index f8c5e8b..f32c7a8 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.core.triggers; +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -26,7 +27,7 @@ import 
org.apache.beam.sdk.transforms.windowing.BoundedWindow; * <p>Using this trigger will only produce output when the watermark passes the end of the * {@link BoundedWindow window} plus the allowed lateness. */ -public final class NeverStateMachine extends TriggerStateMachine { +public final class NeverStateMachine extends OnceTriggerStateMachine { /** * Returns a trigger which never fires. Output will be produced from the using {@link GroupByKey} * when the {@link BoundedWindow} closes. @@ -52,7 +53,7 @@ public final class NeverStateMachine extends TriggerStateMachine { } @Override - public void onFire(TriggerStateMachine.TriggerContext context) { + protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) { throw new UnsupportedOperationException( String.format("%s should never fire", getClass().getSimpleName())); } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java index 880aa48..6a2cf0c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java @@ -453,8 +453,35 @@ public abstract class TriggerStateMachine implements Serializable { * } * </pre> * + * <p>Note that if {@code t1} is {@link OnceTriggerStateMachine}, then {@code t1.orFinally(t2)} is + * the same as {@code AfterFirst.of(t1, t2)}. 
*/ public TriggerStateMachine orFinally(TriggerStateMachine until) { return new OrFinallyStateMachine(this, until); } + + /** + * {@link TriggerStateMachine}s that are guaranteed to fire at most once should extend from this, + * rather than the general {@link TriggerStateMachine} class to indicate that behavior. + */ + public abstract static class OnceTriggerStateMachine extends TriggerStateMachine { + protected OnceTriggerStateMachine(List<TriggerStateMachine> subTriggers) { + super(subTriggers); + } + + /** + * {@inheritDoc} + */ + @Override + public final void onFire(TriggerContext context) throws Exception { + onOnlyFiring(context); + context.trigger().setFinished(true); + } + + /** + * Called exactly once by {@link #onFire} when the trigger is fired. Subclasses implement this + * with their firing-time logic; {@link #onFire} then marks the trigger as finished. + */ + protected abstract void onOnlyFiring(TriggerContext context) throws Exception; + } }
