Repository: beam Updated Branches: refs/heads/master 8f4bb0158 -> 4124cc687
Revert "Removes final minor usages of OldDoFn outside OldDoFn itself" This reverts commit a3b5f968c1ae2e4f712bfcf200a03d8d193fd90c. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d357aea7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d357aea7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d357aea7 Branch: refs/heads/master Commit: d357aea77a9e7051fba3b5aead3c0782d9be6274 Parents: 8f4bb01 Author: Eugene Kirpichov <[email protected]> Authored: Tue Apr 18 11:37:36 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Tue Apr 18 11:37:36 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/AssignWindowsDoFn.java | 78 +++++ .../apache/beam/runners/core/DoFnAdapters.java | 328 +++++++++++++++++++ .../apache/beam/runners/core/DoFnRunners.java | 2 +- .../GroupAlsoByWindowViaOutputBufferDoFn.java | 17 +- .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 7 +- .../GroupAlsoByWindowViaWindowSetNewDoFn.java | 11 +- .../core/GroupAlsoByWindowsAggregators.java | 28 -- .../runners/core/GroupAlsoByWindowsDoFn.java | 46 +++ .../core/LateDataDroppingDoFnRunner.java | 3 +- ...roupAlsoByWindowViaOutputBufferDoFnTest.java | 4 +- .../core/GroupAlsoByWindowsProperties.java | 27 +- .../beam/runners/core/ReduceFnTester.java | 3 +- .../GroupAlsoByWindowEvaluatorFactory.java | 6 +- .../SparkGroupAlsoByWindowViaWindowSet.java | 9 +- .../spark/translation/SparkAssignWindowFn.java | 3 +- ...SparkGroupAlsoByWindowViaOutputBufferFn.java | 8 +- 16 files changed, 495 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java new file mode 100644 index 0000000..bbf3574 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.Iterables; +import java.util.Collection; +import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Instant; + +/** + * {@link OldDoFn} that tags elements of a {@link PCollection} with windows, according to the + * provided {@link WindowFn}. + * + * @param <T> Type of elements being windowed + * @param <W> Window type + */ +@SystemDoFnInternal +public class AssignWindowsDoFn<T, W extends BoundedWindow> extends OldDoFn<T, T> + implements RequiresWindowAccess { + private WindowFn<? super T, W> fn; + + public AssignWindowsDoFn(WindowFn<? super T, W> fn) { + this.fn = + checkNotNull( + fn, + "%s provided to %s cannot be null", + WindowFn.class.getSimpleName(), + AssignWindowsDoFn.class.getSimpleName()); + } + + @Override + @SuppressWarnings("unchecked") + public void processElement(final ProcessContext c) throws Exception { + Collection<W> windows = + ((WindowFn<T, W>) fn).assignWindows( + ((WindowFn<T, W>) fn).new AssignContext() { + @Override + public T element() { + return c.element(); + } + + @Override + public Instant timestamp() { + return c.timestamp(); + } + + @Override + public BoundedWindow window() { + return Iterables.getOnlyElement(c.windowingInternals().windows()); + } + }); + + c.windowingInternals() + .outputWindowedValue(c.element(), c.timestamp(), windows, PaneInfo.NO_FIRING); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java new file mode 100644 index 0000000..66ad736 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.io.IOException; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.AggregatorRetriever; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Context; +import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; +import org.apache.beam.sdk.transforms.DoFn.ProcessContext; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.Timer; +import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}. + * + * @deprecated This class will go away when we start running {@link DoFn}'s directly (using {@link + * DoFnInvoker}) rather than via {@link OldDoFn}. + */ +@Deprecated +public class DoFnAdapters { + /** Should not be instantiated. */ + private DoFnAdapters() {} + + /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */ + @SuppressWarnings({"unchecked", "rawtypes"}) + public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) { + DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass()); + if (signature.processElement().observesWindow()) { + return new WindowDoFnAdapter<>(fn); + } else { + return new SimpleDoFnAdapter<>(fn); + } + } + + /** + * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link + * OldDoFn}. + */ + private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> { + private final DoFn<InputT, OutputT> fn; + private transient DoFnInvoker<InputT, OutputT> invoker; + + SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) { + super(AggregatorRetriever.getDelegatingAggregators(fn)); + this.fn = fn; + this.invoker = DoFnInvokers.invokerFor(fn); + } + + @Override + public void setup() throws Exception { + this.invoker.invokeSetup(); + } + + @Override + public void startBundle(Context c) throws Exception { + fn.prepareForProcessing(); + invoker.invokeStartBundle(new ContextAdapter<>(fn, c)); + } + + @Override + public void finishBundle(Context c) throws Exception { + invoker.invokeFinishBundle(new ContextAdapter<>(fn, c)); + } + + @Override + public void teardown() throws Exception { + this.invoker.invokeTeardown(); + } + + @Override + public void processElement(ProcessContext c) throws Exception { + ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c); + invoker.invokeProcessElement(adapter); + } + + @Override + public Duration getAllowedTimestampSkew() { + return fn.getAllowedTimestampSkew(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.delegate(fn); + } + + private void readObject(java.io.ObjectInputStream in) + throws IOException, ClassNotFoundException { + in.defaultReadObject(); + this.invoker = DoFnInvokers.invokerFor(fn); + } + } + + /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an {@link OldDoFn}. */ + private static class WindowDoFnAdapter<InputT, OutputT> extends SimpleDoFnAdapter<InputT, OutputT> + implements OldDoFn.RequiresWindowAccess { + + WindowDoFnAdapter(DoFn<InputT, OutputT> fn) { + super(fn); + } + } + + /** + * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link + * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is + * unavailable. + */ + private static class ContextAdapter<InputT, OutputT> extends DoFn<InputT, OutputT>.Context + implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { + + private OldDoFn<InputT, OutputT>.Context context; + + private ContextAdapter(DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) { + fn.super(); + this.context = context; + super.setupDelegateAggregators(); + } + + @Override + public PipelineOptions getPipelineOptions() { + return context.getPipelineOptions(); + } + + @Override + public void output(OutputT output) { + context.output(output); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + context.outputWithTimestamp(output, timestamp); + } + + @Override + public <T> void output(TupleTag<T> tag, T output) { + context.output(tag, output); + } + + @Override + public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + context.outputWithTimestamp(tag, output, timestamp); + } + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator( + String name, + CombineFn<AggInputT, ?, AggOutputT> combiner) { + return context.createAggregatorInternal(name, combiner); + } + + @Override + public BoundedWindow window() { + // The OldDoFn doesn't allow us to ask for these outside processElement, so this + // should be unreachable. + throw new UnsupportedOperationException( + "Can only get the window in processElement; elsewhere there is no defined window."); + } + + @Override + public Context context(DoFn<InputT, OutputT> doFn) { + return this; + } + + @Override + public ProcessContext processContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Can only get a ProcessContext in processElement"); + } + + @Override + public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Timers are not supported for OldDoFn"); + } + + @Override + public RestrictionTracker<?> restrictionTracker() { + throw new UnsupportedOperationException("This is a non-splittable DoFn"); + } + + @Override + public State state(String stateId) { + throw new UnsupportedOperationException("State is not supported by this runner"); + } + + @Override + public Timer timer(String timerId) { + throw new UnsupportedOperationException("Timers are not supported by this runner"); + } + } + + /** + * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFnInvoker.ArgumentProvider} method. + */ + private static class ProcessContextAdapter<InputT, OutputT> + extends DoFn<InputT, OutputT>.ProcessContext + implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { + + private OldDoFn<InputT, OutputT>.ProcessContext context; + + private ProcessContextAdapter( + DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.ProcessContext context) { + fn.super(); + this.context = context; + } + + @Override + public PipelineOptions getPipelineOptions() { + return context.getPipelineOptions(); + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + return context.sideInput(view); + } + + @Override + public void output(OutputT output) { + context.output(output); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + context.outputWithTimestamp(output, timestamp); + } + + @Override + public <T> void output(TupleTag<T> tag, T output) { + context.output(tag, output); + } + + @Override + public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + context.outputWithTimestamp(tag, output, timestamp); + } + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator( + String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { + return context.createAggregatorInternal(name, combiner); + } + + @Override + public InputT element() { + return context.element(); + } + + @Override + public Instant timestamp() { + return context.timestamp(); + } + + @Override + public PaneInfo pane() { + return context.pane(); + } + + @Override + public void updateWatermark(Instant watermark) { + throw new UnsupportedOperationException("Only splittable DoFn's can use updateWatermark()"); + } + + @Override + public BoundedWindow window() { + return context.window(); + } + + @Override + public Context context(DoFn<InputT, OutputT> doFn) { + return this; + } + + @Override + public ProcessContext processContext(DoFn<InputT, OutputT> doFn) { + return this; + } + + @Override + public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException("Timers are not supported for OldDoFn"); + } + + @Override + public RestrictionTracker<?> restrictionTracker() { + throw new UnsupportedOperationException("This is a non-splittable DoFn"); + } + + @Override + public State state(String stateId) { + throw new UnsupportedOperationException("State is not supported by this runner"); + } + + @Override + public Timer timer(String timerId) { + throw new UnsupportedOperationException("Timers are not supported by this runner"); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index 06db6e1..b09ee08 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -105,7 +105,7 @@ public class DoFnRunners { /** * Returns an implementation of {@link DoFnRunner} that handles late data dropping. * - * <p>It drops elements from expired windows before they reach the underlying {@link DoFn}. + * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}. */ public static <K, InputT, OutputT, W extends BoundedWindow> DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner( http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java index 5bd7e2d..5508b2e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java @@ -17,34 +17,23 @@ */ package org.apache.beam.runners.core; -import static org.apache.beam.runners.core.GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER; -import static org.apache.beam.runners.core.GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER; - import java.util.ArrayList; import java.util.List; import org.apache.beam.runners.core.construction.Triggers; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SystemDoFnInternal; -import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.KV; import org.joda.time.Instant; /** - * The default batch {@link GroupAlsoByWindowsAggregators} implementation, if no specialized "fast - * path" implementation is applicable. + * The default batch {@link GroupAlsoByWindowsDoFn} implementation, if no specialized "fast path" + * implementation is applicable. */ @SystemDoFnInternal public class GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W extends BoundedWindow> - extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> { - protected final Aggregator<Long, Long> droppedDueToClosedWindow = - createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs()); - protected final Aggregator<Long, Long> droppedDueToLateness = - createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs()); + extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> { private final WindowingStrategy<?, W> strategy; private final StateInternalsFactory<K> stateInternalsFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index e6be93a..bf48df1 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -28,7 +28,7 @@ import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; /** - * A general {@link GroupAlsoByWindowsAggregators}. This delegates all of the logic to the + * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the * {@link ReduceFnRunner}. */ @SystemDoFnInternal @@ -46,10 +46,9 @@ public class GroupAlsoByWindowViaWindowSetDoFn< protected final Aggregator<Long, Long> droppedDueToClosedWindow = createAggregator( - GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs()); + GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs()); protected final Aggregator<Long, Long> droppedDueToLateness = - createAggregator( - GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs()); + createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs()); private final WindowingStrategy<Object, W> windowingStrategy; private final StateInternalsFactory<K> stateInternalsFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java index e146bfc..0cf6e2d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java @@ -35,7 +35,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; /** - * A general {@link GroupAlsoByWindowsAggregators}. This delegates all of the logic to the + * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the * {@link ReduceFnRunner}. */ @SystemDoFnInternal @@ -61,10 +61,9 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn< protected final Aggregator<Long, Long> droppedDueToClosedWindow = createAggregator( - GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs()); + GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs()); protected final Aggregator<Long, Long> droppedDueToLateness = - createAggregator( - GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs()); + createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs()); private final WindowingStrategy<Object, W> windowingStrategy; private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn; private transient StateInternalsFactory<K> stateInternalsFactory; @@ -145,6 +144,10 @@ public class GroupAlsoByWindowViaWindowSetNewDoFn< reduceFnRunner.persist(); } + public OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() { + throw new RuntimeException("Not implement!"); + } + public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() { return droppedDueToLateness; } http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java deleted file mode 100644 index 7c4f252..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsAggregators.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import static org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; - -/** - * Standard aggregator names related to {@link GroupAlsoByWindow}. - */ -public abstract class GroupAlsoByWindowsAggregators { - public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow"; - public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness"; -} http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java new file mode 100644 index 0000000..7e96136 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; + +/** + * {@link OldDoFn} that merges windows and groups elements in those windows, optionally + * combining values. + * + * @param <K> key type + * @param <InputT> input value element type + * @param <OutputT> output value element type + * @param <W> window type + */ +@SystemDoFnInternal +public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends BoundedWindow> + extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> { + public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow"; + public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness"; + + protected final Aggregator<Long, Long> droppedDueToClosedWindow = + createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs()); + protected final Aggregator<Long, Long> droppedDueToLateness = + createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs()); +} http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java index cdc7ce7..4d41527 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java @@ -22,7 +22,6 @@ import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowTracing; @@ -33,7 +32,7 @@ import org.joda.time.Instant; /** * A customized {@link DoFnRunner} that handles late data dropping for - * a {@link KeyedWorkItem} input {@link DoFn}. + * a {@link KeyedWorkItem} input {@link OldDoFn}. * * <p>It expands windows before checking data lateness. * http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java index e725cd2..cb8d494 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java @@ -43,10 +43,10 @@ public class GroupAlsoByWindowViaOutputBufferDoFnTest { @Override public <W extends BoundedWindow> - GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, Iterable<InputT>, W> forStrategy( + GroupAlsoByWindowsDoFn<K, InputT, Iterable<InputT>, W> forStrategy( WindowingStrategy<?, W> windowingStrategy, StateInternalsFactory<K> stateInternalsFactory) { - return new GroupAlsoByWindowViaOutputBufferDoFn<>( + return new GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, Iterable<InputT>, W>( windowingStrategy, stateInternalsFactory, SystemReduceFn.<K, InputT, W>buffering(inputCoder)); http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java index a5031b8..d0a8923 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java @@ -57,7 +57,7 @@ import org.joda.time.Duration; import org.joda.time.Instant; /** - * Properties of {@link GroupAlsoByWindowsAggregators}. + * Properties of {@link GroupAlsoByWindowsDoFn}. * * <p>Some properties may not hold of some implementations, due to restrictions on the context in * which the implementation is applicable. For example, some {@code GroupAlsoByWindows} may not @@ -66,13 +66,12 @@ import org.joda.time.Instant; public class GroupAlsoByWindowsProperties { /** - * A factory of {@link GroupAlsoByWindowsAggregators} so that the various properties can provide - * the appropriate windowing strategy under test. + * A factory of {@link GroupAlsoByWindowsDoFn} so that the various properties can provide the + * appropriate windowing strategy under test. */ public interface GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> { - <W extends BoundedWindow> - GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W> forStrategy( - WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory); + <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> forStrategy( + WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory); } /** @@ -312,7 +311,7 @@ public class GroupAlsoByWindowsProperties { } /** - * Tests that the given {@link GroupAlsoByWindowsAggregators} implementation combines elements per + * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per * session window correctly according to the provided {@link CombineFn}. */ public static void combinesElementsPerSession( @@ -499,7 +498,7 @@ public class GroupAlsoByWindowsProperties { } /** - * Tests that the given {@link GroupAlsoByWindowsAggregators} implementation combines elements per + * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per * session window correctly according to the provided {@link CombineFn}. */ public static void combinesElementsPerSessionWithEndOfWindowTimestamp( @@ -598,7 +597,7 @@ public class GroupAlsoByWindowsProperties { private static <K, InputT, OutputT, W extends BoundedWindow> List<WindowedValue<KV<K, OutputT>>> processElement( - GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W> fn, + GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> fn, KV<K, Iterable<WindowedValue<InputT>>> element) throws Exception { TestProcessContext<K, InputT, OutputT, W> c = new TestProcessContext<>(fn, element); @@ -622,18 +621,18 @@ public class GroupAlsoByWindowsProperties { } /** - * A {@link GroupAlsoByWindowViaOutputBufferDoFn.ProcessContext} providing just enough context for - * a {@link GroupAlsoByWindowsAggregators} - namely, information about the element and output via - * {@link WindowingInternals}, but no side inputs/outputs and no normal output. + * A {@link GroupAlsoByWindowsDoFn.ProcessContext} providing just enough context for a {@link + * GroupAlsoByWindowsDoFn} - namely, information about the element and output via {@link + * WindowingInternals}, but no side inputs/outputs and no normal output. */ private static class TestProcessContext<K, InputT, OutputT, W extends BoundedWindow> - extends GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W>.ProcessContext { + extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W>.ProcessContext { private final PipelineOptions options = PipelineOptionsFactory.create(); private final KV<K, Iterable<WindowedValue<InputT>>> element; private final List<WindowedValue<KV<K, OutputT>>> output = new ArrayList<>(); private TestProcessContext( - GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W> fn, + GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> fn, KV<K, Iterable<WindowedValue<InputT>>> element) { fn.super(); this.element = element; http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 923b2c3..914550e 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -113,8 +113,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { private boolean autoAdvanceOutputWatermark = true; private final InMemoryLongSumAggregator droppedDueToClosedWindow = - new InMemoryLongSumAggregator( - GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER); + new InMemoryLongSumAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER); /** * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy}, creating http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index ce29709..ce7b12a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import java.util.ArrayList; import java.util.Collection; -import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators; +import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; @@ -146,10 +146,10 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder()); reduceFn = SystemReduceFn.buffering(valueCoder); droppedDueToClosedWindow = aggregatorChanges.createSystemAggregator(stepContext, - GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, + GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs()); droppedDueToLateness = aggregatorChanges.createSystemAggregator(stepContext, - GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER, + GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs()); } http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index 1b40613..029c28a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -23,8 +23,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators; -import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; +import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn; import org.apache.beam.runners.core.LateDataUtils; import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; @@ -77,7 +76,7 @@ import scala.reflect.ClassTag; import scala.runtime.AbstractFunction1; /** - * An implementation of {@link GroupAlsoByWindow} + * An implementation of {@link org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn} * logic for grouping by windows and controlling trigger firings and pane accumulation. * * <p>This implementation is a composite of Spark transformations revolving around state management @@ -209,9 +208,9 @@ public class SparkGroupAlsoByWindowViaWindowSet { // use in memory Aggregators since Spark Accumulators are not resilient // in stateful operators, once done with this partition. final InMemoryLongSumAggregator droppedDueToClosedWindow = new InMemoryLongSumAggregator( - GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER); + GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER); final InMemoryLongSumAggregator droppedDueToLateness = new InMemoryLongSumAggregator( - GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER); + GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER); AbstractIterator< Tuple2</*K*/ ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>> http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java index 088b981..18a3dd8 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java @@ -29,8 +29,7 @@ import org.joda.time.Instant; /** - * An implementation of {@link org.apache.beam.sdk.transforms.windowing.Window.Assign} for the Spark - * runner. + * An implementation of {@link org.apache.beam.runners.core.AssignWindowsDoFn} for the Spark runner. */ public class SparkAssignWindowFn<T, W extends BoundedWindow> implements Function<WindowedValue<T>, WindowedValue<T>> { http://git-wip-us.apache.org/repos/asf/beam/blob/d357aea7/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java index 85adca9..ccc0fa3 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java @@ -21,8 +21,8 @@ package org.apache.beam.runners.spark.translation; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators; -import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; +import org.apache.beam.runners.core.GroupAlsoByWindowViaOutputBufferDoFn; +import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn; import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; @@ -48,7 +48,7 @@ import org.apache.spark.api.java.function.FlatMapFunction; import org.joda.time.Instant; /** - * An implementation of {@link GroupAlsoByWindow} + * An implementation of {@link GroupAlsoByWindowViaOutputBufferDoFn} * for the Spark runner. */ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends BoundedWindow> @@ -75,7 +75,7 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde droppedDueToClosedWindow = runtimeContext.createAggregator( accumulator, - GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, + GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs()); }
