Move LateDataUtils and UnsupportedSideInputReader from the Spark runner's util package to runners-core. Add missing Apache license headers to TestSparkPipelineOptions and PipelineRule.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/123f4820 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/123f4820 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/123f4820 Branch: refs/heads/master Commit: 123f4820b68dd13690be59a7b8b946e81031b6a7 Parents: 4ca5680 Author: Sela <[email protected]> Authored: Mon Feb 27 20:24:54 2017 +0200 Committer: Sela <[email protected]> Committed: Wed Mar 1 00:18:07 2017 +0200 ---------------------------------------------------------------------- .../apache/beam/runners/core/LateDataUtils.java | 88 +++++++++++++++++++ .../core/UnsupportedSideInputReader.java | 52 +++++++++++ .../runners/spark/TestSparkPipelineOptions.java | 17 ++++ .../SparkGroupAlsoByWindowViaWindowSet.java | 4 +- .../beam/runners/spark/util/LateDataUtils.java | 92 -------------------- .../spark/util/UnsupportedSideInputReader.java | 52 ----------- .../apache/beam/runners/spark/PipelineRule.java | 17 ++++ 7 files changed, 176 insertions(+), 146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/123f4820/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java new file mode 100644 index 0000000..17bd360 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Iterables; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowTracing; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; + + +/** + * Utils to handle late data. + */ +public class LateDataUtils { + + /** + * Returns an {@code Iterable<WindowedValue<InputT>>} that only contains non-late input elements. 
+ */ + public static <K, V> Iterable<WindowedValue<V>> dropExpiredWindows( + final K key, + Iterable<WindowedValue<V>> elements, + final TimerInternals timerInternals, + final WindowingStrategy<?, ?> windowingStrategy, + final Aggregator<Long, Long> droppedDueToLateness) { + return FluentIterable.from(elements) + .transformAndConcat( + // Explode windows to filter out expired ones + new Function<WindowedValue<V>, Iterable<WindowedValue<V>>>() { + @Override + public Iterable<WindowedValue<V>> apply(@Nullable WindowedValue<V> input) { + if (input == null) { + return null; + } + return input.explodeWindows(); + } + }) + .filter( + new Predicate<WindowedValue<V>>() { + @Override + public boolean apply(@Nullable WindowedValue<V> input) { + if (input == null) { + // drop null elements. + return false; + } + BoundedWindow window = Iterables.getOnlyElement(input.getWindows()); + boolean expired = + window + .maxTimestamp() + .plus(windowingStrategy.getAllowedLateness()) + .isBefore(timerInternals.currentInputWatermarkTime()); + if (expired) { + // The element is too late for this window. + droppedDueToLateness.addValue(1L); + WindowTracing.debug( + "GroupAlsoByWindow: Dropping element at {} for key: {}; " + + "window: {} since it is too far behind inputWatermark: {}", + input.getTimestamp(), + key, + window, + timerInternals.currentInputWatermarkTime()); + } + // Keep the element if the window is not expired. 
+ return !expired; + } + }); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/123f4820/runners/core-java/src/main/java/org/apache/beam/runners/core/UnsupportedSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnsupportedSideInputReader.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnsupportedSideInputReader.java new file mode 100644 index 0000000..4230f8c --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnsupportedSideInputReader.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.values.PCollectionView; + + +/** + * An implementation of a {@link SideInputReader} that actually does not support side-inputs. 
+ */ +public class UnsupportedSideInputReader implements SideInputReader { + private final String transformName; + + public UnsupportedSideInputReader(String transformName) { + this.transformName = transformName; + } + + @Override + public <T> T get(PCollectionView<T> view, BoundedWindow window) { + throw new UnsupportedOperationException( + String.format("%s does not support side inputs.", transformName)); + } + + @Override + public <T> boolean contains(PCollectionView<T> view) { + throw new UnsupportedOperationException( + String.format("%s does not support side inputs.", transformName)); + } + + @Override + public boolean isEmpty() { + throw new UnsupportedOperationException( + String.format("%s does not support side inputs.", transformName)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/123f4820/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineOptions.java index 2cb58d8..d50b652 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineOptions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineOptions.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.spark; import org.apache.beam.sdk.options.Default; http://git-wip-us.apache.org/repos/asf/beam/blob/123f4820/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index 24b8508..2f1713a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -24,10 +24,12 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn; +import org.apache.beam.runners.core.LateDataUtils; import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.core.UnsupportedSideInputReader; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.runners.spark.SparkPipelineOptions; @@ -37,8 +39,6 @@ import 
org.apache.beam.runners.spark.translation.TranslationUtils; import org.apache.beam.runners.spark.translation.WindowingHelpers; import org.apache.beam.runners.spark.util.ByteArray; import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; -import org.apache.beam.runners.spark.util.LateDataUtils; -import org.apache.beam.runners.spark.util.UnsupportedSideInputReader; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; http://git-wip-us.apache.org/repos/asf/beam/blob/123f4820/runners/spark/src/main/java/org/apache/beam/runners/spark/util/LateDataUtils.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/LateDataUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/LateDataUtils.java deleted file mode 100644 index 18689bd..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/LateDataUtils.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.spark.util; - -import com.google.common.base.Function; -import com.google.common.base.Predicate; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.Iterables; -import javax.annotation.Nullable; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowTracing; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; - - -/** - * Utils to handle late data. - */ -public class LateDataUtils { - - /** - * Returns an {@code Iterable<WindowedValue<InputT>>} that only contains non-late input - * elements. - * Taken from Thomas Groh's implementation in the DirectRunner's - * GroupAlsoByWindowEvaluatorFactory. - */ - public static <K, V> Iterable<WindowedValue<V>> dropExpiredWindows( - final K key, - Iterable<WindowedValue<V>> elements, - final TimerInternals timerInternals, - final WindowingStrategy<?, ?> windowingStrategy, - final Aggregator<Long, Long> droppedDueToLateness) { - return FluentIterable.from(elements) - .transformAndConcat( - // Explode windows to filter out expired ones - new Function<WindowedValue<V>, Iterable<WindowedValue<V>>>() { - @Override - public Iterable<WindowedValue<V>> apply(@Nullable WindowedValue<V> input) { - if (input == null) { - return null; - } - return input.explodeWindows(); - } - }) - .filter( - new Predicate<WindowedValue<V>>() { - @Override - public boolean apply(@Nullable WindowedValue<V> input) { - if (input == null) { - // drop null elements. - return false; - } - BoundedWindow window = Iterables.getOnlyElement(input.getWindows()); - boolean expired = - window - .maxTimestamp() - .plus(windowingStrategy.getAllowedLateness()) - .isBefore(timerInternals.currentInputWatermarkTime()); - if (expired) { - // The element is too late for this window. 
- droppedDueToLateness.addValue(1L); - WindowTracing.debug( - "GroupAlsoByWindow: Dropping element at {} for key: {}; " - + "window: {} since it is too far behind inputWatermark: {}", - input.getTimestamp(), - key, - window, - timerInternals.currentInputWatermarkTime()); - } - // Keep the element if the window is not expired. - return !expired; - } - }); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/123f4820/runners/spark/src/main/java/org/apache/beam/runners/spark/util/UnsupportedSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/UnsupportedSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/UnsupportedSideInputReader.java deleted file mode 100644 index 96d889d..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/UnsupportedSideInputReader.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.spark.util; - -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.values.PCollectionView; - - -/** - * An implementation of a {@link SideInputReader} that actually does not support side-inputs. - */ -public class UnsupportedSideInputReader implements SideInputReader { - private final String transformName; - - public UnsupportedSideInputReader(String transformName) { - this.transformName = transformName; - } - - @Override - public <T> T get(PCollectionView<T> view, BoundedWindow window) { - throw new UnsupportedOperationException( - String.format("%s does not support side inputs.", transformName)); - } - - @Override - public <T> boolean contains(PCollectionView<T> view) { - throw new UnsupportedOperationException( - String.format("%s does not support side inputs.", transformName)); - } - - @Override - public boolean isEmpty() { - throw new UnsupportedOperationException( - String.format("%s does not support side inputs.", transformName)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/123f4820/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java index bb42510..77519cd 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.spark; import org.apache.beam.sdk.Pipeline;
