Removes OldDoFn and its kin from runners-core
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6127f532 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6127f532 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6127f532 Branch: refs/heads/jstorm-runner Commit: 6127f532b7e6ad0f13926d3d9aec17eb538108ed Parents: 00b4a30 Author: Eugene Kirpichov <[email protected]> Authored: Fri May 12 11:18:57 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Tue May 16 11:55:43 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/AssignWindowsDoFn.java | 78 -- .../apache/beam/runners/core/DoFnAdapters.java | 381 ---------- .../apache/beam/runners/core/DoFnRunners.java | 27 +- .../GroupAlsoByWindowViaOutputBufferDoFn.java | 113 --- .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 94 --- .../GroupAlsoByWindowViaWindowSetNewDoFn.java | 6 +- .../core/GroupAlsoByWindowsAggregators.java | 26 + .../runners/core/GroupAlsoByWindowsDoFn.java | 39 - .../core/LateDataDroppingDoFnRunner.java | 3 +- .../org/apache/beam/runners/core/OldDoFn.java | 335 --------- .../beam/runners/core/SimpleOldDoFnRunner.java | 499 ------------- .../core/WindowingInternalsAdapters.java | 74 -- ...roupAlsoByWindowViaOutputBufferDoFnTest.java | 109 --- .../core/GroupAlsoByWindowsProperties.java | 744 ------------------- .../apache/beam/runners/core/NoOpOldDoFn.java | 65 -- .../apache/beam/runners/core/OldDoFnTest.java | 51 -- .../runners/core/SimpleOldDoFnRunnerTest.java | 86 --- .../GroupAlsoByWindowEvaluatorFactory.java | 6 +- .../SparkGroupAlsoByWindowViaWindowSet.java | 9 +- .../spark/translation/SparkAssignWindowFn.java | 4 +- ...SparkGroupAlsoByWindowViaOutputBufferFn.java | 4 +- 21 files changed, 43 insertions(+), 2710 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java deleted file mode 100644 index bbf3574..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.collect.Iterables; -import java.util.Collection; -import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.SystemDoFnInternal; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Instant; - -/** - * {@link OldDoFn} that tags elements of a {@link PCollection} with windows, according to the - * provided {@link WindowFn}. - * - * @param <T> Type of elements being windowed - * @param <W> Window type - */ -@SystemDoFnInternal -public class AssignWindowsDoFn<T, W extends BoundedWindow> extends OldDoFn<T, T> - implements RequiresWindowAccess { - private WindowFn<? super T, W> fn; - - public AssignWindowsDoFn(WindowFn<? super T, W> fn) { - this.fn = - checkNotNull( - fn, - "%s provided to %s cannot be null", - WindowFn.class.getSimpleName(), - AssignWindowsDoFn.class.getSimpleName()); - } - - @Override - @SuppressWarnings("unchecked") - public void processElement(final ProcessContext c) throws Exception { - Collection<W> windows = - ((WindowFn<T, W>) fn).assignWindows( - ((WindowFn<T, W>) fn).new AssignContext() { - @Override - public T element() { - return c.element(); - } - - @Override - public Instant timestamp() { - return c.timestamp(); - } - - @Override - public BoundedWindow window() { - return Iterables.getOnlyElement(c.windowingInternals().windows()); - } - }); - - c.windowingInternals() - .outputWindowedValue(c.element(), c.timestamp(), windows, PaneInfo.NO_FIRING); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java deleted file mode 100644 index af59a40..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java +++ /dev/null @@ -1,381 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import java.io.IOException; -import org.apache.beam.runners.core.OldDoFn.Context; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.state.State; -import org.apache.beam.sdk.state.Timer; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.FinishBundleContext; -import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; -import org.apache.beam.sdk.transforms.DoFn.ProcessContext; -import org.apache.beam.sdk.transforms.DoFn.StartBundleContext; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; -import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** - * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}. - * - * @deprecated This class will go away when we start running {@link DoFn}'s directly (using {@link - * DoFnInvoker}) rather than via {@link OldDoFn}. - */ -@Deprecated -public class DoFnAdapters { - /** Should not be instantiated. */ - private DoFnAdapters() {} - - /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */ - @SuppressWarnings({"unchecked", "rawtypes"}) - public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) { - DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass()); - if (signature.processElement().observesWindow()) { - return new WindowDoFnAdapter<>(fn); - } else { - return new SimpleDoFnAdapter<>(fn); - } - } - - /** - * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link - * OldDoFn}. - */ - private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> { - private final DoFn<InputT, OutputT> fn; - private transient DoFnInvoker<InputT, OutputT> invoker; - - SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) { - this.fn = fn; - this.invoker = DoFnInvokers.invokerFor(fn); - } - - @Override - public void setup() throws Exception { - this.invoker.invokeSetup(); - } - - @Override - public void startBundle(Context c) throws Exception { - fn.prepareForProcessing(); - invoker.invokeStartBundle(new StartBundleContextAdapter<>(fn, c)); - } - - @Override - public void finishBundle(Context c) throws Exception { - invoker.invokeFinishBundle(new FinishBundleContextAdapter<>(fn, c)); - } - - @Override - public void teardown() throws Exception { - this.invoker.invokeTeardown(); - } - - @Override - public void processElement(ProcessContext c) throws Exception { - ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c); - invoker.invokeProcessElement(adapter); - } - - @Override - public Duration getAllowedTimestampSkew() { - return fn.getAllowedTimestampSkew(); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.delegate(fn); - } - - private void readObject(java.io.ObjectInputStream in) - throws IOException, ClassNotFoundException { - in.defaultReadObject(); - this.invoker = DoFnInvokers.invokerFor(fn); - } - } - - /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an {@link OldDoFn}. */ - private static class WindowDoFnAdapter<InputT, OutputT> extends SimpleDoFnAdapter<InputT, OutputT> - implements OldDoFn.RequiresWindowAccess { - - WindowDoFnAdapter(DoFn<InputT, OutputT> fn) { - super(fn); - } - } - - /** - * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link - * DoFn.StartBundle} method, which means the extra context is unavailable. - */ - private static class StartBundleContextAdapter<InputT, OutputT> - extends DoFn<InputT, OutputT>.StartBundleContext - implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { - private OldDoFn<InputT, OutputT>.Context context; - - private StartBundleContextAdapter(DoFn<InputT, OutputT> fn, Context context) { - fn.super(); - this.context = context; - } - - public PipelineOptions getPipelineOptions() { - return context.getPipelineOptions(); - } - - @Override - public BoundedWindow window() { - // The OldDoFn doesn't allow us to ask for these outside processElement, so this - // should be unreachable. - throw new UnsupportedOperationException( - "Can only get the window in processElement; elsewhere there is no defined window."); - } - - @Override - public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { - return this; - } - - @Override - public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Can only get a FinishBundleContext in finishBundle"); - } - - @Override - public ProcessContext processContext(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Can only get a ProcessContext in processElement"); - } - - @Override - public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Timers are not supported for OldDoFn"); - } - - @Override - public RestrictionTracker<?> restrictionTracker() { - throw new UnsupportedOperationException("This is a non-splittable DoFn"); - } - - @Override - public State state(String stateId) { - throw new UnsupportedOperationException("State is not supported by this runner"); - } - - @Override - public Timer timer(String timerId) { - throw new UnsupportedOperationException("Timers are not supported by this runner"); - } - } - - /** - * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link - * DoFn.FinishBundle} method, which means the extra context is unavailable. - */ - private static class FinishBundleContextAdapter<InputT, OutputT> - extends DoFn<InputT, OutputT>.FinishBundleContext - implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { - - private OldDoFn<InputT, OutputT>.Context context; - - private FinishBundleContextAdapter(DoFn<InputT, OutputT> fn, Context context) { - fn.super(); - this.context = context; - } - - public PipelineOptions getPipelineOptions() { - return context.getPipelineOptions(); - } - - @Override - public BoundedWindow window() { - // The OldDoFn doesn't allow us to ask for these outside processElement, so this - // should be unreachable. - throw new UnsupportedOperationException( - "Can only get the window in processElement; elsewhere there is no defined window."); - } - - @Override - public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Can only get a StartBundleContext in startBundle"); - } - - @Override - public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) { - return this; - } - - @Override - public ProcessContext processContext(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Can only get a ProcessContext in processElement"); - } - - @Override - public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Timers are not supported for OldDoFn"); - } - - @Override - public RestrictionTracker<?> restrictionTracker() { - throw new UnsupportedOperationException("This is a non-splittable DoFn"); - } - - @Override - public State state(String stateId) { - throw new UnsupportedOperationException("State is not supported by this runner"); - } - - @Override - public Timer timer(String timerId) { - throw new UnsupportedOperationException("Timers are not supported by this runner"); - } - - @Override - public void output( - OutputT output, Instant timestamp, BoundedWindow window) { - // Not full fidelity conversion. This should be removed as soon as possible. - context.outputWithTimestamp(output, timestamp); - } - - @Override - public <T> void output( - TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) { - // Not full fidelity conversion. This should be removed as soon as possible. - context.outputWithTimestamp(tag, output, timestamp); - } - } - - /** - * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFnInvoker.ArgumentProvider} method. - */ - private static class ProcessContextAdapter<InputT, OutputT> - extends DoFn<InputT, OutputT>.ProcessContext - implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { - - private OldDoFn<InputT, OutputT>.ProcessContext context; - - private ProcessContextAdapter( - DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.ProcessContext context) { - fn.super(); - this.context = context; - } - - @Override - public PipelineOptions getPipelineOptions() { - return context.getPipelineOptions(); - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - return context.sideInput(view); - } - - @Override - public void output(OutputT output) { - context.output(output); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - context.outputWithTimestamp(output, timestamp); - } - - @Override - public <T> void output(TupleTag<T> tag, T output) { - context.output(tag, output); - } - - @Override - public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - context.outputWithTimestamp(tag, output, timestamp); - } - - @Override - public InputT element() { - return context.element(); - } - - @Override - public Instant timestamp() { - return context.timestamp(); - } - - @Override - public PaneInfo pane() { - return context.pane(); - } - - @Override - public void updateWatermark(Instant watermark) { - throw new UnsupportedOperationException("Only splittable DoFn's can use updateWatermark()"); - } - - @Override - public BoundedWindow window() { - return context.window(); - } - - @Override - public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { - return null; - } - - @Override - public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) { - return null; - } - - @Override - public ProcessContext processContext(DoFn<InputT, OutputT> doFn) { - return this; - } - - @Override - public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException("Timers are not supported for OldDoFn"); - } - - @Override - public RestrictionTracker<?> restrictionTracker() { - throw new UnsupportedOperationException("This is a non-splittable DoFn"); - } - - @Override - public State state(String stateId) { - throw new UnsupportedOperationException("State is not supported by this runner"); - } - - @Override - public Timer timer(String timerId) { - throw new UnsupportedOperationException("Timers are not supported by this runner"); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index ee3aefa..f3cca6f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -74,34 +74,9 @@ public class DoFnRunners { } /** - * Returns a basic implementation of {@link DoFnRunner} that works for most {@link OldDoFn DoFns}. - * - * <p>It invokes {@link OldDoFn#processElement} for each input. - */ - public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner( - PipelineOptions options, - OldDoFn<InputT, OutputT> fn, - SideInputReader sideInputReader, - OutputManager outputManager, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> additionalOutputTags, - StepContext stepContext, - WindowingStrategy<?, ?> windowingStrategy) { - return new SimpleOldDoFnRunner<>( - options, - fn, - sideInputReader, - outputManager, - mainOutputTag, - additionalOutputTags, - stepContext, - windowingStrategy); - } - - /** * Returns an implementation of {@link DoFnRunner} that handles late data dropping. * - * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}. + * <p>It drops elements from expired windows before they reach the underlying {@link DoFn}. */ public static <K, InputT, OutputT, W extends BoundedWindow> DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner( http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java deleted file mode 100644 index a160553..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.runners.core.construction.Triggers; -import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; -import org.apache.beam.runners.core.triggers.TriggerStateMachines; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.SystemDoFnInternal; -import org.apache.beam.sdk.values.WindowingStrategy; -import org.joda.time.Instant; - -/** - * The default batch {@link GroupAlsoByWindowsDoFn} implementation, if no specialized "fast path" - * implementation is applicable. - */ -@SystemDoFnInternal -public class GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W extends BoundedWindow> - extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> { - - private final WindowingStrategy<?, W> strategy; - private final StateInternalsFactory<K> stateInternalsFactory; - private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn; - - public GroupAlsoByWindowViaOutputBufferDoFn( - WindowingStrategy<?, W> windowingStrategy, - StateInternalsFactory<K> stateInternalsFactory, - SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) { - this.strategy = windowingStrategy; - this.reduceFn = reduceFn; - this.stateInternalsFactory = stateInternalsFactory; - } - - @Override - public void processElement(ProcessContext c) throws Exception { - K key = c.element().getKey(); - // Used with Batch, we know that all the data is available for this key. We can't use the - // timer manager from the context because it doesn't exist. So we create one and emulate the - // watermark, knowing that we have all data and it is in timestamp order. - InMemoryTimerInternals timerInternals = new InMemoryTimerInternals(); - timerInternals.advanceProcessingTime(Instant.now()); - timerInternals.advanceSynchronizedProcessingTime(Instant.now()); - StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key); - - ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner = - new ReduceFnRunner<>( - key, - strategy, - ExecutableTriggerStateMachine.create( - TriggerStateMachines.stateMachineForTrigger( - Triggers.toProto(strategy.getTrigger()))), - stateInternals, - timerInternals, - WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()), - WindowingInternalsAdapters.sideInputReader(c.windowingInternals()), - reduceFn, - c.getPipelineOptions()); - - // Process the elements. - reduceFnRunner.processElements(c.element().getValue()); - - // Finish any pending windows by advancing the input watermark to infinity. - timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE); - - // Finally, advance the processing time to infinity to fire any timers. - timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE); - timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE); - - fireEligibleTimers(timerInternals, reduceFnRunner); - - reduceFnRunner.persist(); - } - - private void fireEligibleTimers( - InMemoryTimerInternals timerInternals, ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner) - throws Exception { - List<TimerInternals.TimerData> timers = new ArrayList<>(); - while (true) { - TimerInternals.TimerData timer; - while ((timer = timerInternals.removeNextEventTimer()) != null) { - timers.add(timer); - } - while ((timer = timerInternals.removeNextProcessingTimer()) != null) { - timers.add(timer); - } - while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) { - timers.add(timer); - } - if (timers.isEmpty()) { - break; - } - reduceFnRunner.onTimers(timers); - timers.clear(); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java deleted file mode 100644 index 2342c52..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import org.apache.beam.runners.core.construction.Triggers; -import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; -import org.apache.beam.runners.core.triggers.TriggerStateMachines; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.SystemDoFnInternal; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.WindowingStrategy; - -/** - * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the - * {@link ReduceFnRunner}. - */ -@SystemDoFnInternal -public class GroupAlsoByWindowViaWindowSetDoFn< - K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem<K, InputT>> - extends OldDoFn<RinT, KV<K, OutputT>> { - - public static <K, InputT, OutputT, W extends BoundedWindow> - OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create( - WindowingStrategy<?, W> strategy, - StateInternalsFactory<K> stateInternalsFactory, - SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) { - return new GroupAlsoByWindowViaWindowSetDoFn<>(strategy, stateInternalsFactory, reduceFn); - } - - private final WindowingStrategy<Object, W> windowingStrategy; - private final StateInternalsFactory<K> stateInternalsFactory; - private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn; - - private GroupAlsoByWindowViaWindowSetDoFn( - WindowingStrategy<?, W> windowingStrategy, - StateInternalsFactory<K> stateInternalsFactory, - SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) { - @SuppressWarnings("unchecked") - WindowingStrategy<Object, W> noWildcard = (WindowingStrategy<Object, W>) windowingStrategy; - this.windowingStrategy = noWildcard; - this.reduceFn = reduceFn; - this.stateInternalsFactory = stateInternalsFactory; - } - - @Override - public void processElement(ProcessContext c) throws Exception { - KeyedWorkItem<K, InputT> keyedWorkItem = c.element(); - - K key = keyedWorkItem.key(); - TimerInternals timerInternals = c.windowingInternals().timerInternals(); - StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key); - - ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner = - new ReduceFnRunner<>( - key, - windowingStrategy, - ExecutableTriggerStateMachine.create( - TriggerStateMachines.stateMachineForTrigger( - Triggers.toProto(windowingStrategy.getTrigger()))), - stateInternals, - timerInternals, - WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()), - WindowingInternalsAdapters.sideInputReader(c.windowingInternals()), - reduceFn, - c.getPipelineOptions()); - - reduceFnRunner.processElements(keyedWorkItem.elementsIterable()); - reduceFnRunner.onTimers(keyedWorkItem.timersIterable()); - reduceFnRunner.persist(); - } - - public OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() { - // Safe contravariant cast - @SuppressWarnings("unchecked") - OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asFn = - (OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) this; - return asFn; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java index 5b82d1f..744d162 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java @@ -32,7 +32,7 @@ import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.Instant; /** - * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the + * A general {@link GroupAlsoByWindowsAggregators}. This delegates all of the logic to the * {@link ReduceFnRunner}. */ @SystemDoFnInternal @@ -134,8 +134,4 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn< reduceFnRunner.onTimers(keyedWorkItem.timersIterable()); reduceFnRunner.persist(); } - - public OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() { - throw new RuntimeException("Not implement!"); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java new file mode 100644 index 0000000..8d96257 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +/** + * Common aggregator names for {@link GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow}. + */ +public abstract class GroupAlsoByWindowsAggregators { + public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow"; + public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness"; +} http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java deleted file mode 100644 index 2bd9ee0..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.SystemDoFnInternal; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; - -/** - * {@link OldDoFn} that merges windows and groups elements in those windows, optionally - * combining values. - * - * @param <K> key type - * @param <InputT> input value element type - * @param <OutputT> output value element type - * @param <W> window type - */ -@SystemDoFnInternal -public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends BoundedWindow> - extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> { - public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow"; - public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness"; -} http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java index 570f524..1cf1509 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java @@ -24,6 +24,7 @@ import com.google.common.collect.Iterables; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; @@ -33,7 +34,7 @@ import org.joda.time.Instant; /** * A customized {@link DoFnRunner} that handles late data dropping for - * a {@link KeyedWorkItem} input {@link OldDoFn}. + * a {@link KeyedWorkItem} input {@link DoFn}. * * <p>It expands windows before checking data lateness. * http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java deleted file mode 100644 index 41bb598..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java +++ /dev/null @@ -1,335 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import java.io.Serializable; -import org.apache.beam.sdk.PipelineRunner; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnTester; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.HasDisplayData; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** - * The argument to {@link ParDo} providing the code to use to process - * elements of the input - * {@link org.apache.beam.sdk.values.PCollection}. - * - * <p>See {@link ParDo} for more explanation, examples of use, and - * discussion of constraints on {@code OldDoFn}s, including their - * serializability, lack of access to global shared mutable state, - * requirements for failure tolerance, and benefits of optimization. - * - * <p>{@code OldDoFn}s can be tested in the context of a particular - * {@code Pipeline} by running that {@code Pipeline} on sample input - * and then checking its output. Unit testing of a {@code OldDoFn}, - * separately from any {@code ParDo} transform or {@code Pipeline}, - * can be done via the {@link DoFnTester} harness. - * - * <p>{@link DoFn} (currently experimental) offers an alternative - * mechanism for accessing {@link ProcessContext#window()} without the need - * to implement {@link RequiresWindowAccess}. - * - * <p>See also {@link #processElement} for details on implementing the transformation - * from {@code InputT} to {@code OutputT}. - * - * @param <InputT> the type of the (main) input elements - * @param <OutputT> the type of the (main) output elements - * @deprecated Uses of {@link OldDoFn} should be replaced by the new {@link DoFn}. - */ -@Deprecated -public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDisplayData { - /** - * Information accessible to all methods in this {@code OldDoFn}. - * Used primarily to output elements. - */ - public abstract class Context { - - /** - * Returns the {@code PipelineOptions} specified with the - * {@link PipelineRunner} - * invoking this {@code OldDoFn}. The {@code PipelineOptions} will - * be the default running via {@link DoFnTester}. - */ - public abstract PipelineOptions getPipelineOptions(); - - /** - * Adds the given element to the main output {@code PCollection}. - * - * <p>Once passed to {@code output} the element should be considered - * immutable and not be modified in any way. It may be cached or retained - * by a Beam runner or later steps in the pipeline, or used in - * other unspecified ways. - * - * <p>If invoked from {@link OldDoFn#processElement processElement}, the output - * element will have the same timestamp and be in the same windows - * as the input element passed to {@link OldDoFn#processElement processElement}. - * - * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle}, - * this will attempt to use the - * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} - * of the input {@code PCollection} to determine what windows the element - * should be in, throwing an exception if the {@code WindowFn} attempts - * to access any information about the input element. The output element - * will have a timestamp of negative infinity. - */ - public abstract void output(OutputT output); - - /** - * Adds the given element to the main output {@code PCollection}, - * with the given timestamp. - * - * <p>Once passed to {@code outputWithTimestamp} the element should not be - * modified in any way. - * - * <p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp - * must not be older than the input element's timestamp minus - * {@link OldDoFn#getAllowedTimestampSkew getAllowedTimestampSkew}. The output element will - * be in the same windows as the input element. - * - * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle}, - * this will attempt to use the - * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} - * of the input {@code PCollection} to determine what windows the element - * should be in, throwing an exception if the {@code WindowFn} attempts - * to access any information about the input element except for the - * timestamp. - */ - public abstract void outputWithTimestamp(OutputT output, Instant timestamp); - - /** - * Adds the given element to the output {@code PCollection} with the - * given tag. - * - * <p>Once passed to {@code output} the element should not be modified - * in any way. - * - * <p>The caller of {@code ParDo} uses {@link ParDo.SingleOutput#withOutputTags withOutputTags} - * to specify the tags of outputs that it consumes. Outputs that are not consumed, e.g., outputs - * for monitoring purposes only, don't necessarily need to be specified. - * - * <p>The output element will have the same timestamp and be in the same - * windows as the input element passed to {@link OldDoFn#processElement processElement}. - * - * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle}, - * this will attempt to use the - * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} - * of the input {@code PCollection} to determine what windows the element - * should be in, throwing an exception if the {@code WindowFn} attempts - * to access any information about the input element. The output element - * will have a timestamp of negative infinity. - * - * @see ParDo.SingleOutput#withOutputTags - */ - public abstract <T> void output(TupleTag<T> tag, T output); - - /** - * Adds the given element to the specified output {@code PCollection}, with the given timestamp. - * - * <p>Once passed to {@code outputWithTimestamp} the element should not be modified in any way. - * - * <p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp must not be - * older than the input element's timestamp minus {@link OldDoFn#getAllowedTimestampSkew - * getAllowedTimestampSkew}. The output element will be in the same windows as the input - * element. - * - * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle}, - * this will attempt to use the {@link org.apache.beam.sdk.transforms.windowing.WindowFn} of the - * input {@code PCollection} to determine what windows the element should be in, throwing an - * exception if the {@code WindowFn} attempts to access any information about the input element - * except for the timestamp. - * - * @see ParDo.SingleOutput#withOutputTags - */ - public abstract <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp); - } - - /** - * Information accessible when running {@link OldDoFn#processElement}. - */ - public abstract class ProcessContext extends Context { - - /** - * Returns the input element to be processed. - * - * <p>The element should be considered immutable. A Beam runner will not mutate the - * element, so it is safe to cache, etc. The element should not be mutated by any of the - * {@link OldDoFn} methods, because it may be cached elsewhere, retained by the runner - * runtime, or used in other unspecified ways. - */ - public abstract InputT element(); - - /** - * Returns the value of the side input for the window corresponding to the - * window of the main input element. - * - * <p>See - * {@link WindowMappingFn#getSideInputWindow} - * for how this corresponding window is determined. - * - * @throws IllegalArgumentException if this is not a side input - * @see ParDo.SingleOutput#withSideInputs - */ - public abstract <T> T sideInput(PCollectionView<T> view); - - /** - * Returns the timestamp of the input element. - * - * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window} - * for more information. - */ - public abstract Instant timestamp(); - - /** - * Returns the window into which the input element has been assigned. - * - * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window} - * for more information. - * - * @throws UnsupportedOperationException if this {@link OldDoFn} does - * not implement {@link RequiresWindowAccess}. - */ - public abstract BoundedWindow window(); - - /** - * Returns information about the pane within this window into which the - * input element has been assigned. - * - * <p>Generally all data is in a single, uninteresting pane unless custom - * triggering and/or late data has been explicitly requested. - * See {@link org.apache.beam.sdk.transforms.windowing.Window} - * for more information. - */ - public abstract PaneInfo pane(); - - /** - * Returns the process context to use for implementing windowing. - */ - @Experimental - public abstract WindowingInternals<InputT, OutputT> windowingInternals(); - } - - /** - * Returns the allowed timestamp skew duration, which is the maximum - * duration that timestamps can be shifted backward in - * {@link OldDoFn.Context#outputWithTimestamp}. - * - * <p>The default value is {@code Duration.ZERO}, in which case - * timestamps can only be shifted forward to future. For infinite - * skew, return {@code Duration.millis(Long.MAX_VALUE)}. - * - * <p>Note that producing an element whose timestamp is less than the - * current timestamp may result in late data, i.e. returning a non-zero - * value here does not impact watermark calculations used for firing - * windows. - * - * @deprecated does not interact well with the watermark. - */ - @Deprecated - public Duration getAllowedTimestampSkew() { - return Duration.ZERO; - } - - /** - * Interface for signaling that a {@link OldDoFn} needs to access the window the - * element is being processed in, via {@link OldDoFn.ProcessContext#window}. - */ - @Experimental - public interface RequiresWindowAccess {} - - public OldDoFn() { - } - - /** - * Prepares this {@link DoFn} instance for processing bundles. - * - * <p>{@link #setup()} will be called at most once per {@link DoFn} instance, and before any other - * {@link DoFn} method is called. - * - * <p>By default, does nothing. - */ - public void setup() throws Exception { - } - - /** - * Prepares this {@code OldDoFn} instance for processing a batch of elements. - * - * <p>By default, does nothing. - */ - public void startBundle(Context c) throws Exception { - } - - /** - * Processes one input element. - * - * <p>The current element of the input {@code PCollection} is returned by - * {@link ProcessContext#element() c.element()}. It should be considered immutable. The Beam - * runner will not mutate the element, so it is safe to cache, etc. The element should not be - * mutated by any of the {@link OldDoFn} methods, because it may be cached elsewhere, retained by - * the Beam runner, or used in other unspecified ways. - * - * <p>A value is added to the main output {@code PCollection} by {@link ProcessContext#output}. - * Once passed to {@code output} the element should be considered immutable and not be modified in - * any way. It may be cached elsewhere, retained by the Beam runner, or used in other - * unspecified ways. - * - * @see ProcessContext - */ - public abstract void processElement(ProcessContext c) throws Exception; - - /** - * Finishes processing this batch of elements. - * - * <p>By default, does nothing. - */ - public void finishBundle(Context c) throws Exception { - } - - /** - * Cleans up this {@link DoFn}. - * - * <p>{@link #teardown()} will be called before the {@link PipelineRunner} discards a {@link DoFn} - * instance, including due to another {@link DoFn} method throwing an {@link Exception}. No other - * {@link DoFn} methods will be called after a call to {@link #teardown()}. - * - * <p>By default, does nothing. - */ - public void teardown() throws Exception { - } - - ///////////////////////////////////////////////////////////////////////////// - - /** - * {@inheritDoc} - * - * <p>By default, does not register any display data. Implementors may override this method - * to provide their own display data. - */ - @Override - public void populateDisplayData(DisplayData.Builder builder) { - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java deleted file mode 100644 index 2a0b688..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ /dev/null @@ -1,499 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Set; -import org.apache.beam.runners.core.DoFnRunners.OutputManager; -import org.apache.beam.runners.core.ExecutionContext.StepContext; -import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.state.TimeDomain; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.SystemDoFnInternal; -import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; -import org.joda.time.Instant; -import org.joda.time.format.PeriodFormat; - -/** - * Runs a {@link OldDoFn} by constructing the appropriate contexts and passing them in. - * - * @param <InputT> the type of the {@link OldDoFn} (main) input elements - * @param <OutputT> the type of the {@link OldDoFn} (main) output elements - */ -class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { - - /** The {@link OldDoFn} being run. */ - private final OldDoFn<InputT, OutputT> fn; - /** The context used for running the {@link OldDoFn}. */ - private final DoFnContext<InputT, OutputT> context; - - public SimpleOldDoFnRunner( - PipelineOptions options, - OldDoFn<InputT, OutputT> fn, - SideInputReader sideInputReader, - OutputManager outputManager, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> additionalOutputTags, - StepContext stepContext, - WindowingStrategy<?, ?> windowingStrategy) { - this.fn = fn; - this.context = new DoFnContext<>( - options, - fn, - sideInputReader, - outputManager, - mainOutputTag, - additionalOutputTags, - stepContext, - windowingStrategy == null ? null : windowingStrategy.getWindowFn()); - } - - @Override - public void startBundle() { - // This can contain user code. Wrap it in case it throws an exception. - try { - fn.startBundle(context); - } catch (Throwable t) { - // Exception in user code. - throw wrapUserCodeException(t); - } - } - - @Override - public void processElement(WindowedValue<InputT> elem) { - if (elem.getWindows().size() <= 1 - || (!RequiresWindowAccess.class.isAssignableFrom(fn.getClass()) - && context.sideInputReader.isEmpty())) { - invokeProcessElement(elem); - } else { - // We could modify the windowed value (and the processContext) to - // avoid repeated allocations, but this is more straightforward. - for (WindowedValue<InputT> windowedValue : elem.explodeWindows()) { - invokeProcessElement(windowedValue); - } - } - } - - @Override - public void onTimer(String timerId, BoundedWindow window, Instant timestamp, - TimeDomain timeDomain) { - throw new UnsupportedOperationException( - String.format("Timers are not supported by %s", OldDoFn.class.getSimpleName())); - } - - private void invokeProcessElement(WindowedValue<InputT> elem) { - final OldDoFn<InputT, OutputT>.ProcessContext processContext = createProcessContext(elem); - // This can contain user code. Wrap it in case it throws an exception. - try { - fn.processElement(processContext); - } catch (Exception ex) { - throw wrapUserCodeException(ex); - } - } - - @Override - public void finishBundle() { - // This can contain user code. Wrap it in case it throws an exception. - try { - fn.finishBundle(context); - } catch (Throwable t) { - // Exception in user code. - throw wrapUserCodeException(t); - } - } - - /** - * Returns a new {@link OldDoFn.ProcessContext} for the given element. - */ - private OldDoFn<InputT, OutputT>.ProcessContext createProcessContext( - WindowedValue<InputT> elem) { - return new DoFnProcessContext<InputT, OutputT>(fn, context, elem); - } - - private RuntimeException wrapUserCodeException(Throwable t) { - throw UserCodeException.wrapIf(!isSystemDoFn(), t); - } - - private boolean isSystemDoFn() { - return fn.getClass().isAnnotationPresent(SystemDoFnInternal.class); - } - - /** - * A concrete implementation of {@code OldDoFn.Context} used for running a {@link OldDoFn}. - * - * @param <InputT> the type of the {@link OldDoFn} (main) input elements - * @param <OutputT> the type of the {@link OldDoFn} (main) output elements - */ - private static class DoFnContext<InputT, OutputT> extends OldDoFn<InputT, OutputT>.Context { - private static final int MAX_SIDE_OUTPUTS = 1000; - - final PipelineOptions options; - final OldDoFn<InputT, OutputT> fn; - final SideInputReader sideInputReader; - final OutputManager outputManager; - final TupleTag<OutputT> mainOutputTag; - final StepContext stepContext; - final WindowFn<?, ?> windowFn; - - /** - * The set of known output tags, some of which may be undeclared, so we can throw an - * exception when it exceeds {@link #MAX_SIDE_OUTPUTS}. - */ - private Set<TupleTag<?>> outputTags; - - public DoFnContext(PipelineOptions options, - OldDoFn<InputT, OutputT> fn, - SideInputReader sideInputReader, - OutputManager outputManager, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> additionalOutputTags, - StepContext stepContext, - WindowFn<?, ?> windowFn) { - fn.super(); - this.options = options; - this.fn = fn; - this.sideInputReader = sideInputReader; - this.outputManager = outputManager; - this.mainOutputTag = mainOutputTag; - this.outputTags = Sets.newHashSet(); - - outputTags.add(mainOutputTag); - for (TupleTag<?> additionalOutputTag : additionalOutputTags) { - outputTags.add(additionalOutputTag); - } - - this.stepContext = stepContext; - this.windowFn = windowFn; - } - - ////////////////////////////////////////////////////////////////////////////// - - @Override - public PipelineOptions getPipelineOptions() { - return options; - } - - <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue( - T output, Instant timestamp, Collection<W> windows, PaneInfo pane) { - final Instant inputTimestamp = timestamp; - - if (timestamp == null) { - timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; - } - - if (windows == null) { - try { - // The windowFn can never succeed at accessing the element, so its type does not - // matter here - @SuppressWarnings("unchecked") - WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn; - windows = objectWindowFn.assignWindows(objectWindowFn.new AssignContext() { - @Override - public Object element() { - throw new UnsupportedOperationException( - "WindowFn attempted to access input element when none was available"); - } - - @Override - public Instant timestamp() { - if (inputTimestamp == null) { - throw new UnsupportedOperationException( - "WindowFn attempted to access input timestamp when none was available"); - } - return inputTimestamp; - } - - @Override - public W window() { - throw new UnsupportedOperationException( - "WindowFn attempted to access input windows when none were available"); - } - }); - } catch (Exception e) { - throw UserCodeException.wrap(e); - } - } - - return WindowedValue.of(output, timestamp, windows, pane); - } - - public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) { - if (!sideInputReader.contains(view)) { - throw new IllegalArgumentException("calling sideInput() with unknown view"); - } - return sideInputReader.get(view, sideInputWindow); - } - - void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane)); - } - - void outputWindowedValue(WindowedValue<OutputT> windowedElem) { - outputManager.output(mainOutputTag, windowedElem); - if (stepContext != null) { - stepContext.noteOutput(windowedElem); - } - } - - private <T> void outputWindowedValue(TupleTag<T> tag, - T output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - outputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane)); - } - - private <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) { - if (!outputTags.contains(tag)) { - // This tag wasn't declared nor was it seen before during this execution. - // Thus, this must be a new, undeclared and unconsumed output. - // To prevent likely user errors, enforce the limit on the number of side - // outputs. - if (outputTags.size() >= MAX_SIDE_OUTPUTS) { - throw new IllegalArgumentException( - "the number of outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS); - } - outputTags.add(tag); - } - - outputManager.output(tag, windowedElem); - if (stepContext != null) { - stepContext.noteOutput(tag, windowedElem); - } - } - - // Following implementations of output, outputWithTimestamp, and output - // are only accessible in OldDoFn.startBundle and OldDoFn.finishBundle, and will be shadowed by - // ProcessContext's versions in OldDoFn.processElement. - @Override - public void output(OutputT output) { - outputWindowedValue(output, null, null, PaneInfo.NO_FIRING); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING); - } - - @Override - public <T> void output(TupleTag<T> tag, T output) { - checkNotNull(tag, "TupleTag passed to output cannot be null"); - outputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING); - } - - @Override - public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - checkNotNull(tag, "TupleTag passed to outputWithTimestamp cannot be null"); - outputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING); - } - } - - /** - * A concrete implementation of {@link OldDoFn.ProcessContext} used for running a {@link OldDoFn} - * over a single element. - * - * @param <InputT> the type of the {@link OldDoFn} (main) input elements - * @param <OutputT> the type of the {@link OldDoFn} (main) output elements - */ - private static class DoFnProcessContext<InputT, OutputT> - extends OldDoFn<InputT, OutputT>.ProcessContext { - - - final OldDoFn<InputT, OutputT> fn; - final DoFnContext<InputT, OutputT> context; - final WindowedValue<InputT> windowedValue; - - public DoFnProcessContext(OldDoFn<InputT, OutputT> fn, - DoFnContext<InputT, OutputT> context, - WindowedValue<InputT> windowedValue) { - fn.super(); - this.fn = fn; - this.context = context; - this.windowedValue = windowedValue; - } - - @Override - public PipelineOptions getPipelineOptions() { - return context.getPipelineOptions(); - } - - @Override - public InputT element() { - return windowedValue.getValue(); - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - checkNotNull(view, "View passed to sideInput cannot be null"); - Iterator<? extends BoundedWindow> windowIter = windows().iterator(); - BoundedWindow window; - if (!windowIter.hasNext()) { - if (context.windowFn instanceof GlobalWindows) { - // TODO: Remove this once GroupByKeyOnly no longer outputs elements - // without windows - window = GlobalWindow.INSTANCE; - } else { - throw new IllegalStateException( - "sideInput called when main input element is not in any windows"); - } - } else { - window = windowIter.next(); - if (windowIter.hasNext()) { - throw new IllegalStateException( - "sideInput called when main input element is in multiple windows"); - } - } - return context.sideInput( - view, view.getWindowMappingFn().getSideInputWindow(window)); - } - - @Override - public BoundedWindow window() { - if (!(fn instanceof OldDoFn.RequiresWindowAccess)) { - throw new UnsupportedOperationException( - "window() is only available in the context of a OldDoFn marked as" - + "RequiresWindowAccess."); - } - return Iterables.getOnlyElement(windows()); - } - - @Override - public PaneInfo pane() { - return windowedValue.getPane(); - } - - @Override - public void output(OutputT output) { - context.outputWindowedValue(windowedValue.withValue(output)); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - checkTimestamp(timestamp); - context.outputWindowedValue(output, timestamp, - windowedValue.getWindows(), windowedValue.getPane()); - } - - void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - context.outputWindowedValue(output, timestamp, windows, pane); - } - - @Override - public <T> void output(TupleTag<T> tag, T output) { - checkNotNull(tag, "Tag passed to output cannot be null"); - context.outputWindowedValue(tag, windowedValue.withValue(output)); - } - - @Override - public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null"); - checkTimestamp(timestamp); - context.outputWindowedValue( - tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane()); - } - - @Override - public Instant timestamp() { - return windowedValue.getTimestamp(); - } - - public Collection<? extends BoundedWindow> windows() { - return windowedValue.getWindows(); - } - - private void checkTimestamp(Instant timestamp) { - if (timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew()))) { - throw new IllegalArgumentException(String.format( - "Cannot output with timestamp %s. Output timestamps must be no earlier than the " - + "timestamp of the current input (%s) minus the allowed skew (%s). See the " - + "OldDoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.", - timestamp, windowedValue.getTimestamp(), - PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()))); - } - } - - @Override - public WindowingInternals<InputT, OutputT> windowingInternals() { - return new WindowingInternals<InputT, OutputT>() { - @Override - public void outputWindowedValue(OutputT output, Instant timestamp, - Collection<? extends BoundedWindow> windows, PaneInfo pane) { - context.outputWindowedValue(output, timestamp, windows, pane); - } - - @Override - public <AdditionalOutputT> void outputWindowedValue( - TupleTag<AdditionalOutputT> tag, - AdditionalOutputT output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - context.outputWindowedValue(tag, output, timestamp, windows, pane); - } - - @Override - public Collection<? extends BoundedWindow> windows() { - return windowedValue.getWindows(); - } - - @Override - public PaneInfo pane() { - return windowedValue.getPane(); - } - - @Override - public TimerInternals timerInternals() { - return context.stepContext.timerInternals(); - } - - @Override - public StateInternals stateInternals() { - return context.stepContext.stateInternals(); - } - - @Override - public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) { - return context.sideInput(view, sideInputWindow); - } - }; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java deleted file mode 100644 index 4a58445..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import java.util.Collection; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Instant; - -/** - * Adapters from {@link WindowingInternals} to {@link SideInputReader} and {@link - * OutputWindowedValue}. - */ -public class WindowingInternalsAdapters { - static SideInputReader sideInputReader(final WindowingInternals<?, ?> windowingInternals) { - return new SideInputReader() { - @Override - public <T> T get(PCollectionView<T> view, BoundedWindow sideInputWindow) { - return windowingInternals.sideInput(view, sideInputWindow); - } - - @Override - public <T> boolean contains(PCollectionView<T> view) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isEmpty() { - throw new UnsupportedOperationException(); - } - }; - } - - public static <OutputT> OutputWindowedValue<OutputT> outputWindowedValue( - final WindowingInternals<?, OutputT> windowingInternals) { - return new OutputWindowedValue<OutputT>() { - @Override - public void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - windowingInternals.outputWindowedValue(output, timestamp, windows, pane); - } - - @Override - public <AdditionalOutputT> void outputWindowedValue( - TupleTag<AdditionalOutputT> tag, - AdditionalOutputT output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - windowingInternals.outputWindowedValue(tag, output, timestamp, windows, pane); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java deleted file mode 100644 index a265ead..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import org.apache.beam.runners.core.GroupAlsoByWindowsProperties.GroupAlsoByWindowsDoFnFactory; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.WindowingStrategy; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Unit tests for {@link GroupAlsoByWindowViaOutputBufferDoFn}. - */ -@RunWith(JUnit4.class) -public class GroupAlsoByWindowViaOutputBufferDoFnTest { - - private class BufferingGABWViaOutputBufferDoFnFactory<K, InputT> - implements GroupAlsoByWindowsDoFnFactory<K, InputT, Iterable<InputT>> { - - private final Coder<InputT> inputCoder; - - public BufferingGABWViaOutputBufferDoFnFactory(Coder<InputT> inputCoder) { - this.inputCoder = inputCoder; - } - - @Override - public <W extends BoundedWindow> - GroupAlsoByWindowsDoFn<K, InputT, Iterable<InputT>, W> forStrategy( - WindowingStrategy<?, W> windowingStrategy, - StateInternalsFactory<K> stateInternalsFactory) { - return new GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, Iterable<InputT>, W>( - windowingStrategy, - stateInternalsFactory, - SystemReduceFn.<K, InputT, W>buffering(inputCoder)); - } - } - - @Test - public void testEmptyInputEmptyOutput() throws Exception { - GroupAlsoByWindowsProperties.emptyInputEmptyOutput( - new BufferingGABWViaOutputBufferDoFnFactory<>(StringUtf8Coder.of())); - } - - @Test - public void testGroupsElementsIntoFixedWindows() throws Exception { - GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindows( - new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of())); - } - - @Test - public void testGroupsElementsIntoSlidingWindows() throws Exception { - GroupAlsoByWindowsProperties.groupsElementsIntoSlidingWindowsWithMinTimestamp( - new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of())); - } - - @Test - public void testGroupsIntoOverlappingNonmergingWindows() throws Exception { - GroupAlsoByWindowsProperties.groupsIntoOverlappingNonmergingWindows( - new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of())); - } - - @Test - public void testGroupsIntoSessions() throws Exception { - GroupAlsoByWindowsProperties.groupsElementsInMergedSessions( - new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of())); - } - - @Test - public void testGroupsElementsIntoFixedWindowsWithEndOfWindowTimestamp() throws Exception { - GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp( - new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of())); - } - - @Test - public void testGroupsElementsIntoFixedWindowsWithLatestTimestamp() throws Exception { - GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithLatestTimestamp( - new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of())); - } - - @Test - public void testGroupsElementsIntoSessionsWithEndOfWindowTimestamp() throws Exception { - GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithEndOfWindowTimestamp( - new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of())); - } - - @Test - public void testGroupsElementsIntoSessionsWithLatestTimestamp() throws Exception { - GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithLatestTimestamp( - new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of())); - } -}
