BEAM-261 Enable checkstyle and cleanup.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9454b3bd Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9454b3bd Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9454b3bd Branch: refs/heads/apex-runner Commit: 9454b3bdc6f6ff69363dcd339cfb069c2c2f8cc9 Parents: 1ec7cd9 Author: Thomas Weise <t...@apache.org> Authored: Sun Oct 16 17:36:01 2016 -0700 Committer: Thomas Weise <t...@apache.org> Committed: Mon Oct 17 09:22:49 2016 -0700 ---------------------------------------------------------------------- runners/apex/pom.xml | 2 - .../runners/apex/ApexPipelineTranslator.java | 39 +-- .../apache/beam/runners/apex/ApexRunner.java | 314 +++---------------- .../beam/runners/apex/ApexRunnerResult.java | 23 +- .../beam/runners/apex/TestApexRunner.java | 9 +- .../apache/beam/runners/apex/package-info.java | 22 ++ .../translators/CreateValuesTranslator.java | 12 +- .../FlattenPCollectionTranslator.java | 13 +- .../apex/translators/GroupByKeyTranslator.java | 4 +- .../translators/ParDoBoundMultiTranslator.java | 47 +-- .../apex/translators/ParDoBoundTranslator.java | 5 +- .../translators/ReadUnboundedTranslator.java | 4 +- .../apex/translators/TransformTranslator.java | 8 +- .../apex/translators/TranslationContext.java | 40 +-- .../functions/ApexFlattenOperator.java | 42 ++- .../functions/ApexGroupByKeyOperator.java | 155 +++++---- .../functions/ApexParDoOperator.java | 140 ++++----- .../translators/functions/package-info.java | 22 ++ .../io/ApexReadUnboundedInputOperator.java | 57 ++-- .../apex/translators/io/ValuesSource.java | 23 +- .../apex/translators/io/package-info.java | 22 ++ .../runners/apex/translators/package-info.java | 22 ++ .../apex/translators/utils/ApexStreamTuple.java | 85 +++-- .../utils/CoderAdapterStreamCodec.java | 24 +- .../apex/translators/utils/NoOpStepContext.java | 7 +- .../utils/SerializablePipelineOptions.java | 21 +- 
.../utils/ValueAndCoderKryoSerializable.java | 26 +- .../apex/translators/utils/package-info.java | 22 ++ .../beam/runners/apex/examples/IntTest.java | 133 -------- .../apex/examples/StreamingWordCountTest.java | 15 +- .../apex/examples/UnboundedTextSource.java | 16 +- .../runners/apex/examples/package-info.java | 22 ++ .../FlattenPCollectionTranslatorTest.java | 32 +- .../translators/GroupByKeyTranslatorTest.java | 45 ++- .../translators/ParDoBoundTranslatorTest.java | 20 +- .../translators/ReadUnboundTranslatorTest.java | 45 ++- .../translators/utils/CollectionSource.java | 13 +- .../translators/utils/PipelineOptionsTest.java | 28 +- .../apex/src/test/resources/log4j.properties | 8 +- 39 files changed, 662 insertions(+), 925 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/pom.xml ---------------------------------------------------------------------- diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml index 929feb4..8b62410 100644 --- a/runners/apex/pom.xml +++ b/runners/apex/pom.xml @@ -148,12 +148,10 @@ <build> <plugins> - <!-- Checkstyle errors for now <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-checkstyle-plugin</artifactId> </plugin> - --> <!-- Integration Tests --> <plugin> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java index a16f551..a6857ee 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java @@ -18,6 +18,11 @@ package 
org.apache.beam.runners.apex; +import com.datatorrent.api.DAG; + +import java.util.HashMap; +import java.util.Map; + import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView; import org.apache.beam.runners.apex.translators.CreateValuesTranslator; import org.apache.beam.runners.apex.translators.FlattenPCollectionTranslator; @@ -43,18 +48,13 @@ import org.apache.beam.sdk.values.PValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Map; - /** * {@link ApexPipelineTranslator} translates {@link Pipeline} objects * into Apex logical plan {@link DAG}. */ @SuppressWarnings({"rawtypes", "unchecked"}) public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { - - private static final Logger LOG = LoggerFactory.getLogger( - ApexPipelineTranslator.class); + private static final Logger LOG = LoggerFactory.getLogger(ApexPipelineTranslator.class); /** * A map from {@link PTransform} subclass to the corresponding @@ -75,8 +75,10 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { registerTransformTranslator(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator()); registerTransformTranslator(Create.Values.class, new CreateValuesTranslator()); - registerTransformTranslator(CreateApexPCollectionView.class, new CreateApexPCollectionViewTranslator()); - registerTransformTranslator(CreatePCollectionView.class, new CreatePCollectionViewTranslator()); + registerTransformTranslator(CreateApexPCollectionView.class, + new CreateApexPCollectionViewTranslator()); + registerTransformTranslator(CreatePCollectionView.class, + new CreatePCollectionViewTranslator()); } public ApexPipelineTranslator(TranslationContext translationContext) { @@ -134,7 +136,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { * Returns the {@link TransformTranslator} to use for instances of the * specified PTransform class, or null if none registered. 
*/ - private <TransformT extends PTransform<?,?>> + private <TransformT extends PTransform<?, ?>> TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) { return transformTranslators.get(transformClass); } @@ -145,7 +147,8 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { @Override public void translate(Read.Bounded<T> transform, TranslationContext context) { // TODO: adapter is visibleForTesting - BoundedToUnboundedSourceAdapter unboundedSource = new BoundedToUnboundedSourceAdapter<>(transform.getSource()); + BoundedToUnboundedSourceAdapter unboundedSource = new BoundedToUnboundedSourceAdapter<>( + transform.getSource()); ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>( unboundedSource, context.getPipelineOptions()); context.addOperator(operator, operator.output); @@ -153,26 +156,26 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { } - private static class CreateApexPCollectionViewTranslator<ElemT, ViewT> implements TransformTranslator<CreateApexPCollectionView<ElemT, ViewT>> - { + private static class CreateApexPCollectionViewTranslator<ElemT, ViewT> + implements TransformTranslator<CreateApexPCollectionView<ElemT, ViewT>> { private static final long serialVersionUID = 1L; @Override - public void translate(CreateApexPCollectionView<ElemT, ViewT> transform, TranslationContext context) - { + public void translate(CreateApexPCollectionView<ElemT, ViewT> transform, + TranslationContext context) { PCollectionView<ViewT> view = transform.getView(); context.addView(view); LOG.debug("view {}", view.getName()); } } - private static class CreatePCollectionViewTranslator<ElemT, ViewT> implements TransformTranslator<CreatePCollectionView<ElemT, ViewT>> - { + private static class CreatePCollectionViewTranslator<ElemT, ViewT> + implements TransformTranslator<CreatePCollectionView<ElemT, ViewT>> { private static final long serialVersionUID = 1L; @Override 
- public void translate(CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) - { + public void translate(CreatePCollectionView<ElemT, ViewT> transform, + TranslationContext context) { PCollectionView<ViewT> view = transform.getView(); context.addView(view); LOG.debug("view {}", view.getName()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index 667f1c8..f3c44bb 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -19,17 +19,18 @@ package org.apache.beam.runners.apex; import static com.google.common.base.Preconditions.checkArgument; -import java.util.ArrayList; +import com.datatorrent.api.Context.DAGContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.google.common.base.Throwables; + import java.util.Arrays; import java.util.List; -import java.util.Map; import org.apache.beam.runners.apex.translators.TranslationContext; +import org.apache.beam.runners.core.AssignWindows; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Combine; @@ -39,31 +40,22 @@ import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import 
org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.OldDoFn.ProcessContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.runners.core.AssignWindows; import org.apache.beam.sdk.util.PCollectionViews; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.hadoop.conf.Configuration; -import com.datatorrent.api.Context.DAGContext; -import com.datatorrent.api.DAG; -import com.datatorrent.api.LocalMode; -import com.datatorrent.api.StreamingApplication; -import com.google.common.base.Throwables; - /** * A {@link PipelineRunner} that translates the * pipeline to an Apex DAG and executes it on an Apex cluster. - * <p> - * Currently execution is always in embedded mode, + * + * <p>Currently execution is always in embedded mode, * launch on Hadoop cluster will be added in subsequent iteration. 
*/ @SuppressWarnings({"rawtypes", "unchecked"}) @@ -99,37 +91,16 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { input.getPipeline(), WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED); -// TODO: replace this with a mapping -//// - } else if (Combine.GloballyAsSingletonView.class.equals(transform.getClass())) { - PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingCombineGloballyAsSingletonView<InputT, OutputT>(this, - (Combine.GloballyAsSingletonView)transform); + PTransform<InputT, OutputT> customTransform = (PTransform) + new StreamingCombineGloballyAsSingletonView<InputT, OutputT>( + this, (Combine.GloballyAsSingletonView) transform); return Pipeline.applyTransform(input, customTransform); } else if (View.AsSingleton.class.equals(transform.getClass())) { - // note this assumes presence of above Combine.GloballyAsSingletonView mapping - PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingViewAsSingleton<InputT>(this, - (View.AsSingleton)transform); + // assumes presence of above Combine.GloballyAsSingletonView mapping + PTransform<InputT, OutputT> customTransform = (PTransform) + new StreamingViewAsSingleton<InputT>(this, (View.AsSingleton) transform); return Pipeline.applyTransform(input, customTransform); -/* - } else if (View.AsIterable.class.equals(transform.getClass())) { - PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingViewAsIterable<InputT>(this, - (View.AsIterable)transform); - return Pipeline.applyTransform(input, customTransform); - } else if (View.AsList.class.equals(transform.getClass())) { - PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingViewAsList<InputT>(this, - (View.AsList)transform); - return Pipeline.applyTransform(input, customTransform); - } else if (View.AsMap.class.equals(transform.getClass())) { - PTransform<InputT, OutputT> customTransform = new StreamingViewAsMap(this, - (View.AsMap)transform); - return 
Pipeline.applyTransform(input, customTransform); - } else if (View.AsMultimap.class.equals(transform.getClass())) { - PTransform<InputT, OutputT> customTransform = new StreamingViewAsMultimap(this, - (View.AsMultimap)transform); - return Pipeline.applyTransform(input, customTransform); -*/ -//// } else { return super.apply(transform, input); } @@ -142,17 +113,16 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { ApexPipelineTranslator translator = new ApexPipelineTranslator(translationContext); translator.translate(pipeline); - StreamingApplication apexApp = new StreamingApplication() - { + StreamingApplication apexApp = new StreamingApplication() { @Override - public void populateDAG(DAG dag, Configuration conf) - { + public void populateDAG(DAG dag, Configuration conf) { dag.setAttribute(DAGContext.APPLICATION_NAME, options.getApplicationName()); translationContext.populateDAG(dag); } }; - checkArgument(options.isEmbeddedExecution(), "only embedded execution is supported at this time"); + checkArgument(options.isEmbeddedExecution(), + "only embedded execution is supported at this time"); LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); try { @@ -178,7 +148,8 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { } return new ApexRunnerResult(lma.getDAG(), lc); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.propagateIfPossible(e); + throw new RuntimeException(e); } } @@ -231,13 +202,6 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { // Adapted from FlinkRunner for View support /** - * Records that the {@link PTransform} requires a deterministic key coder. - */ - private void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) { - //throw new UnsupportedOperationException(); - } - - /** * Creates a primitive {@link PCollectionView}. * * <p>For internal use only by runner implementors. 
@@ -247,6 +211,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { */ public static class CreateApexPCollectionView<ElemT, ViewT> extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> { + private static final long serialVersionUID = 1L; private PCollectionView<ViewT> view; private CreateApexPCollectionView(PCollectionView<ViewT> view) { @@ -276,52 +241,50 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { } private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT> - extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> - { + extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> { + private static final long serialVersionUID = 1L; Combine.GloballyAsSingletonView<InputT, OutputT> transform; /** * Builds an instance of this class from the overridden transform. */ public StreamingCombineGloballyAsSingletonView(ApexRunner runner, - Combine.GloballyAsSingletonView<InputT, OutputT> transform) - { + Combine.GloballyAsSingletonView<InputT, OutputT> transform) { this.transform = transform; } @Override - public PCollectionView<OutputT> apply(PCollection<InputT> input) - { + public PCollectionView<OutputT> apply(PCollection<InputT> input) { PCollection<OutputT> combined = input - .apply(Combine.globally(transform.getCombineFn()).withoutDefaults().withFanout(transform.getFanout())); + .apply(Combine.globally(transform.getCombineFn()) + .withoutDefaults().withFanout(transform.getFanout())); PCollectionView<OutputT> view = PCollectionViews.singletonView(combined.getPipeline(), combined.getWindowingStrategy(), transform.getInsertDefault(), - transform.getInsertDefault() ? transform.getCombineFn().defaultValue() : null, combined.getCoder()); + transform.getInsertDefault() ? 
transform.getCombineFn().defaultValue() : null, + combined.getCoder()); return combined.apply(ParDo.of(new WrapAsList<OutputT>())) .apply(CreateApexPCollectionView.<OutputT, OutputT> of(view)); } @Override - protected String getKindString() - { + protected String getKindString() { return "StreamingCombineGloballyAsSingletonView"; } } - private static class StreamingViewAsSingleton<T> extends PTransform<PCollection<T>, PCollectionView<T>> - { + private static class StreamingViewAsSingleton<T> + extends PTransform<PCollection<T>, PCollectionView<T>> { private static final long serialVersionUID = 1L; + private View.AsSingleton<T> transform; - public StreamingViewAsSingleton(ApexRunner runner, View.AsSingleton<T> transform) - { + public StreamingViewAsSingleton(ApexRunner runner, View.AsSingleton<T> transform) { this.transform = transform; } @Override - public PCollectionView<T> apply(PCollection<T> input) - { + public PCollectionView<T> apply(PCollection<T> input) { Combine.Globally<T, T> combine = Combine .globally(new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue())); if (!transform.hasDefaultValue()) { @@ -331,33 +294,28 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { } @Override - protected String getKindString() - { + protected String getKindString() { return "StreamingViewAsSingleton"; } - private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> - { + private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> { private boolean hasDefaultValue; private T defaultValue; - SingletonCombine(boolean hasDefaultValue, T defaultValue) - { + SingletonCombine(boolean hasDefaultValue, T defaultValue) { this.hasDefaultValue = hasDefaultValue; this.defaultValue = defaultValue; } @Override - public T apply(T left, T right) - { + public T apply(T left, T right) { throw new IllegalArgumentException("PCollection with more than one element " + "accessed as a singleton view. 
Consider using Combine.globally().asSingleton() to " + "combine the PCollection into a single value"); } @Override - public T identity() - { + public T identity() { if (hasDefaultValue) { return defaultValue; } else { @@ -368,194 +326,4 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { } } - private static class StreamingViewAsMap<K, V> - extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> { - - private final ApexRunner runner; - - public StreamingViewAsMap(ApexRunner runner, View.AsMap<K, V> transform) { - this.runner = runner; - } - - @Override - public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) { - PCollectionView<Map<K, V>> view = - PCollectionViews.mapView( - input.getPipeline(), - input.getWindowingStrategy(), - input.getCoder()); - - @SuppressWarnings({"rawtypes", "unchecked"}) - KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); - try { - inputCoder.getKeyCoder().verifyDeterministic(); - } catch (Coder.NonDeterministicException e) { - runner.recordViewUsesNonDeterministicKeyCoder(this); - } - - return input - .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) - .apply(CreateApexPCollectionView.<KV<K, V>, Map<K, V>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsMap"; - } - } - - /** - * Specialized expansion for {@link - * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the - * Flink runner in streaming mode. - */ - private static class StreamingViewAsMultimap<K, V> - extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> { - - private final ApexRunner runner; - - /** - * Builds an instance of this class from the overridden transform. 
- */ - @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() - public StreamingViewAsMultimap(ApexRunner runner, View.AsMultimap<K, V> transform) { - this.runner = runner; - } - - @Override - public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) { - PCollectionView<Map<K, Iterable<V>>> view = - PCollectionViews.multimapView( - input.getPipeline(), - input.getWindowingStrategy(), - input.getCoder()); - - @SuppressWarnings({"rawtypes", "unchecked"}) - KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); - try { - inputCoder.getKeyCoder().verifyDeterministic(); - } catch (Coder.NonDeterministicException e) { - runner.recordViewUsesNonDeterministicKeyCoder(this); - } - - return input - .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) - .apply(CreateApexPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsMultimap"; - } - } - - /** - * Specialized implementation for - * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the - * Flink runner in streaming mode. - */ - private static class StreamingViewAsList<T> - extends PTransform<PCollection<T>, PCollectionView<List<T>>> { - /** - * Builds an instance of this class from the overridden transform. 
- */ - @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() - public StreamingViewAsList(ApexRunner runner, View.AsList<T> transform) {} - - @Override - public PCollectionView<List<T>> apply(PCollection<T> input) { - PCollectionView<List<T>> view = - PCollectionViews.listView( - input.getPipeline(), - input.getWindowingStrategy(), - input.getCoder()); - - return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) - .apply(CreateApexPCollectionView.<T, List<T>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsList"; - } - } - - /** - * Specialized implementation for - * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the - * Flink runner in streaming mode. - */ - private static class StreamingViewAsIterable<T> - extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> { - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() - public StreamingViewAsIterable(ApexRunner runner, View.AsIterable<T> transform) { } - - @Override - public PCollectionView<Iterable<T>> apply(PCollection<T> input) { - PCollectionView<Iterable<T>> view = - PCollectionViews.iterableView( - input.getPipeline(), - input.getWindowingStrategy(), - input.getCoder()); - - return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) - .apply(CreateApexPCollectionView.<T, Iterable<T>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsIterable"; - } - } - - /** - * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs. - * - * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap}, - * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}. - * They require the input {@link PCollection} fits in memory. - * For a large {@link PCollection} this is expected to crash! 
- * - * @param <T> the type of elements to concatenate. - */ - private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> { - @Override - public List<T> createAccumulator() { - return new ArrayList<T>(); - } - - @Override - public List<T> addInput(List<T> accumulator, T input) { - accumulator.add(input); - return accumulator; - } - - @Override - public List<T> mergeAccumulators(Iterable<List<T>> accumulators) { - List<T> result = createAccumulator(); - for (List<T> accumulator : accumulators) { - result.addAll(accumulator); - } - return result; - } - - @Override - public List<T> extractOutput(List<T> accumulator) { - return accumulator; - } - - @Override - public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) { - return ListCoder.of(inputCoder); - } - - @Override - public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) { - return ListCoder.of(inputCoder); - } - } - } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java index 6817684..d5613fe 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java @@ -17,20 +17,19 @@ */ package org.apache.beam.runners.apex; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.metrics.MetricResults; +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; import java.io.IOException; import org.apache.beam.sdk.AggregatorRetrievalException; import org.apache.beam.sdk.AggregatorValues; +import org.apache.beam.sdk.Pipeline; +import 
org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.transforms.Aggregator; import org.joda.time.Duration; -import com.datatorrent.api.DAG; -import com.datatorrent.api.LocalMode; - /** * Result of executing a {@link Pipeline} with Apex in embedded mode. */ @@ -56,28 +55,24 @@ public class ApexRunnerResult implements PipelineResult { } @Override - public State cancel() throws IOException - { + public State cancel() throws IOException { ctrl.shutdown(); state = State.CANCELLED; return state; } @Override - public State waitUntilFinish(Duration duration) throws IOException, InterruptedException - { + public State waitUntilFinish(Duration duration) throws IOException, InterruptedException { throw new UnsupportedOperationException(); } @Override - public State waitUntilFinish() throws IOException, InterruptedException - { + public State waitUntilFinish() throws IOException, InterruptedException { throw new UnsupportedOperationException(); } @Override - public MetricResults metrics() - { + public MetricResults metrics() { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java index 45c143e..2e048f0 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java @@ -25,7 +25,9 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; - +/** + * Apex {@link PipelineRunner} for testing. 
+ */ public class TestApexRunner extends PipelineRunner<ApexRunnerResult> { private ApexRunner delegate; @@ -38,13 +40,14 @@ public class TestApexRunner extends PipelineRunner<ApexRunnerResult> { } public static TestApexRunner fromOptions(PipelineOptions options) { - ApexPipelineOptions apexOptions = PipelineOptionsValidator.validate(ApexPipelineOptions.class, options); + ApexPipelineOptions apexOptions = PipelineOptionsValidator + .validate(ApexPipelineOptions.class, options); return new TestApexRunner(apexOptions); } @Override public <OutputT extends POutput, InputT extends PInput> - OutputT apply(PTransform<InputT,OutputT> transform, InputT input) { + OutputT apply(PTransform<InputT, OutputT> transform, InputT input) { return delegate.apply(transform, input); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/package-info.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/package-info.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/package-info.java new file mode 100644 index 0000000..4d2f417 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation of the Beam runner for Apache Apex. + */ +package org.apache.beam.runners.apex; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java index 7a29057..539f311 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/CreateValuesTranslator.java @@ -25,12 +25,10 @@ import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PBegin; -import com.google.common.base.Throwables; - /** * Wraps elements from Create.Values into an {@link UnboundedSource}. 
- * mainly used for test + * mainly used for testing */ public class CreateValuesTranslator<T> implements TransformTranslator<Create.Values<T>> { private static final long serialVersionUID = 1451000241832745629L; @@ -39,12 +37,12 @@ public class CreateValuesTranslator<T> implements TransformTranslator<Create.Val public void translate(Create.Values<T> transform, TranslationContext context) { try { UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(transform.getElements(), - transform.getDefaultOutputCoder((PBegin)context.getInput())); - ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(unboundedSource, - context.getPipelineOptions()); + transform.getDefaultOutputCoder((PBegin) context.getInput())); + ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>( + unboundedSource, context.getPipelineOptions()); context.addOperator(operator, operator.output); } catch (CannotProvideCoderException e) { - Throwables.propagate(e); + throw new RuntimeException(e); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java index 6737767..a39aacb 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java @@ -18,6 +18,8 @@ package org.apache.beam.runners.apex.translators; +import com.google.common.collect.Lists; + import java.util.Collections; import java.util.List; import java.util.Map; @@ -32,8 +34,6 @@ import 
org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; -import com.google.common.collect.Lists; - /** * {@link Flatten.FlattenPCollectionList} translation to Apex operator. */ @@ -72,7 +72,8 @@ public class FlattenPCollectionTranslator<T> implements * @param finalCollection * @param context */ - static <T> void flattenCollections(List<PCollection<T>> collections, Map<PCollection<?>, Integer> unionTags, PCollection<T> finalCollection, TranslationContext context) { + static <T> void flattenCollections(List<PCollection<T>> collections, Map<PCollection<?>, + Integer> unionTags, PCollection<T> finalCollection, TranslationContext context) { List<PCollection<T>> remainingCollections = Lists.newArrayList(); PCollection<T> firstCollection = null; while (!collections.isEmpty()) { @@ -93,7 +94,8 @@ public class FlattenPCollectionTranslator<T> implements } if (collections.size() > 2) { - PCollection<T> intermediateCollection = intermediateCollection(collection, collection.getCoder()); + PCollection<T> intermediateCollection = intermediateCollection(collection, + collection.getCoder()); context.addOperator(operator, operator.out, intermediateCollection); remainingCollections.add(intermediateCollection); } else { @@ -118,7 +120,8 @@ public class FlattenPCollectionTranslator<T> implements } static <T> PCollection<T> intermediateCollection(PCollection<T> input, Coder<T> outputCoder) { - PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); + PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(), + input.getWindowingStrategy(), input.isBounded()); output.setCoder(outputCoder); return output; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java 
---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java index 43c82a9..d3e7d2d 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/GroupByKeyTranslator.java @@ -31,9 +31,9 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe @Override public void translate(GroupByKey<K, V> transform, TranslationContext context) { - PCollection<KV<K, V>> input = context.getInput(); - ApexGroupByKeyOperator<K, V> group = new ApexGroupByKeyOperator<>(context.getPipelineOptions(), input); + ApexGroupByKeyOperator<K, V> group = new ApexGroupByKeyOperator<>(context.getPipelineOptions(), + input); context.addOperator(group, group.output); context.addStream(input, group.input); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java index a229a81..13f07c1 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java @@ -20,6 +20,10 @@ package org.apache.beam.runners.apex.translators; import static com.google.common.base.Preconditions.checkArgument; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.OutputPort; +import com.google.common.collect.Maps; + 
import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -27,6 +31,7 @@ import java.util.Map; import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; @@ -35,16 +40,16 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; - -import com.datatorrent.api.Operator; -import com.datatorrent.api.Operator.OutputPort; -import com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * {@link ParDo.BoundMulti} is translated to Apex operator that wraps the {@link DoFn} + * {@link ParDo.BoundMulti} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}. 
*/ -public class ParDoBoundMultiTranslator<InputT, OutputT> implements TransformTranslator<ParDo.BoundMulti<InputT, OutputT>> { +public class ParDoBoundMultiTranslator<InputT, OutputT> + implements TransformTranslator<ParDo.BoundMulti<InputT, OutputT>> { private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundMultiTranslator.class); @Override public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) { @@ -56,7 +61,8 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements TransformTran WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder, input.getWindowingStrategy().getWindowFn().windowCoder()); - ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(context.getPipelineOptions(), + ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>( + context.getPipelineOptions(), doFn, transform.getMainOutputTag(), transform.getSideOutputTags().getAll(), context.<PCollection<?>>getInput().getWindowingStrategy(), sideInputs, wvInputCoder); @@ -73,36 +79,37 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements TransformTran } } - static void addSideInputs(ApexParDoOperator<?, ?> operator, List<PCollectionView<?>> sideInputs, TranslationContext context) { + static void addSideInputs(ApexParDoOperator<?, ?> operator, List<PCollectionView<?>> sideInputs, + TranslationContext context) { Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1}; if (sideInputs.size() > sideInputPorts.length) { - // String msg = String.format("Too many side inputs in %s (currently only supporting %s).", - // transform.toString(), sideInputPorts.length); - // throw new UnsupportedOperationException(msg); PCollection<?> unionCollection = unionSideInputs(sideInputs, context); context.addStream(unionCollection, sideInputPorts[0]); } else { - for (int i=0; i<sideInputs.size(); i++) { - // the number of input ports for 
side inputs are fixed and each port can only take one input. - // more (optional) ports can be added to give reasonable capacity or an explicit union operation introduced. + // the number of ports for side inputs is fixed and each port can only take one input. + for (int i = 0; i < sideInputs.size(); i++) { context.addStream(context.getViewInput(sideInputs.get(i)), sideInputPorts[i]); } } } - private static PCollection<?> unionSideInputs(List<PCollectionView<?>> sideInputs, TranslationContext context) { + private static PCollection<?> unionSideInputs(List<PCollectionView<?>> sideInputs, + TranslationContext context) { checkArgument(sideInputs.size() > 1, "requires multiple side inputs"); // flatten and assign union tag List<PCollection<Object>> sourceCollections = new ArrayList<>(); Map<PCollection<?>, Integer> unionTags = new HashMap<>(); PCollection<Object> firstSideInput = context.getViewInput(sideInputs.get(0)); - for (int i=0; i < sideInputs.size(); i++) { + for (int i = 0; i < sideInputs.size(); i++) { PCollectionView<?> sideInput = sideInputs.get(i); PCollection<?> sideInputCollection = context.getViewInput(sideInput); - if (!sideInputCollection.getWindowingStrategy().equals(firstSideInput.getWindowingStrategy())) { + if (!sideInputCollection.getWindowingStrategy().equals( + firstSideInput.getWindowingStrategy())) { // TODO: check how to handle this in stream codec //String msg = "Multiple side inputs with different window strategies."; //throw new UnsupportedOperationException(msg); + LOG.warn("Side inputs union with different windowing strategies {} {}", + firstSideInput.getWindowingStrategy(), sideInputCollection.getWindowingStrategy()); } if (!sideInputCollection.getCoder().equals(firstSideInput.getCoder())) { String msg = "Multiple side inputs with different coders."; @@ -112,8 +119,10 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements TransformTran unionTags.put(sideInputCollection, i); } - PCollection<Object> resultCollection = 
FlattenPCollectionTranslator.intermediateCollection(firstSideInput, firstSideInput.getCoder()); - FlattenPCollectionTranslator.flattenCollections(sourceCollections, unionTags, resultCollection, context); + PCollection<Object> resultCollection = FlattenPCollectionTranslator.intermediateCollection( + firstSideInput, firstSideInput.getCoder()); + FlattenPCollectionTranslator.flattenCollections(sourceCollections, unionTags, resultCollection, + context); return resultCollection; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java index 7749a06..bd7115e 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java @@ -33,7 +33,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; /** - * {@link ParDo.Bound} is translated to Apex operator that wraps the {@link DoFn} + * {@link ParDo.Bound} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}.
*/ public class ParDoBoundTranslator<InputT, OutputT> implements TransformTranslator<ParDo.Bound<InputT, OutputT>> { @@ -49,7 +49,8 @@ public class ParDoBoundTranslator<InputT, OutputT> implements WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder, input.getWindowingStrategy().getWindowFn().windowCoder()); - ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(context.getPipelineOptions(), + ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>( + context.getPipelineOptions(), doFn, new TupleTag<OutputT>(), TupleTagList.empty().getAll() /*sideOutputTags*/, output.getWindowingStrategy(), sideInputs, wvInputCoder); context.addOperator(operator, operator.output); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java index b53e4dd..3097276 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ReadUnboundedTranslator.java @@ -18,12 +18,12 @@ package org.apache.beam.runners.apex.translators; +import com.datatorrent.api.InputOperator; + import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource; -import com.datatorrent.api.InputOperator; - /** * {@link Read.Unbounded} is translated to Apex {@link InputOperator} * that wraps {@link UnboundedSource}. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java index 1a99885..dfd2045 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TransformTranslator.java @@ -19,13 +19,13 @@ package org.apache.beam.runners.apex.translators; -import org.apache.beam.sdk.transforms.PTransform; - import java.io.Serializable; +import org.apache.beam.sdk.transforms.PTransform; + /** - * translates {@link PTransform} to Apex functions. + * Translates {@link PTransform} to Apex functions. */ -public interface TransformTranslator<T extends PTransform<?,?>> extends Serializable { +public interface TransformTranslator<T extends PTransform<?, ?>> extends Serializable { void translate(T transform, TranslationContext context); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java index bd44a20..ddacc29 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java @@ -19,6 +19,17 @@ package org.apache.beam.runners.apex.translators; import static 
com.google.common.base.Preconditions.checkArgument; +import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.InputPort; +import com.datatorrent.api.Operator.OutputPort; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple; import org.apache.beam.runners.apex.translators.utils.CoderAdapterStreamCodec; @@ -34,17 +45,6 @@ import org.apache.beam.sdk.values.POutput; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; -import com.datatorrent.api.DAG; -import com.datatorrent.api.Operator; -import com.datatorrent.api.Context.PortContext; -import com.datatorrent.api.Operator.InputPort; -import com.datatorrent.api.Operator.OutputPort; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - /** * Maintains context data for {@link TransformTranslator}s. */ @@ -64,7 +64,7 @@ public class TranslationContext { public <InputT extends PInput> InputT getViewInput(PCollectionView<?> view) { PInput input = this.viewInputs.get(view); checkArgument(input != null, "unknown view " + view.getName()); - return (InputT)input; + return (InputT) input; } public TranslationContext(ApexPipelineOptions pipelineOptions) { @@ -109,13 +109,14 @@ public class TranslationContext { addOperator(operator, portEntry.getValue(), portEntry.getKey()); first = false; } else { - this.streams.put(portEntry.getKey(), (Pair)new ImmutablePair<>(portEntry.getValue(), new ArrayList<>())); + this.streams.put(portEntry.getKey(), (Pair) new ImmutablePair<>(portEntry.getValue(), + new ArrayList<>())); } } } /** - * Add intermediate operator for the current transformation. 
+ * Add the operator with its output port for the given result {@link PCollection}. * @param operator * @param port * @param output */ @@ -124,9 +125,11 @@ public class TranslationContext { // Apex DAG requires a unique operator name // use the transform's name and make it unique String name = getCurrentTransform().getFullName(); - for (int i=1; this.operators.containsKey(name); name = getCurrentTransform().getFullName() + i++); + for (int i = 1; this.operators.containsKey(name); i++) { + name = getCurrentTransform().getFullName() + i; + } this.operators.put(name, operator); - this.streams.put(output, (Pair)new ImmutablePair<>(port, new ArrayList<>())); + this.streams.put(output, (Pair) new ImmutablePair<>(port, new ArrayList<>())); } public void addStream(PInput input, InputPort inputPort) { @@ -140,11 +143,12 @@ public class TranslationContext { dag.addOperator(nameAndOperator.getKey(), nameAndOperator.getValue()); } int streamIndex = 0; - for (Map.Entry<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streamEntry : this.streams.entrySet()) { + for (Map.Entry<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streamEntry : this.
+ streams.entrySet()) { List<InputPort<?>> sinksList = streamEntry.getValue().getRight(); InputPort[] sinks = sinksList.toArray(new InputPort[sinksList.size()]); if (sinks.length > 0) { - dag.addStream("stream"+streamIndex++, streamEntry.getValue().getLeft(), sinks); + dag.addStream("stream" + streamIndex++, streamEntry.getValue().getLeft(), sinks); for (InputPort port : sinks) { PCollection pc = streamEntry.getKey(); Coder coder = pc.getCoder(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java index 202f2d3..dd8fcd1 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java @@ -17,23 +17,22 @@ */ package org.apache.beam.runners.apex.translators.functions; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.common.util.BaseOperator; + import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple; import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple.WatermarkTuple; -import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.util.WindowedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import com.datatorrent.common.util.BaseOperator; - /** * Apex operator for 
Beam {@link Flatten.FlattenPCollectionList}. */ -public class ApexFlattenOperator<InputT> extends BaseOperator -{ +public class ApexFlattenOperator<InputT> extends BaseOperator { + private static final Logger LOG = LoggerFactory.getLogger(ApexFlattenOperator.class); private boolean traceTuples = true; @@ -47,16 +46,15 @@ public class ApexFlattenOperator<InputT> extends BaseOperator /** * Data input port 1. */ - public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data1 = new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() - { + public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data1 = + new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() { /** * Emits to port "out" */ @Override - public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) - { + public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) { if (tuple instanceof WatermarkTuple) { - WatermarkTuple<?> wmTuple = (WatermarkTuple<?>)tuple; + WatermarkTuple<?> wmTuple = (WatermarkTuple<?>) tuple; if (wmTuple.getTimestamp() > inputWM1) { inputWM1 = wmTuple.getTimestamp(); if (inputWM1 <= inputWM2) { @@ -75,7 +73,7 @@ public class ApexFlattenOperator<InputT> extends BaseOperator } if (data1Tag > 0 && tuple instanceof ApexStreamTuple.DataTuple) { - ((ApexStreamTuple.DataTuple<?>)tuple).setUnionTag(data1Tag); + ((ApexStreamTuple.DataTuple<?>) tuple).setUnionTag(data1Tag); } out.emit(tuple); } @@ -84,16 +82,15 @@ public class ApexFlattenOperator<InputT> extends BaseOperator /** * Data input port 2. 
*/ - public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data2 = new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() - { + public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data2 = + new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() { /** * Emits to port "out" */ @Override - public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) - { + public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) { if (tuple instanceof WatermarkTuple) { - WatermarkTuple<?> wmTuple = (WatermarkTuple<?>)tuple; + WatermarkTuple<?> wmTuple = (WatermarkTuple<?>) tuple; if (wmTuple.getTimestamp() > inputWM2) { inputWM2 = wmTuple.getTimestamp(); if (inputWM2 <= inputWM1) { @@ -112,7 +109,7 @@ public class ApexFlattenOperator<InputT> extends BaseOperator } if (data2Tag > 0 && tuple instanceof ApexStreamTuple.DataTuple) { - ((ApexStreamTuple.DataTuple<?>)tuple).setUnionTag(data2Tag); + ((ApexStreamTuple.DataTuple<?>) tuple).setUnionTag(data2Tag); } out.emit(tuple); } @@ -121,6 +118,7 @@ public class ApexFlattenOperator<InputT> extends BaseOperator /** * Output port. 
*/ - @OutputPortFieldAnnotation(optional=true) - public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>> out = new DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>>(); + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>> out = + new DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>>(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java index 5970f36..845618d 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java @@ -17,6 +17,20 @@ */ package org.apache.beam.runners.apex.translators.functions; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; +import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.common.base.Throwables; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; + import java.io.IOException; import java.io.Serializable; import java.nio.ByteBuffer; @@ -61,19 +75,6 @@ import org.joda.time.Instant; import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Operator; -import com.datatorrent.api.StreamCodec; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; -import com.esotericsoftware.kryo.serializers.JavaSerializer; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; - /** * Apex operator for Beam {@link GroupByKey}. * This operator expects the input stream already partitioned by K, @@ -82,8 +83,7 @@ import com.google.common.collect.Multimap; * @param <K> * @param <V> */ -public class ApexGroupByKeyOperator<K, V> implements Operator -{ +public class ApexGroupByKeyOperator<K, V> implements Operator { private static final Logger LOG = LoggerFactory.getLogger(ApexGroupByKeyOperator.class); private boolean traceTuples = true; @@ -98,7 +98,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator private final SerializablePipelineOptions serializedOptions; @Bind(JavaSerializer.class) // TODO: InMemoryStateInternals not serializable -transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new HashMap<>(); + private transient Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new HashMap<>(); private Map<ByteBuffer, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(); private transient ProcessContext context; @@ -106,19 +106,19 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new private transient ApexTimerInternals timerInternals = new ApexTimerInternals(); private Instant inputWatermark = new Instant(0); - public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>> input = new 
DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>>() - { + public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>> input = + new DefaultInputPort<ApexStreamTuple<WindowedValue<KV<K, V>>>>() { @Override - public void process(ApexStreamTuple<WindowedValue<KV<K, V>>> t) - { + public void process(ApexStreamTuple<WindowedValue<KV<K, V>>> t) { try { if (t instanceof ApexStreamTuple.WatermarkTuple) { - ApexStreamTuple.WatermarkTuple<?> mark = (ApexStreamTuple.WatermarkTuple<?>)t; + ApexStreamTuple.WatermarkTuple<?> mark = (ApexStreamTuple.WatermarkTuple<?>) t; processWatermark(mark); if (traceTuples) { LOG.debug("\nemitting watermark {}\n", mark.getTimestamp()); } - output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<KV<K, Iterable<V>>>>of(mark.getTimestamp())); + output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<KV<K, Iterable<V>>>>of( + mark.getTimestamp())); return; } if (traceTuples) { @@ -126,53 +126,49 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new } processElement(t.getValue()); } catch (Exception e) { - Throwables.propagate(e); + Throwables.propagateIfPossible(e); + throw new RuntimeException(e); } } }; - @OutputPortFieldAnnotation(optional=true) - public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<KV<K, Iterable<V>>>>> output = new DefaultOutputPort<>(); + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<KV<K, Iterable<V>>>>> + output = new DefaultOutputPort<>(); @SuppressWarnings("unchecked") - public ApexGroupByKeyOperator(ApexPipelineOptions pipelineOptions, PCollection<KV<K, V>> input) - { - Preconditions.checkNotNull(pipelineOptions); + public ApexGroupByKeyOperator(ApexPipelineOptions pipelineOptions, PCollection<KV<K, V>> input) { + checkNotNull(pipelineOptions); this.serializedOptions = new SerializablePipelineOptions(pipelineOptions); - this.windowingStrategy = (WindowingStrategy<V, 
BoundedWindow>)input.getWindowingStrategy(); - this.keyCoder = ((KvCoder<K, V>)input.getCoder()).getKeyCoder(); - this.valueCoder = ((KvCoder<K, V>)input.getCoder()).getValueCoder(); + this.windowingStrategy = (WindowingStrategy<V, BoundedWindow>) input.getWindowingStrategy(); + this.keyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder(); + this.valueCoder = ((KvCoder<K, V>) input.getCoder()).getValueCoder(); } @SuppressWarnings("unused") // for Kryo - private ApexGroupByKeyOperator() - { + private ApexGroupByKeyOperator() { this.serializedOptions = null; } @Override - public void beginWindow(long l) - { + public void beginWindow(long l) { } @Override - public void endWindow() - { + public void endWindow() { } @Override - public void setup(OperatorContext context) - { + public void setup(OperatorContext context) { this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(serializedOptions.get(), this); StateInternalsFactory<K> stateInternalsFactory = new GroupByKeyStateInternalsFactory(); - this.fn = GroupAlsoByWindowViaWindowSetDoFn.create(this.windowingStrategy, stateInternalsFactory, - SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder)); + this.fn = GroupAlsoByWindowViaWindowSetDoFn.create(this.windowingStrategy, + stateInternalsFactory, SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder)); this.context = new ProcessContext(fn, this.timerInternals); } @Override - public void teardown() - { + public void teardown() { } /** @@ -181,14 +177,16 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new * We keep these timers in a Set, so that they are deduplicated, as the same * timer can be registered multiple times. 
*/ - private Multimap<ByteBuffer, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) { + private Multimap<ByteBuffer, TimerInternals.TimerData> getTimersReadyToProcess( + long currentWatermark) { // we keep the timers to return in a different list and launch them later // because we cannot prevent a trigger from registering another trigger, // which would lead to concurrent modification exception. Multimap<ByteBuffer, TimerInternals.TimerData> toFire = HashMultimap.create(); - Iterator<Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator(); + Iterator<Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>>> it = + activeTimers.entrySet().iterator(); while (it.hasNext()) { Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>> keyWithTimers = it.next(); @@ -223,18 +221,15 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new fn.processElement(context); } - private StateInternals<K> getStateInternalsForKey(K key) - { + private StateInternals<K> getStateInternalsForKey(K key) { final ByteBuffer keyBytes; try { keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key)); } catch (CoderException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } StateInternals<K> stateInternals = perKeyStateInternals.get(keyBytes); if (stateInternals == null) { - //Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder(); - //OutputTimeFn<? 
super BoundedWindow> outputTimeFn = this.windowingStrategy.getOutputTimeFn(); stateInternals = InMemoryStateInternals.forKey(key); perKeyStateInternals.put(keyBytes, stateInternals); } @@ -246,7 +241,7 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new try { keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key)); } catch (CoderException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes); if (timersForKey == null) { @@ -261,7 +256,7 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new try { keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key)); } catch (CoderException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes); if (timersForKey != null) { @@ -276,7 +271,8 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception { this.inputWatermark = new Instant(mark.getTimestamp()); - Multimap<ByteBuffer, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp()); + Multimap<ByteBuffer, TimerInternals.TimerData> timers = getTimersReadyToProcess( + mark.getTimestamp()); if (!timers.isEmpty()) { for (ByteBuffer keyBytes : timers.keySet()) { K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array()); @@ -287,7 +283,8 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new } } - private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, V, Iterable<V>, ?, KeyedWorkItem<K, V>>.ProcessContext { + private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, V, Iterable<V>, ?, + KeyedWorkItem<K, V>>.ProcessContext { private final ApexTimerInternals timerInternals; private StateInternals<K> stateInternals; @@ 
-296,7 +293,7 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new public ProcessContext(OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> function, ApexTimerInternals timerInternals) { function.super(); - this.timerInternals = Preconditions.checkNotNull(timerInternals); + this.timerInternals = checkNotNull(timerInternals); } public void setElement(KeyedWorkItem<K, V> element, StateInternals<K> stateForKey) { @@ -311,7 +308,8 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new @Override public Instant timestamp() { - throw new UnsupportedOperationException("timestamp() is not available when processing KeyedWorkItems."); + throw new UnsupportedOperationException( + "timestamp() is not available when processing KeyedWorkItems."); } @Override @@ -333,7 +331,8 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new @Override public PaneInfo pane() { - throw new UnsupportedOperationException("pane() is not available when processing KeyedWorkItems."); + throw new UnsupportedOperationException( + "pane() is not available when processing KeyedWorkItems."); } @Override @@ -352,11 +351,13 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new } @Override - public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { + public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp, + Collection<? 
extends BoundedWindow> windows, PaneInfo pane) { if (traceTuples) { LOG.debug("\nemitting {} timestamp {}\n", output, timestamp); } - ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.DataTuple.of(WindowedValue.of(output, timestamp, windows, pane))); + ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.DataTuple.of( + WindowedValue.of(output, timestamp, windows, pane))); } @Override @@ -375,7 +376,8 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new } @Override - public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { + public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, + Coder<T> elemCoder) throws IOException { throw new RuntimeException("writePCollectionViewData() not available in Streaming mode."); } @@ -404,7 +406,8 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new } @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal( + String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { throw new UnsupportedOperationException(); } } @@ -416,52 +419,44 @@ transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new public class ApexTimerInternals implements TimerInternals { @Override - public void setTimer(TimerData timerKey) - { + public void setTimer(TimerData timerKey) { registerActiveTimer(context.element().key(), timerKey); } @Override - public void deleteTimer(TimerData timerKey) - { + public void deleteTimer(TimerData timerKey) { unregisterActiveTimer(context.element().key(), timerKey); } @Override - public Instant currentProcessingTime() - { + public Instant currentProcessingTime() { return Instant.now(); } @Override - public 
Instant currentSynchronizedProcessingTime() - { + public Instant currentSynchronizedProcessingTime() { // TODO Auto-generated method stub return null; } @Override - public Instant currentInputWatermarkTime() - { + public Instant currentInputWatermarkTime() { return inputWatermark; } @Override - public Instant currentOutputWatermarkTime() - { + public Instant currentOutputWatermarkTime() { // TODO Auto-generated method stub return null; } - } - private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K>, Serializable - { + private class GroupByKeyStateInternalsFactory implements StateInternalsFactory<K>, Serializable { + private static final long serialVersionUID = 1L; + @Override - public StateInternals<K> stateInternalsForKey(K key) - { + public StateInternals<K> stateInternalsForKey(K key) { return getStateInternalsForKey(key); } } - -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9454b3bd/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java index 96be11d..9e8f3dc 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java @@ -17,6 +17,18 @@ */ package org.apache.beam.runners.apex.translators.functions; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import 
com.datatorrent.common.util.BaseOperator; +import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; +import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; + import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -29,9 +41,9 @@ import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOption import org.apache.beam.runners.apex.translators.utils.ValueAndCoderKryoSerializable; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; -import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.transforms.Aggregator; @@ -52,18 +64,6 @@ import org.apache.beam.sdk.values.TupleTag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.annotation.InputPortFieldAnnotation; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import com.datatorrent.common.util.BaseOperator; -import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; -import com.google.common.base.Throwables; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.esotericsoftware.kryo.serializers.JavaSerializer; - /** * Apex operator for Beam {@link DoFn}. 
*/ @@ -85,8 +85,8 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements private final List<PCollectionView<?>> sideInputs; // TODO: not Kryo serializable, integrate codec -//@Bind(JavaSerializer.class) -private transient StateInternals<Void> sideInputStateInternals = InMemoryStateInternals.forKey(null); + private transient StateInternals<Void> sideInputStateInternals = InMemoryStateInternals + .forKey(null); private final ValueAndCoderKryoSerializable<List<WindowedValue<InputT>>> pushedBack; private LongMin pushedBackWatermark = new LongMin(); private long currentInputWatermark = Long.MIN_VALUE; @@ -94,7 +94,8 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn private transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner; private transient SideInputHandler sideInputHandler; - private transient Map<TupleTag<?>, DefaultOutputPort<ApexStreamTuple<?>>> sideOutputPortMapping = Maps.newHashMapWithExpectedSize(5); + private transient Map<TupleTag<?>, DefaultOutputPort<ApexStreamTuple<?>>> sideOutputPortMapping = + Maps.newHashMapWithExpectedSize(5); public ApexParDoOperator( ApexPipelineOptions pipelineOptions, @@ -104,8 +105,7 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn WindowingStrategy<?, ?> windowingStrategy, List<PCollectionView<?>> sideInputs, Coder<WindowedValue<InputT>> inputCoder - ) - { + ) { this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions); this.doFn = doFn; this.mainOutputTag = mainOutputTag; @@ -120,7 +120,8 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn } Coder<List<WindowedValue<InputT>>> coder = ListCoder.of(inputCoder); - this.pushedBack = new ValueAndCoderKryoSerializable<>(new ArrayList<WindowedValue<InputT>>(), coder); + this.pushedBack = new ValueAndCoderKryoSerializable<>(new ArrayList<WindowedValue<InputT>>(), + coder); } @@ -135,13 +136,12 @@ private transient 
StateInternals<Void> sideInputStateInternals = InMemoryStateIn this.pushedBack = null; } - public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> input = new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() - { + public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> input = + new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() { @Override - public void process(ApexStreamTuple<WindowedValue<InputT>> t) - { + public void process(ApexStreamTuple<WindowedValue<InputT>> t) { if (t instanceof ApexStreamTuple.WatermarkTuple) { - processWatermark((ApexStreamTuple.WatermarkTuple<?>)t); + processWatermark((ApexStreamTuple.WatermarkTuple<?>) t); } else { if (traceTuples) { LOG.debug("\ninput {}\n", t.getValue()); @@ -155,12 +155,11 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn } }; - @InputPortFieldAnnotation(optional=true) - public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>> sideInput1 = new DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>>() - { + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>> sideInput1 = + new DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>>() { @Override - public void process(ApexStreamTuple<WindowedValue<Iterable<?>>> t) - { + public void process(ApexStreamTuple<WindowedValue<Iterable<?>>> t) { if (t instanceof ApexStreamTuple.WatermarkTuple) { // ignore side input watermarks return; @@ -168,7 +167,7 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn int sideInputIndex = 0; if (t instanceof ApexStreamTuple.DataTuple) { - sideInputIndex = ((ApexStreamTuple.DataTuple<?>)t).getUnionTag(); + sideInputIndex = ((ApexStreamTuple.DataTuple<?>) t).getUnionTag(); } if (traceTuples) { @@ -196,25 +195,30 @@ private transient StateInternals<Void> sideInputStateInternals = 
InMemoryStateIn } }; - @OutputPortFieldAnnotation(optional=true) + @OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort<ApexStreamTuple<?>> output = new DefaultOutputPort<>(); - @OutputPortFieldAnnotation(optional=true) - public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput1 = new DefaultOutputPort<>(); - @OutputPortFieldAnnotation(optional=true) - public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput2 = new DefaultOutputPort<>(); - @OutputPortFieldAnnotation(optional=true) - public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput3 = new DefaultOutputPort<>(); - @OutputPortFieldAnnotation(optional=true) - public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput4 = new DefaultOutputPort<>(); - @OutputPortFieldAnnotation(optional=true) - public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput5 = new DefaultOutputPort<>(); - - public final transient DefaultOutputPort<?>[] sideOutputPorts = {sideOutput1, sideOutput2, sideOutput3, sideOutput4, sideOutput5}; + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput1 = + new DefaultOutputPort<>(); + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput2 = + new DefaultOutputPort<>(); + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput3 = + new DefaultOutputPort<>(); + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput4 = + new DefaultOutputPort<>(); + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<ApexStreamTuple<?>> sideOutput5 = + new DefaultOutputPort<>(); + + public final transient DefaultOutputPort<?>[] sideOutputPorts = {sideOutput1, sideOutput2, + sideOutput3, sideOutput4, sideOutput5}; @Override - public 
<T> void output(TupleTag<T> tag, WindowedValue<T> tuple) - { + public <T> void output(TupleTag<T> tag, WindowedValue<T> tuple) { DefaultOutputPort<ApexStreamTuple<?>> sideOutputPort = sideOutputPortMapping.get(tag); if (sideOutputPort != null) { sideOutputPort.emit(ApexStreamTuple.DataTuple.of(tuple)); @@ -229,19 +233,19 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn private Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) { try { pushbackDoFnRunner.startBundle(); - Iterable<WindowedValue<InputT>> pushedBack = pushbackDoFnRunner.processElementInReadyWindows(elem); + Iterable<WindowedValue<InputT>> pushedBack = pushbackDoFnRunner + .processElementInReadyWindows(elem); pushbackDoFnRunner.finishBundle(); return pushedBack; } catch (UserCodeException ue) { if (ue.getCause() instanceof AssertionError) { - ApexRunner.assertionError = (AssertionError)ue.getCause(); + ApexRunner.assertionError = (AssertionError) ue.getCause(); } throw ue; } } - private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) - { + private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) { this.currentInputWatermark = mark.getTimestamp(); if (sideInputs.isEmpty()) { @@ -264,8 +268,7 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn } @Override - public void setup(OperatorContext context) - { + public void setup(OperatorContext context) { this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this); SideInputReader sideInputReader = NullSideInputReader.of(sideInputs); if (!sideInputs.isEmpty()) { @@ -273,9 +276,10 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn sideInputReader = sideInputHandler; } - for (int i=0; i < sideOutputTags.size(); i++) { + for (int i = 0; i < sideOutputTags.size(); i++) { @SuppressWarnings("unchecked") - DefaultOutputPort<ApexStreamTuple<?>> port = 
(DefaultOutputPort<ApexStreamTuple<?>>)sideOutputPorts[i]; + DefaultOutputPort<ApexStreamTuple<?>> port = (DefaultOutputPort<ApexStreamTuple<?>>) + sideOutputPorts[i]; sideOutputPortMapping.put(sideOutputTags.get(i), port); } @@ -297,25 +301,18 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn try { doFn.setup(); } catch (Exception e) { - Throwables.propagate(e); + Throwables.propagateIfPossible(e); + throw new RuntimeException(e); } } @Override - public void beginWindow(long windowId) - { - /* - Collection<Aggregator<?, ?>> aggregators = AggregatorRetriever.getAggregators(doFn); - if (!aggregators.isEmpty()) { - System.out.println("\n" + Thread.currentThread().getName() + "\n" +AggregatorRetriever.getAggregators(doFn) + "\n"); - } - */ + public void beginWindow(long windowId) { } @Override - public void endWindow() - { + public void endWindow() { } /** @@ -334,32 +331,27 @@ private transient StateInternals<Void> sideInputStateInternals = InMemoryStateIn return new NoOpAggregator<InputT, OutputT>(); } - private static class NoOpAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT>, java.io.Serializable - { + private static class NoOpAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT>, + java.io.Serializable { private static final long serialVersionUID = 1L; @Override - public void addValue(InputT value) - { + public void addValue(InputT value) { } @Override - public String getName() - { + public String getName() { // TODO Auto-generated method stub return null; } @Override - public CombineFn<InputT, ?, OutputT> getCombineFn() - { + public CombineFn<InputT, ?, OutputT> getCombineFn() { // TODO Auto-generated method stub return null; } }; - - } private static class LongMin {