Move SideInputReader to runners/core-java
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6542eafc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6542eafc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6542eafc Branch: refs/heads/master Commit: 6542eafcf3a84105d2716cd0200d40ccdf764472 Parents: 1d4b1ed Author: Kenneth Knowles <[email protected]> Authored: Wed May 3 19:46:59 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu May 4 16:06:55 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/apex/ApexRunnerRegistrar.java | 1 - .../beam/runners/apex/ApexRunnerResult.java | 2 - .../beam/runners/apex/ApexYarnLauncher.java | 2 - .../translation/ReadUnboundedTranslator.java | 1 - .../apex/translation/TransformTranslator.java | 1 - .../operators/ApexGroupByKeyOperator.java | 2 +- .../operators/ApexParDoOperator.java | 4 +- .../ApexReadUnboundedInputOperator.java | 2 - .../utils/CoderAdapterStreamCodec.java | 2 - .../utils/ValueAndCoderKryoSerializable.java | 2 - .../apex/translation/utils/ValuesSource.java | 2 - .../beam/runners/apex/ApexYarnLauncherTest.java | 2 - .../apex/examples/UnboundedTextSource.java | 2 - .../runners/apex/examples/WordCountTest.java | 2 - .../translation/ApexGroupByKeyOperatorTest.java | 2 - .../translation/utils/CollectionSource.java | 2 - .../apache/beam/runners/core/DoFnRunners.java | 2 - .../runners/core/GlobalCombineFnRunner.java | 1 - .../runners/core/GlobalCombineFnRunners.java | 41 ++++++++++--- .../GroupAlsoByWindowViaWindowSetNewDoFn.java | 1 - .../beam/runners/core/NullSideInputReader.java | 61 ++++++++++++++++++++ ...eBoundedSplittableProcessElementInvoker.java | 1 - .../beam/runners/core/ProcessFnRunner.java | 1 - .../core/ReadyCheckingSideInputReader.java | 34 +++++++++++ .../runners/core/ReduceFnContextFactory.java | 1 - .../beam/runners/core/ReduceFnRunner.java | 1 - .../beam/runners/core/SideInputHandler.java | 1 - .../beam/runners/core/SideInputReader.java | 47 +++++++++++++++ .../beam/runners/core/SimpleDoFnRunner.java | 1 - .../beam/runners/core/SimpleOldDoFnRunner.java | 1 - .../core/SimplePushbackSideInputDoFnRunner.java | 1 - .../core/UnsupportedSideInputReader.java | 1 - .../core/WindowingInternalsAdapters.java | 1 - ...ndedSplittableProcessElementInvokerTest.java | 1 - .../beam/runners/core/ReduceFnRunnerTest.java | 1 - .../beam/runners/core/ReduceFnTester.java | 2 - .../beam/runners/core/SimpleDoFnRunnerTest.java | 1 - .../SimplePushbackSideInputDoFnRunnerTest.java | 1 - .../beam/runners/core/SplittableParDoTest.java | 1 - .../runners/core/StatefulDoFnRunnerTest.java | 1 - .../beam/runners/direct/EvaluationContext.java | 4 +- .../beam/runners/direct/ParDoEvaluator.java | 2 +- .../beam/runners/direct/SideInputContainer.java | 4 +- ...littableProcessElementsEvaluatorFactory.java | 2 +- .../runners/direct/EvaluationContextTest.java | 2 +- .../beam/runners/direct/ParDoEvaluatorTest.java | 2 +- .../runners/direct/SideInputContainerTest.java | 4 +- .../StatefulParDoEvaluatorFactoryTest.java | 2 +- .../functions/AbstractFlinkCombineRunner.java | 2 +- .../functions/FlinkSideInputReader.java | 2 +- .../functions/HashingFlinkCombineRunner.java | 2 +- .../functions/SortingFlinkCombineRunner.java | 2 +- .../wrappers/streaming/DoFnOperator.java | 4 +- .../beam/runners/spark/SparkRunnerDebugger.java | 1 - .../spark/aggregators/NamedAggregators.java | 2 - .../beam/runners/spark/coders/CoderHelpers.java | 1 - .../spark/coders/StatelessJavaSerializer.java | 1 - .../runners/spark/io/SparkUnboundedSource.java | 1 - .../runners/spark/metrics/AggregatorMetric.java | 1 - .../spark/metrics/SparkBeamMetricSource.java | 1 - .../spark/stateful/StateSpecFunctions.java | 1 - .../translation/SparkAbstractCombineFn.java | 2 +- .../spark/translation/TranslationUtils.java | 1 - .../spark/util/GlobalWatermarkHolder.java | 1 - .../spark/util/SparkSideInputReader.java | 2 +- .../beam/sdk/util/CombineContextFactory.java | 25 -------- .../beam/sdk/util/NullSideInputReader.java | 61 -------------------- .../sdk/util/ReadyCheckingSideInputReader.java | 34 ----------- .../apache/beam/sdk/util/SideInputReader.java | 47 --------------- .../harness/control/ProcessBundleHandler.java | 2 +- 70 files changed, 199 insertions(+), 255 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java index aa6ef45..8cde692 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerRegistrar.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.apex; import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableList; - import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsRegistrar; import org.apache.beam.sdk.runners.PipelineRunner; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java index 41fdb75..cc24ddd 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java @@ -18,9 +18,7 @@ package org.apache.beam.runners.apex; import com.datatorrent.api.DAG; - import java.io.IOException; - import org.apache.apex.api.Launcher.AppHandle; import org.apache.apex.api.Launcher.ShutdownMode; import org.apache.beam.sdk.Pipeline; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java index b84144c..18d8e94 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java @@ -25,7 +25,6 @@ import com.datatorrent.api.DAG; import com.datatorrent.api.StreamingApplication; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; - import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.File; @@ -56,7 +55,6 @@ import java.util.Properties; import java.util.Set; import java.util.jar.JarFile; import java.util.jar.Manifest; - import org.apache.apex.api.EmbeddedAppLauncher; import org.apache.apex.api.Launcher; import org.apache.apex.api.Launcher.AppHandle; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ReadUnboundedTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ReadUnboundedTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ReadUnboundedTranslator.java index b3034ac..168cbf5 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ReadUnboundedTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ReadUnboundedTranslator.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.apex.translation; import com.datatorrent.api.InputOperator; - import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TransformTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TransformTranslator.java index eb81052..49ff49b 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TransformTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TransformTranslator.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.apex.translation; import java.io.Serializable; - import org.apache.beam.sdk.transforms.PTransform; /** http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index 9d4e9a2..85836ad 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -34,6 +34,7 @@ import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend; import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple; import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions; +import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.StateInternalsFactory; @@ -49,7 +50,6 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index 99ad964..8c516b1 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -43,8 +43,10 @@ import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItemCoder; +import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; +import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StatefulDoFnRunner; @@ -56,8 +58,6 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; -import org.apache.beam.sdk.util.NullSideInputReader; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java index ac28c2a..1549560 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java @@ -26,9 +26,7 @@ import com.datatorrent.common.util.BaseOperator; import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; import com.esotericsoftware.kryo.serializers.JavaSerializer; import com.google.common.base.Throwables; - import java.io.IOException; - import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple; import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple.DataTuple; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java index d08e76f..f6ce1d0 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java @@ -19,12 +19,10 @@ package org.apache.beam.runners.apex.translation.utils; import com.datatorrent.api.StreamCodec; import com.datatorrent.netlet.util.Slice; - import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.Serializable; - import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValueAndCoderKryoSerializable.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValueAndCoderKryoSerializable.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValueAndCoderKryoSerializable.java index 395ad1f..2a72f17 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValueAndCoderKryoSerializable.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValueAndCoderKryoSerializable.java @@ -22,9 +22,7 @@ import com.esotericsoftware.kryo.KryoSerializable; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.JavaSerializer; - import java.io.IOException; - import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java index 62c92a0..41f027f 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java @@ -24,9 +24,7 @@ import java.io.IOException; import java.util.Collections; import java.util.Iterator; import java.util.NoSuchElementException; - import javax.annotation.Nullable; - import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.IterableCoder; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java index 6ffb091..68ec2ea 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java @@ -26,7 +26,6 @@ import com.datatorrent.api.Attribute.AttributeMap; import com.datatorrent.api.Context.DAGContext; import com.datatorrent.api.DAG; import com.datatorrent.api.StreamingApplication; - import java.io.File; import java.net.URI; import java.nio.file.FileSystem; @@ -37,7 +36,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.jar.JarFile; - import org.apache.apex.api.EmbeddedAppLauncher; import org.apache.apex.api.Launcher; import org.apache.apex.api.Launcher.AppHandle; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java index abe97f6..c590a2e 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java @@ -23,9 +23,7 @@ import java.io.Serializable; import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; - import javax.annotation.Nullable; - import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.UnboundedSource; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java index 83af61b..e76096e 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java @@ -18,11 +18,9 @@ package org.apache.beam.runners.apex.examples; import com.google.common.collect.Sets; - import java.io.File; import java.util.HashSet; import java.util.concurrent.ConcurrentHashMap; - import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.ApexRunner; import org.apache.beam.runners.apex.ApexRunnerResult; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java index e31d830..206b430 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ApexGroupByKeyOperatorTest.java @@ -20,9 +20,7 @@ package org.apache.beam.runners.apex.translation; import com.datatorrent.api.Sink; import com.datatorrent.lib.util.KryoCloneUtils; import com.google.common.collect.Lists; - import java.util.List; - import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.TestApexRunner; import org.apache.beam.runners.apex.translation.operators.ApexGroupByKeyOperator; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java index 92812b4..288aade 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java @@ -25,9 +25,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; - import javax.annotation.Nullable; - import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index f3dd9a3..ee3aefa 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -26,8 +26,6 @@ import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java index 5325ba6..4c312b4 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunner.java @@ -22,7 +22,6 @@ import java.util.Collection; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.SideInputReader; /** * An interface that runs a {@link GlobalCombineFn} with unified APIs. http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java index d45b503..d98bac8 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GlobalCombineFnRunners.java @@ -22,10 +22,10 @@ import java.util.Collection; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn; +import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.CombineContextFactory; -import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.values.PCollectionView; /** * Static utility methods that provide {@link GlobalCombineFnRunner} implementations for different @@ -47,6 +47,33 @@ public class GlobalCombineFnRunners { } /** + * Returns a {@code Combine.Context} from {@code PipelineOptions}, {@code SideInputReader}, and + * the main input window. + */ + private static CombineWithContext.Context createFromComponents( + final PipelineOptions options, + final SideInputReader sideInputReader, + final BoundedWindow mainInputWindow) { + return new CombineWithContext.Context() { + @Override + public PipelineOptions getPipelineOptions() { + return options; + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + if (!sideInputReader.contains(view)) { + throw new IllegalArgumentException("calling sideInput() with unknown view"); + } + + BoundedWindow sideInputWindow = + view.getWindowMappingFn().getSideInputWindow(mainInputWindow); + return sideInputReader.get(view, sideInputWindow); + } + }; + } + + /** * An implementation of {@link GlobalCombineFnRunner} with {@link CombineFn}. * * <p>It forwards functions calls to the {@link CombineFn}. @@ -136,7 +163,7 @@ public class GlobalCombineFnRunners { SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { return combineFnWithContext.createAccumulator( - CombineContextFactory.createFromComponents( + createFromComponents( options, sideInputReader, Iterables.getOnlyElement(windows))); } @@ -150,7 +177,7 @@ public class GlobalCombineFnRunners { return combineFnWithContext.addInput( accumulator, input, - CombineContextFactory.createFromComponents( + createFromComponents( options, sideInputReader, Iterables.getOnlyElement(windows))); } @@ -162,7 +189,7 @@ public class GlobalCombineFnRunners { Collection<? extends BoundedWindow> windows) { return combineFnWithContext.mergeAccumulators( accumulators, - CombineContextFactory.createFromComponents( + createFromComponents( options, sideInputReader, Iterables.getOnlyElement(windows))); } @@ -174,7 +201,7 @@ public class GlobalCombineFnRunners { Collection<? extends BoundedWindow> windows) { return combineFnWithContext.extractOutput( accumulator, - CombineContextFactory.createFromComponents( + createFromComponents( options, sideInputReader, Iterables.getOnlyElement(windows))); } @@ -186,7 +213,7 @@ public class GlobalCombineFnRunners { Collection<? extends BoundedWindow> windows) { return combineFnWithContext.compact( accumulator, - CombineContextFactory.createFromComponents( + createFromComponents( options, sideInputReader, Iterables.getOnlyElement(windows))); } } http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java index 4c1fe95..5b82d1f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java @@ -24,7 +24,6 @@ import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/NullSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/NullSideInputReader.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/NullSideInputReader.java new file mode 100644 index 0000000..786ed41 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/NullSideInputReader.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import com.google.common.collect.Sets; +import java.util.Collections; +import java.util.Set; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * A {@link SideInputReader} representing a well-defined set of views, but not storing + * any values for them. Used to check if a side input is present when the data itself + * comes from elsewhere. + */ +public class NullSideInputReader implements SideInputReader { + + private Set<PCollectionView<?>> views; + + public static NullSideInputReader empty() { + return new NullSideInputReader(Collections.<PCollectionView<?>>emptySet()); + } + + public static NullSideInputReader of(Iterable<? extends PCollectionView<?>> views) { + return new NullSideInputReader(views); + } + + private NullSideInputReader(Iterable<? extends PCollectionView<?>> views) { + this.views = Sets.newHashSet(views); + } + + @Override + public <T> T get(PCollectionView<T> view, BoundedWindow window) { + throw new IllegalArgumentException("cannot call NullSideInputReader.get()"); + } + + @Override + public boolean isEmpty() { + return views.isEmpty(); + } + + @Override + public <T> boolean contains(PCollectionView<T> view) { + return views.contains(view); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index 16bdfa3..2db6531 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -36,7 +36,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java index 7cbf0d2..61f413f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java @@ -28,7 +28,6 @@ import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollectionView; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/ReadyCheckingSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReadyCheckingSideInputReader.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReadyCheckingSideInputReader.java new file mode 100644 index 0000000..8d1f0e2 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReadyCheckingSideInputReader.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * A {@link SideInputReader} that allows callers to check to see if a {@link PCollectionView} has + * had its contents set in a window. + */ +public interface ReadyCheckingSideInputReader extends SideInputReader { + /** + * Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}. + */ + boolean isReady(PCollectionView<?> view, BoundedWindow window); +} + http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java index 63e8e6d..bd327f3 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java @@ -35,7 +35,6 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.state.Timers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index a04006b..d2ed835 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -54,7 +54,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java index af75010..539b9f0 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java @@ -32,7 +32,6 @@ import org.apache.beam.sdk.state.CombiningState; import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollectionView; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java new file mode 100644 index 0000000..7d1b829 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * The interface to objects that provide side inputs. Particular implementations + * may read a side input directly or use appropriate sorts of caching, etc. + */ +public interface SideInputReader { + /** + * Returns the value of the given {@link PCollectionView} for the given {@link BoundedWindow}. + * + * <p>It is valid for a side input to be {@code null}. It is <i>not</i> valid for this to + * return {@code null} for any other reason. + */ + @Nullable + <T> T get(PCollectionView<T> view, BoundedWindow window); + + /** + * Returns true if the given {@link PCollectionView} is valid for this reader. + */ + <T> boolean contains(PCollectionView<T> view); + + /** + * Returns true if there are no side inputs in this reader. + */ + boolean isEmpty(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 4057790..aab34a5 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -52,7 +52,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index cf9dff2..2a0b688 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -35,7 +35,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java index 36a04d9..3f77f7d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java @@ -25,7 +25,6 @@ import java.util.HashSet; import java.util.Set; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollectionView; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/UnsupportedSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnsupportedSideInputReader.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnsupportedSideInputReader.java index 4230f8c..c1c7179 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnsupportedSideInputReader.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnsupportedSideInputReader.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.core; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.values.PCollectionView; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java index 1b36bf9..4a58445 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.core; import java.util.Collection; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java index 541e238..a2f6acc 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java @@ -33,7 +33,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Duration; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index 381a493..3dd98e0 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -70,7 +70,6 @@ import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 6fb0dcb..7de8f3b 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -62,9 +62,7 @@ import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.AppliedCombineFn; -import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java index fae9117..abefd1c 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java @@ -42,7 +42,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java index dabc4f0..79bf0b2 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java @@ -39,7 +39,6 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.IdentitySideInputWindowFn; -import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java index b93ff3a..be4cf08 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -56,7 +56,6 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java index 2ca8b05..76351e4 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java @@ -38,7 +38,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index 75ff1c7..7b64611 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -32,6 +32,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import javax.annotation.Nullable; import org.apache.beam.runners.core.ExecutionContext; +import org.apache.beam.runners.core.ReadyCheckingSideInputReader; +import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.CommittedResult.OutputType; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; @@ -42,8 +44,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index e5a6680..4cfd16f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -26,6 +26,7 @@ import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; +import org.apache.beam.runners.core.ReadyCheckingSideInputReader; import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; @@ -33,7 +34,6 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java index 1e773c9..380dc65 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java @@ -35,11 +35,11 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; +import org.apache.beam.runners.core.ReadyCheckingSideInputReader; +import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.PCollectionViewWindow; -import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index 2c22caf..44f2e85 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -27,6 +27,7 @@ import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker; import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; +import org.apache.beam.runners.core.ReadyCheckingSideInputReader; import org.apache.beam.runners.core.SplittableParDo; import org.apache.beam.runners.core.SplittableParDo.ProcessFn; import org.apache.beam.runners.core.StateInternals; @@ -39,7 +40,6 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index 123e9f2..077bb68 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -31,6 +31,7 @@ import java.util.Collection; import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; @@ -54,7 +55,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java index ef8add9..88fd5d2 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import javax.annotation.Nullable; +import org.apache.beam.runners.core.ReadyCheckingSideInputReader; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.sdk.testing.TestPipeline; @@ -44,7 +45,6 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.IdentitySideInputWindowFn; -import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java index d7c4440..d4ca9fd 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java @@ -32,6 +32,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.beam.runners.core.ReadyCheckingSideInputReader; +import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; @@ -44,8 +46,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.apache.beam.sdk.util.PCollectionViews; -import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java index f6b652d..f1ba57c 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java @@ -35,6 +35,7 @@ import java.util.Collections; import java.util.List; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; +import org.apache.beam.runners.core.ReadyCheckingSideInputReader; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; @@ -58,7 +59,6 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java index 4872a06..2ae7833 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java @@ -21,10 +21,10 @@ import com.google.common.collect.ImmutableList; import java.util.Collection; import org.apache.beam.runners.core.GlobalCombineFnRunner; import org.apache.beam.runners.core.GlobalCombineFnRunners; +import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java index fa95477..f275290 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java @@ -23,8 +23,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import javax.annotation.Nullable; +import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java index 942bf42..feb8b39 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java @@ -27,12 +27,12 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java index fb4c678..026a35c 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java @@ -23,13 +23,13 @@ import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.List; +import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 1844d6d..f35ba7a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -34,8 +34,10 @@ import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn; +import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; +import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateNamespace; @@ -64,8 +66,6 @@ import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.NullSideInputReader; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java index 8d47e1a..11c52c7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.spark; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; - import org.apache.beam.runners.spark.translation.EvaluationContext; import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; import org.apache.beam.runners.spark.translation.TransformTranslator; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java index cf6c9ad..c836ca5 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java @@ -22,14 +22,12 @@ import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; - import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.Map; import java.util.TreeMap; - import org.apache.beam.runners.spark.translation.SparkRuntimeContext; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java index 9c46ecf..85e3572 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/CoderHelpers.java @@ -32,7 +32,6 @@ import org.apache.beam.runners.spark.util.ByteArray; import org.apache.beam.sdk.coders.Coder; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; - import scala.Tuple2; /** http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java index 01b3b93..0cf4951 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java @@ -23,7 +23,6 @@ import com.esotericsoftware.kryo.KryoException; import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; - import java.io.IOException; import java.io.InputStream; import java.io.ObjectInputStream; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java index 6b34590..0388f6c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java @@ -58,7 +58,6 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.dstream.DStream; import org.apache.spark.streaming.scheduler.StreamInputInfo; import org.joda.time.Instant; - import scala.Tuple2; import scala.runtime.BoxedUnit; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java index 271cc6b..450bc95 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/AggregatorMetric.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.spark.metrics; import com.codahale.metrics.Metric; - import org.apache.beam.runners.spark.aggregators.NamedAggregators; /** http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java index 9cab66d..5c6fc24 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricSource.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.spark.metrics; import com.codahale.metrics.MetricRegistry; - import org.apache.spark.metrics.source.Source; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java index 30ee639..d8d52c4 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java @@ -47,7 +47,6 @@ import org.apache.spark.streaming.StateSpec; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.Option; import scala.Tuple2; import scala.runtime.AbstractFunction3; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java index df633b0..315f7fb 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAbstractCombineFn.java @@ -29,13 +29,13 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.runners.spark.util.SparkSideInputReader; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java index f462e60..993062c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java @@ -50,7 +50,6 @@ import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; - import scala.Tuple2; /** A set of utilities to help translating Beam transformations into Spark transformations. */ http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java index 212f974..8b384d8 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java @@ -26,7 +26,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; - import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java index e876e12..6c91088 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java @@ -24,8 +24,8 @@ import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import java.util.Map; import javax.annotation.Nullable; +import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; http://git-wip-us.apache.org/repos/asf/beam/blob/6542eafc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java index f93cb0b..b72fd82 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.util; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.StateContext; import org.apache.beam.sdk.transforms.CombineWithContext.Context; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollectionView; /** @@ -64,28 +63,4 @@ public class CombineContextFactory { }; } - /** - * Returns a {@code Combine.Context} from {@code PipelineOptions}, {@code SideInputReader}, - * and the main input window. - */ - public static Context createFromComponents(final PipelineOptions options, - final SideInputReader sideInputReader, final BoundedWindow mainInputWindow) { - return new Context() { - @Override - public PipelineOptions getPipelineOptions() { - return options; - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - if (!sideInputReader.contains(view)) { - throw new IllegalArgumentException("calling sideInput() with unknown view"); - } - - BoundedWindow sideInputWindow = - view.getWindowMappingFn().getSideInputWindow(mainInputWindow); - return sideInputReader.get(view, sideInputWindow); - } - }; - } }
