Repository: incubator-beam Updated Branches: refs/heads/master 632576b5b -> 3ad767750
Remove unused WindowingInternals.writePCollectionViewData Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/803bbe2a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/803bbe2a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/803bbe2a Branch: refs/heads/master Commit: 803bbe2a3026424f509e13809a8eecb79990e5fe Parents: 07544ef Author: Kenneth Knowles <[email protected]> Authored: Wed Nov 23 11:23:07 2016 -0800 Committer: Sela <[email protected]> Committed: Sat Nov 26 12:47:14 2016 +0200 ---------------------------------------------------------------------- .../operators/ApexGroupByKeyOperator.java | 10 ---------- .../beam/runners/core/SimpleDoFnRunner.java | 18 ------------------ .../beam/runners/core/SimpleOldDoFnRunner.java | 16 ---------------- .../functions/FlinkProcessContextBase.java | 8 -------- .../spark/translation/SparkProcessContext.java | 9 --------- .../apache/beam/sdk/transforms/DoFnTester.java | 10 ---------- .../apache/beam/sdk/util/WindowingInternals.java | 10 ---------- 7 files changed, 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/803bbe2a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index eca4308..3b0e4f2 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -30,8 +30,6 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer; import com.google.common.base.Throwables; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; - -import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; @@ -40,7 +38,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; - import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple; import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions; @@ -392,13 +389,6 @@ public class ApexGroupByKeyOperator<K, V> implements Operator { } @Override - public <T> void writePCollectionViewData( - TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) - throws IOException { - throw new RuntimeException("writePCollectionViewData() not available in Streaming mode."); - } - - @Override public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { throw new RuntimeException("sideInput() is not available in Streaming mode."); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/803bbe2a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 841e412..f611c0a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -21,14 +21,11 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; -import java.io.IOException; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Set; import org.apache.beam.runners.core.DoFnRunners.OutputManager; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; @@ -595,21 +592,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } @Override - public <T> void writePCollectionViewData( - TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) - throws IOException { - @SuppressWarnings("unchecked") - Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) context.windowFn.windowCoder(); - - context.stepContext.writePCollectionViewData( - tag, - data, - IterableCoder.of(WindowedValue.getFullCoder(elemCoder, windowCoder)), - window(), - windowCoder); - } - - @Override public StateInternals<?> stateInternals() { return context.stepContext.stateInternals(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/803bbe2a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index cbda791..73286ad 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -21,14 +21,11 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; -import java.io.IOException; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Set; import org.apache.beam.runners.core.DoFnRunners.OutputManager; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; @@ -496,19 +493,6 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT } @Override - public <T> void writePCollectionViewData( - TupleTag<?> tag, - Iterable<WindowedValue<T>> data, - Coder<T> elemCoder) throws IOException { - @SuppressWarnings("unchecked") - Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) context.windowFn.windowCoder(); - - context.stepContext.writePCollectionViewData( - tag, data, IterableCoder.of(WindowedValue.getFullCoder(elemCoder, windowCoder)), - window(), windowCoder); - } - - @Override public StateInternals<?> stateInternals() { return context.stepContext.stateInternals(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/803bbe2a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java index 2169785..42607dd 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java @@ -20,14 +20,12 @@ package org.apache.beam.runners.flink.translation.functions; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.collect.Iterables; -import java.io.IOException; import java.io.Serializable; import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.Map; import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; @@ -153,12 +151,6 @@ abstract class FlinkProcessContextBase<InputT, OutputT> } @Override - public <T> void writePCollectionViewData(TupleTag<?> tag, - Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override public <ViewT> ViewT sideInput( PCollectionView<ViewT> view, BoundedWindow sideInputWindow) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/803bbe2a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java index 6a6cbd4..bb0ec2f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java @@ -23,14 +23,12 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.collect.AbstractIterator; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import java.io.IOException; import java.util.Collection; import java.util.Iterator; import java.util.Map; import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.util.BroadcastHelper; import org.apache.beam.runners.spark.util.SparkSideInputReader; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; @@ -282,13 +280,6 @@ public abstract class SparkProcessContext<InputT, OutputT, ValueT> } @Override - public <T> void writePCollectionViewData( - TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { - throw new UnsupportedOperationException( - "WindowingInternals#writePCollectionViewData() is not yet supported."); - } - - @Override public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) { throw new UnsupportedOperationException( "WindowingInternals#sideInput() is not yet supported."); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/803bbe2a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index bbf0315..daa8a06 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -25,7 +25,6 @@ import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -34,7 +33,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Combine.CombineFn; @@ -701,14 +699,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { } @Override - public <T> void writePCollectionViewData( - TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) - throws IOException { - throw new UnsupportedOperationException( - "WritePCollectionViewData is not supported in in the context of DoFnTester"); - } - - @Override public <T> T sideInput( PCollectionView<T> view, BoundedWindow sideInputWindow) { throw new UnsupportedOperationException( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/803bbe2a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java index 5e90864..a921725 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java @@ -17,9 +17,7 @@ */ package org.apache.beam.sdk.util; -import java.io.IOException; import java.util.Collection; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -78,14 +76,6 @@ public interface WindowingInternals<InputT, OutputT> { PaneInfo pane(); /** - * Write the given {@link PCollectionView} data to a location accessible by other workers. - */ - <T> void writePCollectionViewData( - TupleTag<?> tag, - Iterable<WindowedValue<T>> data, - Coder<T> elemCoder) throws IOException; - - /** * Return the value of the side input for a particular side input window. */ <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow);
