http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
index caad7f8..e7542cb 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.values.PCollectionView;

 /** A {@link DataflowRunner} marker class for creating a {@link PCollectionView}. */
 public class CreateDataflowView<ElemT, ViewT>
-    extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
+    extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
   public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> of(PCollectionView<ViewT> view) {
     return new CreateDataflowView<>(view);
   }
@@ -36,10 +36,8 @@ public class CreateDataflowView<ElemT, ViewT>
   }

   @Override
-  public PCollection<ElemT> expand(PCollection<ElemT> input) {
-    return PCollection.<ElemT>createPrimitiveOutputInternal(
-            input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
-        .setCoder(input.getCoder());
+  public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
+    return view;
   }

   public PCollectionView<ViewT> getView() {
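For readers skimming the hunk above: after this change, CreateDataflowView is a primitive transform whose expansion yields the view itself rather than a dummy pass-through PCollection. A minimal sketch of that shape, with the class name invented for illustration (it is not the exact Beam source):

import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

// Illustrative only: a runner-internal transform that carries a pre-built view
// and simply returns it from expand(), so translation can treat the view as
// the transform's output.
class CreateViewLike<ElemT, ViewT>
    extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
  private final PCollectionView<ViewT> view;

  CreateViewLike(PCollectionView<ViewT> view) {
    this.view = view;
  }

  @Override
  public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
    return view; // no dummy output PCollection is materialized
  }
}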
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index f1783de..8eaf61b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -56,8 +56,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.core.construction.SplittableParDo;
-import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
 import org.apache.beam.runners.dataflow.BatchViewOverrides.GroupByKeyAndSortValuesOnly;
 import org.apache.beam.runners.dataflow.DataflowRunner.CombineGroupedValues;
@@ -397,9 +395,7 @@ public class DataflowPipelineTranslator {

     @Override
     public <InputT extends PValue> InputT getInput(PTransform<InputT, ?> transform) {
-      return (InputT)
-          Iterables.getOnlyElement(
-              TransformInputs.nonAdditionalInputs(getCurrentTransform(transform)));
+      return (InputT) Iterables.getOnlyElement(getInputs(transform).values());
     }

     @Override
@@ -444,14 +440,6 @@ public class DataflowPipelineTranslator {
     public void visitValue(PValue value, TransformHierarchy.Node producer) {
       LOG.debug("Checking translation of {}", value);
       // Primitive transforms are the only ones assigned step names.
-      if (producer.getTransform() instanceof CreateDataflowView) {
-        // CreateDataflowView produces a dummy output (as it must be a primitive transform) but
-        // in the Dataflow Job graph produces only the view and not the output PCollection.
-        asOutputReference(
-            ((CreateDataflowView) producer.getTransform()).getView(),
-            producer.toAppliedPTransform(getPipeline()));
-        return;
-      }
       asOutputReference(value, producer.toAppliedPTransform(getPipeline()));
     }

@@ -477,7 +465,6 @@ public class DataflowPipelineTranslator {
       StepTranslator stepContext = new StepTranslator(this, step);
       stepContext.addInput(PropertyNames.USER_NAME, getFullName(transform));
       stepContext.addDisplayData(step, stepName, transform);
-      LOG.info("Adding {} as step {}", getCurrentTransform(transform).getFullName(), stepName);
       return stepContext;
     }

@@ -690,7 +677,7 @@ public class DataflowPipelineTranslator {
             context.addStep(transform, "CollectionToSingleton");
             PCollection<ElemT> input = context.getInput(transform);
             stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
-            stepContext.addCollectionToSingletonOutput(input, transform.getView());
+            stepContext.addCollectionToSingletonOutput(input, context.getOutput(transform));
           }
         });

@@ -793,7 +780,6 @@ public class DataflowPipelineTranslator {
                 context.getPipelineOptions().as(StreamingOptions.class).isStreaming();
             boolean disallowCombinerLifting =
                 !windowingStrategy.getWindowFn().isNonMerging()
-                    || !windowingStrategy.getWindowFn().assignsToOneWindow()
                     || (isStreaming && !transform.fewKeys())
                     // TODO: Allow combiner lifting on the non-default trigger, as appropriate.
                     || !(windowingStrategy.getTrigger() instanceof DefaultTrigger);
@@ -888,45 +874,6 @@ public class DataflowPipelineTranslator {

     // IO Translation.
     registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
-
-    ///////////////////////////////////////////////////////////////////////////
-    // Splittable DoFn translation.
-
-    registerTransformTranslator(
-        SplittableParDo.ProcessKeyedElements.class,
-        new TransformTranslator<SplittableParDo.ProcessKeyedElements>() {
-          @Override
-          public void translate(
-              SplittableParDo.ProcessKeyedElements transform, TranslationContext context) {
-            translateTyped(transform, context);
-          }
-
-          private <InputT, OutputT, RestrictionT> void translateTyped(
-              SplittableParDo.ProcessKeyedElements<InputT, OutputT, RestrictionT> transform,
-              TranslationContext context) {
-            StepTranslationContext stepContext =
-                context.addStep(transform, "SplittableProcessKeyed");
-
-            translateInputs(
-                stepContext, context.getInput(transform), transform.getSideInputs(), context);
-            BiMap<Long, TupleTag<?>> outputMap =
-                translateOutputs(context.getOutputs(transform), stepContext);
-            stepContext.addInput(
-                PropertyNames.SERIALIZED_FN,
-                byteArrayToJsonString(
-                    serializeToByteArray(
-                        DoFnInfo.forFn(
-                            transform.getFn(),
-                            transform.getInputWindowingStrategy(),
-                            transform.getSideInputs(),
-                            transform.getElementCoder(),
-                            outputMap.inverse().get(transform.getMainOutputTag()),
-                            outputMap))));
-            stepContext.addInput(
-                PropertyNames.RESTRICTION_CODER,
-                CloudObjects.asCloudObject(transform.getRestrictionCoder()));
-          }
-        });
   }

   private static void translateInputs(
@@ -973,11 +920,6 @@ public class DataflowPipelineTranslator {
               fn));
     }

-    if (signature.usesState() || signature.usesTimers()) {
-      DataflowRunner.verifyStateSupported(fn);
-      DataflowRunner.verifyStateSupportForWindowingStrategy(windowingStrategy);
-    }
-
     stepContext.addInput(PropertyNames.USER_FN, fn.getClass().getName());
     stepContext.addInput(
         PropertyNames.SERIALIZED_FN,
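The registry calls visible above (for example registerTransformTranslator(Read.Bounded.class, new ReadTranslator())) are the extension point this file revolves around: each primitive transform class maps to a TransformTranslator that emits a job step through the TranslationContext. A hedged sketch of such a registration, modeled on the CollectionToSingleton translator in the hunk above; MyPrimitive and the step kind string are hypothetical placeholders:

// Hypothetical primitive transform; addStep/addInput mirror the calls shown
// in the CollectionToSingleton translator in the diff above.
registerTransformTranslator(
    MyPrimitive.class,
    new TransformTranslator<MyPrimitive>() {
      @Override
      public void translate(MyPrimitive transform, TranslationContext context) {
        // Emit one step of a given kind and wire the main input into it.
        StepTranslationContext stepContext = context.addStep(transform, "MyPrimitiveKind");
        stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
      }
    });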
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 8935759..3e7c8ce 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -67,12 +67,11 @@ import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
 import org.apache.beam.runners.core.construction.UnconsumedReads;
-import org.apache.beam.runners.core.construction.WriteFilesTranslation;
+import org.apache.beam.runners.dataflow.BatchViewOverrides.BatchCombineGloballyAsSingletonViewFactory;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
 import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
 import org.apache.beam.runners.dataflow.util.DataflowTemplateJob;
 import org.apache.beam.runners.dataflow.util.DataflowTransport;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -93,7 +92,6 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
@@ -107,8 +105,6 @@ import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
-import org.apache.beam.sdk.state.MapState;
-import org.apache.beam.sdk.state.SetState;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.GroupedValues;
 import org.apache.beam.sdk.transforms.Create;
@@ -121,8 +117,6 @@ import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.InstanceBuilder;
@@ -135,12 +129,10 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.ValueWithRecordId;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.DateTimeUtils;
@@ -333,24 +325,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
               new StreamingFnApiCreateOverrideFactory()));
     }
     overridesBuilder
-        // Support Splittable DoFn for now only in streaming mode.
-        // The order of the following overrides is important because they are applied in order.
-
-        // By default Dataflow runner replaces single-output ParDo with a ParDoSingle override.
-        // However, we want a different expansion for single-output splittable ParDo.
-        .add(
-            PTransformOverride.of(
-                PTransformMatchers.splittableParDoSingle(),
-                new ReflectiveOneToOneOverrideFactory(
-                    SplittableParDoOverrides.ParDoSingleViaMulti.class, this)))
-        .add(
-            PTransformOverride.of(
-                PTransformMatchers.splittableParDoMulti(),
-                new SplittableParDoOverrides.SplittableParDoOverrideFactory()))
-        .add(
-            PTransformOverride.of(
-                PTransformMatchers.writeWithRunnerDeterminedSharding(),
-                new StreamingShardedWriteFactory(options)))
         .add(
             // Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and
             // must precede it
@@ -376,29 +350,34 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
             PTransformOverride.of(
                 PTransformMatchers.stateOrTimerParDoSingle(),
                 BatchStatefulParDoOverrides.singleOutputOverrideFactory()))
+
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+                new BatchCombineGloballyAsSingletonViewFactory(this)))
         .add(
             PTransformOverride.of(
-                PTransformMatchers.createViewWithViewFn(PCollectionViews.MapViewFn.class),
+                PTransformMatchers.classEqualTo(View.AsMap.class),
                 new ReflectiveOneToOneOverrideFactory(
                     BatchViewOverrides.BatchViewAsMap.class, this)))
         .add(
             PTransformOverride.of(
-                PTransformMatchers.createViewWithViewFn(PCollectionViews.MultimapViewFn.class),
+                PTransformMatchers.classEqualTo(View.AsMultimap.class),
                 new ReflectiveOneToOneOverrideFactory(
                     BatchViewOverrides.BatchViewAsMultimap.class, this)))
         .add(
             PTransformOverride.of(
-                PTransformMatchers.createViewWithViewFn(PCollectionViews.SingletonViewFn.class),
+                PTransformMatchers.classEqualTo(View.AsSingleton.class),
                 new ReflectiveOneToOneOverrideFactory(
                     BatchViewOverrides.BatchViewAsSingleton.class, this)))
        .add(
             PTransformOverride.of(
-                PTransformMatchers.createViewWithViewFn(PCollectionViews.ListViewFn.class),
+                PTransformMatchers.classEqualTo(View.AsList.class),
                 new ReflectiveOneToOneOverrideFactory(
                     BatchViewOverrides.BatchViewAsList.class, this)))
         .add(
             PTransformOverride.of(
-                PTransformMatchers.createViewWithViewFn(PCollectionViews.IterableViewFn.class),
+                PTransformMatchers.classEqualTo(View.AsIterable.class),
                 new ReflectiveOneToOneOverrideFactory(
                     BatchViewOverrides.BatchViewAsIterable.class, this)));
   }
@@ -1455,61 +1434,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   }

   @VisibleForTesting
-  static class StreamingShardedWriteFactory<UserT, DestinationT, OutputT>
-      implements PTransformOverrideFactory<
-          PCollection<UserT>, PDone, WriteFiles<UserT, DestinationT, OutputT>> {
-    // We pick 10 as a default, as it works well with the default number of workers started
-    // by Dataflow.
-    static final int DEFAULT_NUM_SHARDS = 10;
-    DataflowPipelineWorkerPoolOptions options;
-
-    StreamingShardedWriteFactory(PipelineOptions options) {
-      this.options = options.as(DataflowPipelineWorkerPoolOptions.class);
-    }
-
-    @Override
-    public PTransformReplacement<PCollection<UserT>, PDone> getReplacementTransform(
-        AppliedPTransform<PCollection<UserT>, PDone, WriteFiles<UserT, DestinationT, OutputT>>
-            transform) {
-      // By default, if numShards is not set WriteFiles will produce one file per bundle. In
-      // streaming, there are large numbers of small bundles, resulting in many tiny files.
-      // Instead we pick max workers * 2 to ensure full parallelism, but prevent too-many files.
-      // (current_num_workers * 2 might be a better choice, but that value is not easily available
-      // today).
-      // If the user does not set either numWorkers or maxNumWorkers, default to 10 shards.
-      int numShards;
-      if (options.getMaxNumWorkers() > 0) {
-        numShards = options.getMaxNumWorkers() * 2;
-      } else if (options.getNumWorkers() > 0) {
-        numShards = options.getNumWorkers() * 2;
-      } else {
-        numShards = DEFAULT_NUM_SHARDS;
-      }
-
-      try {
-        WriteFiles<UserT, DestinationT, OutputT> replacement =
-            WriteFiles.<UserT, DestinationT, OutputT>to(
-                WriteFilesTranslation.<UserT, DestinationT, OutputT>getSink(transform),
-                WriteFilesTranslation.<UserT, OutputT>getFormatFunction(transform));
-        if (WriteFilesTranslation.isWindowedWrites(transform)) {
-          replacement = replacement.withWindowedWrites();
-        }
-        return PTransformReplacement.of(
-            PTransformReplacements.getSingletonMainInput(transform),
-            replacement.withNumShards(numShards));
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs,
-        PDone newOutput) {
-      return Collections.emptyMap();
-    }
-  }
-
-  @VisibleForTesting
   static String getContainerImageForJob(DataflowPipelineOptions options) {
     String workerHarnessContainerImage = options.getWorkerHarnessContainerImage();
     if (!workerHarnessContainerImage.contains("IMAGE")) {
@@ -1522,39 +1446,4 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       return workerHarnessContainerImage.replace("IMAGE", "beam-java-batch");
     }
   }
-
-  static void verifyStateSupported(DoFn<?, ?> fn) {
-    DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
-
-    for (DoFnSignature.StateDeclaration stateDecl : signature.stateDeclarations().values()) {
-
-      // https://issues.apache.org/jira/browse/BEAM-1474
-      if (stateDecl.stateType().isSubtypeOf(TypeDescriptor.of(MapState.class))) {
-        throw new UnsupportedOperationException(String.format(
-            "%s does not currently support %s",
-            DataflowRunner.class.getSimpleName(),
-            MapState.class.getSimpleName()
-        ));
-      }
-
-      // https://issues.apache.org/jira/browse/BEAM-1479
-      if (stateDecl.stateType().isSubtypeOf(TypeDescriptor.of(SetState.class))) {
-        throw new UnsupportedOperationException(String.format(
-            "%s does not currently support %s",
-            DataflowRunner.class.getSimpleName(),
-            SetState.class.getSimpleName()
-        ));
-      }
-    }
-  }
-
-  static void verifyStateSupportForWindowingStrategy(WindowingStrategy strategy) {
-    // https://issues.apache.org/jira/browse/BEAM-2507
-    if (!strategy.getWindowFn().isNonMerging()) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "%s does not currently support state or timers with merging windows",
-              DataflowRunner.class.getSimpleName()));
-    }
-  }
 }
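The deleted StreamingShardedWriteFactory above chose its shard count from the worker-pool options, and the arithmetic is worth restating plainly. A small standalone sketch of the same fallback chain, assuming only the two getters named in the diff (with DEFAULT_NUM_SHARDS = 10):

// Mirrors the deleted logic: prefer maxNumWorkers * 2, then numWorkers * 2,
// then a fixed default of 10 shards.
static int chooseNumShards(int maxNumWorkers, int numWorkers) {
  if (maxNumWorkers > 0) {
    return maxNumWorkers * 2; // e.g. maxNumWorkers=10 -> 20 shards
  }
  if (numWorkers > 0) {
    return numWorkers * 2;    // e.g. numWorkers=3 -> 6 shards
  }
  return 10;                  // neither set -> DEFAULT_NUM_SHARDS
}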
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
deleted file mode 100644
index fc010f8..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow;
-
-import java.util.Map;
-import org.apache.beam.runners.core.construction.ForwardingPTransform;
-import org.apache.beam.runners.core.construction.PTransformReplacements;
-import org.apache.beam.runners.core.construction.ReplacementOutputs;
-import org.apache.beam.runners.core.construction.SplittableParDo;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.PTransformOverrideFactory;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-
-/** Transform overrides for supporting {@link SplittableParDo} in the Dataflow runner. */
-class SplittableParDoOverrides {
-  static class ParDoSingleViaMulti<InputT, OutputT>
-      extends ForwardingPTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
-    private final ParDo.SingleOutput<InputT, OutputT> original;
-
-    public ParDoSingleViaMulti(
-        DataflowRunner ignored, ParDo.SingleOutput<InputT, OutputT> original) {
-      this.original = original;
-    }
-
-    @Override
-    protected PTransform<PCollection<? extends InputT>, PCollection<OutputT>> delegate() {
-      return original;
-    }
-
-    @Override
-    public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
-      TupleTag<OutputT> mainOutput = new TupleTag<>();
-      return input.apply(original.withOutputTags(mainOutput, TupleTagList.empty())).get(mainOutput);
-    }
-  }
-
-  static class SplittableParDoOverrideFactory<InputT, OutputT, RestrictionT>
-      implements PTransformOverrideFactory<
-          PCollection<InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>> {
-    @Override
-    public PTransformReplacement<PCollection<InputT>, PCollectionTuple> getReplacementTransform(
-        AppliedPTransform<PCollection<InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>>
-            appliedTransform) {
-      return PTransformReplacement.of(
-          PTransformReplacements.getSingletonMainInput(appliedTransform),
-          SplittableParDo.forJavaParDo(appliedTransform.getTransform()));
-    }
-
-    @Override
-    public Map<PValue, ReplacementOutput> mapOutputs(
-        Map<TupleTag<?>, PValue> outputs, PCollectionTuple newOutput) {
-      return ReplacementOutputs.tagged(outputs, newOutput);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
index 1853248..6c385d7 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
@@ -42,12 +42,12 @@ import org.apache.beam.sdk.values.PCollectionView;
 class StreamingViewOverrides {
   static class StreamingCreatePCollectionViewFactory<ElemT, ViewT>
       extends SingleInputOutputOverrideFactory<
-          PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>> {
+          PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> {
     @Override
-    public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>>
+    public PTransformReplacement<PCollection<ElemT>, PCollectionView<ViewT>>
         getReplacementTransform(
             AppliedPTransform<
-                    PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>>
+                    PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>>
                 transform) {
       StreamingCreatePCollectionView<ElemT, ViewT> streamingView =
           new StreamingCreatePCollectionView<>(transform.getTransform().getView());
@@ -56,7 +56,7 @@ class StreamingViewOverrides {
   }

   private static class StreamingCreatePCollectionView<ElemT, ViewT>
-      extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
+      extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
     private final PCollectionView<ViewT> view;

     private StreamingCreatePCollectionView(PCollectionView<ViewT> view) {
@@ -64,7 +64,7 @@ class StreamingViewOverrides {
     }

     @Override
-    public PCollection<ElemT> expand(PCollection<ElemT> input) {
+    public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
       return input
           .apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults())
           .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))
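The StreamingCreatePCollectionView hunk just above is the core of the streaming replacement: it folds all elements of a window into one list, hands that list to a writer DoFn, and (per the new signature) ultimately yields the pre-built view. A comment-annotated restatement of the expand() shown, assuming, as elsewhere in Beam, that Concatenate is a CombineFn<ElemT, List<ElemT>, List<ElemT>>; note the hunk is cut off in this mail, so the final step of the chain is not shown here:

// Annotated restatement of the expand() in the hunk above (sketch, not source).
return input
    // Fold each pane's elements into a single List<ElemT>; withoutDefaults()
    // avoids emitting an empty list for windows that saw no data.
    .apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults())
    // Write the materialized contents into the runner's view storage; the
    // method's return of the PCollectionView follows in the truncated portion.
    .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder())))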
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
index 06ed1e0..a7452b2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
@@ -22,7 +22,6 @@ import java.util.Map;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.OutputReference;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -37,8 +36,7 @@ import org.apache.beam.sdk.values.TupleTag;
  * A {@link TransformTranslator} knows how to translate a particular subclass of {@link PTransform}
  * for the Cloud Dataflow service. It does so by mutating the {@link TranslationContext}.
  */
-@Internal
-public interface TransformTranslator<TransformT extends PTransform> {
+interface TransformTranslator<TransformT extends PTransform> {
   void translate(TransformT transform, TranslationContext context);

   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
index 55e0c4e..f82f1f1 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
@@ -63,5 +63,4 @@ public class PropertyNames {
   public static final String USES_KEYED_STATE = "uses_keyed_state";
   public static final String VALUE = "value";
   public static final String DISPLAY_DATA = "display_data";
-  public static final String RESTRICTION_CODER = "restriction_coder";
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java
index 172dc6e..bff379f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/TimeUtil.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.dataflow.util;

-import com.google.common.base.Strings;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
@@ -99,19 +98,26 @@ public final class TimeUtil {
     int hour = Integer.valueOf(matcher.group(4));
     int minute = Integer.valueOf(matcher.group(5));
     int second = Integer.valueOf(matcher.group(6));
-    int millis = computeMillis(matcher.group(7));
+    int millis = 0;
+
+    String frac = matcher.group(7);
+    if (frac != null) {
+      int fracs = Integer.valueOf(frac);
+      if (frac.length() == 3) { // millisecond resolution
+        millis = fracs;
+      } else if (frac.length() == 6) { // microsecond resolution
+        millis = fracs / 1000;
+      } else if (frac.length() == 9) { // nanosecond resolution
+        millis = fracs / 1000000;
+      } else {
+        return null;
+      }
+    }

     return new DateTime(year, month, day, hour, minute, second, millis,
         ISOChronology.getInstanceUTC()).toInstant();
   }

-  private static int computeMillis(String frac) {
-    if (frac == null) {
-      return 0;
-    }
-    return Integer.valueOf(frac.length() > 3 ? frac.substring(0, 3) : Strings.padEnd(frac, 3, '0'));
-  }
-
   /**
    * Converts a {@link ReadableDuration} into a Dataflow API duration string.
    */
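The inlined parsing above is stricter than the deleted computeMillis(): it accepts fractional seconds only at exactly millisecond (3 digits), microsecond (6 digits), or nanosecond (9 digits) resolution and returns null for anything else, which is why the variable-width cases like "00.42Z" disappear from TimeUtilTest later in this commit. A standalone restatement of the rule, with the sentinel return value chosen for illustration:

// Returns the millisecond value for a fractional-seconds field, or -1 for
// widths the new parser rejects (anything other than 3, 6 or 9 digits).
static int fracToMillis(String frac) {
  int fracs = Integer.valueOf(frac);
  switch (frac.length()) {
    case 3: return fracs;            // "001"       -> 1 ms
    case 6: return fracs / 1000;     // "001001"    -> 1 ms (micros truncated)
    case 9: return fracs / 1000000;  // "001000001" -> 1 ms (nanos truncated)
    default: return -1;              // "42", "0", "02" ... now unparseable
  }
}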
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 43b2788..89dc2d5 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -18,14 +18,11 @@ package org.apache.beam.runners.dataflow;

 import static org.apache.beam.runners.dataflow.util.Structs.getString;
-import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasKey;
-import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -69,22 +66,17 @@ import java.util.Set;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
-import org.apache.beam.runners.dataflow.util.CloudObject;
-import org.apache.beam.runners.dataflow.util.CloudObjects;
-import org.apache.beam.runners.dataflow.util.DoFnInfo;
 import org.apache.beam.runners.dataflow.util.OutputReference;
 import org.apache.beam.runners.dataflow.util.PropertyNames;
 import org.apache.beam.runners.dataflow.util.Structs;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -99,12 +91,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -113,8 +100,6 @@ import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.WindowingStrategy;
-import org.hamcrest.Matchers;
-import org.joda.time.Duration;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -911,68 +896,6 @@ public class DataflowPipelineTranslatorTest implements Serializable {
         not(equalTo("true")));
   }

-  /**
-   * Smoke test to fail fast if translation of a splittable ParDo
-   * in streaming breaks.
-   */
-  @Test
-  public void testStreamingSplittableParDoTranslation() throws Exception {
-    DataflowPipelineOptions options = buildPipelineOptions();
-    DataflowRunner runner = DataflowRunner.fromOptions(options);
-    options.setStreaming(true);
-    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
-
-    Pipeline pipeline = Pipeline.create(options);
-
-    PCollection<String> windowedInput = pipeline
-        .apply(Create.of("a"))
-        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
-    windowedInput.apply(ParDo.of(new TestSplittableFn()));
-
-    runner.replaceTransforms(pipeline);
-
-    Job job =
-        translator
-            .translate(
-                pipeline,
-                runner,
-                Collections.<DataflowPackage>emptyList())
-            .getJob();
-
-    // The job should contain a SplittableParDo.ProcessKeyedElements step, translated as
-    // "SplittableProcessKeyed".
-
-    List<Step> steps = job.getSteps();
-    Step processKeyedStep = null;
-    for (Step step : steps) {
-      if (step.getKind().equals("SplittableProcessKeyed")) {
-        assertNull(processKeyedStep);
-        processKeyedStep = step;
-      }
-    }
-    assertNotNull(processKeyedStep);
-
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    DoFnInfo<String, Integer> fnInfo =
-        (DoFnInfo<String, Integer>)
-            SerializableUtils.deserializeFromByteArray(
-                jsonStringToByteArray(
-                    Structs.getString(
-                        processKeyedStep.getProperties(), PropertyNames.SERIALIZED_FN)),
-                "DoFnInfo");
-    assertThat(fnInfo.getDoFn(), instanceOf(TestSplittableFn.class));
-    assertThat(
-        fnInfo.getWindowingStrategy().getWindowFn(),
-        Matchers.<WindowFn>equalTo(FixedWindows.of(Duration.standardMinutes(1))));
-    Coder<?> restrictionCoder =
-        CloudObjects.coderFromCloudObject(
-            (CloudObject)
-                Structs.getObject(
-                    processKeyedStep.getProperties(), PropertyNames.RESTRICTION_CODER));
-
-    assertEquals(SerializableCoder.of(OffsetRange.class), restrictionCoder);
-  }
-
   @Test
   public void testToSingletonTranslationWithIsmSideInput() throws Exception {
     // A "change detector" test that makes sure the translation
@@ -997,15 +920,15 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertAllStepOutputsHaveUniqueIds(job);

     List<Step> steps = job.getSteps();
-    assertEquals(9, steps.size());
+    assertEquals(5, steps.size());

     @SuppressWarnings("unchecked")
     List<Map<String, Object>> toIsmRecordOutputs =
-        (List<Map<String, Object>>) steps.get(7).getProperties().get(PropertyNames.OUTPUT_INFO);
+        (List<Map<String, Object>>) steps.get(3).getProperties().get(PropertyNames.OUTPUT_INFO);
     assertTrue(
         Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));

-    Step collectionToSingletonStep = steps.get(8);
+    Step collectionToSingletonStep = steps.get(4);
     assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
   }

@@ -1167,16 +1090,4 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     assertTrue(String.format("Found duplicate output ids %s", outputIds),
         outputIds.size() == 0);
   }
-
-  private static class TestSplittableFn extends DoFn<String, Integer> {
-    @ProcessElement
-    public void process(ProcessContext c, OffsetRangeTracker tracker) {
-      // noop
-    }
-
-    @GetInitialRestriction
-    public OffsetRange getInitialRange(String element) {
-      return null;
-    }
-  }
 }
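The surviving tests above follow a "change detector" pattern: translate a small pipeline and assert on the resulting job steps by kind and index. That is why the expected step count drops from 9 to 5 once the reverted view overrides produce fewer steps. A hedged sketch of the pattern, with the pipeline construction elided, using only calls that appear in the tests above:

// Sketch of the change-detector idiom used by the remaining tests.
Job job =
    translator
        .translate(pipeline, runner, Collections.<DataflowPackage>emptyList())
        .getJob();
List<Step> steps = job.getSteps();
assertEquals(5, steps.size());                                  // total step count
assertEquals("CollectionToSingleton", steps.get(4).getKind());  // last step's kind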
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 94985f8..8f10b18 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -23,7 +23,6 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -50,7 +49,6 @@ import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.Serializable;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.nio.channels.FileChannel;
@@ -64,60 +62,36 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
-import org.apache.beam.runners.dataflow.DataflowRunner.StreamingShardedWriteFactory;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
-import org.apache.beam.sdk.io.DynamicFileDestinations;
-import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
-import org.apache.beam.sdk.state.MapState;
-import org.apache.beam.sdk.state.SetState;
-import org.apache.beam.sdk.state.StateSpec;
-import org.apache.beam.sdk.state.StateSpecs;
-import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.hamcrest.Description;
 import org.hamcrest.Matchers;
 import org.hamcrest.TypeSafeMatcher;
-import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Rule;
@@ -133,11 +107,9 @@ import org.mockito.stubbing.Answer;

 /**
  * Tests for the {@link DataflowRunner}.
- *
- * <p>Implements {@link Serializable} because it is caught in closures.
  */
 @RunWith(JUnit4.class)
-public class DataflowRunnerTest implements Serializable {
+public class DataflowRunnerTest {

   private static final String VALID_STAGING_BUCKET = "gs://valid-bucket/staging";
   private static final String VALID_TEMP_BUCKET = "gs://valid-bucket/temp";
@@ -147,12 +119,15 @@ public class DataflowRunnerTest implements Serializable {
   private static final String PROJECT_ID = "some-project";
   private static final String REGION_ID = "some-region-1";

-  @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
-  @Rule public transient ExpectedException thrown = ExpectedException.none();
-  @Rule public transient ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowRunner.class);
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+  @Rule
+  public ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowRunner.class);

-  private transient Dataflow.Projects.Locations.Jobs mockJobs;
-  private transient GcsUtil mockGcsUtil;
+  private Dataflow.Projects.Locations.Jobs mockJobs;
+  private GcsUtil mockGcsUtil;

   // Asserts that the given Job has all expected fields set.
   private static void assertValidJob(Job job) {
@@ -848,6 +823,7 @@ public class DataflowRunnerTest implements Serializable {
     DataflowRunner.fromOptions(options);
   }

+  @Test
   public void testValidProfileLocation() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();

@@ -1015,71 +991,6 @@ public class DataflowRunnerTest implements Serializable {
     assertTrue(transform.translated);
   }

-  private void verifyMapStateUnsupported(PipelineOptions options) throws Exception {
-    Pipeline p = Pipeline.create(options);
-    p.apply(Create.of(KV.of(13, 42)))
-        .apply(
-            ParDo.of(
-                new DoFn<KV<Integer, Integer>, Void>() {
-                  @StateId("fizzle")
-                  private final StateSpec<MapState<Void, Void>> voidState = StateSpecs.map();
-
-                  @ProcessElement
-                  public void process() {}
-                }));
-
-    thrown.expectMessage("MapState");
-    thrown.expect(UnsupportedOperationException.class);
-    p.run();
-  }
-
-  @Test
-  public void testMapStateUnsupportedInBatch() throws Exception {
-    PipelineOptions options = buildPipelineOptions();
-    options.as(StreamingOptions.class).setStreaming(false);
-    verifyMapStateUnsupported(options);
-  }
-
-  @Test
-  public void testMapStateUnsupportedInStreaming() throws Exception {
-    PipelineOptions options = buildPipelineOptions();
-    options.as(StreamingOptions.class).setStreaming(true);
-    verifyMapStateUnsupported(options);
-  }
-
-  private void verifySetStateUnsupported(PipelineOptions options) throws Exception {
-    Pipeline p = Pipeline.create(options);
-    p.apply(Create.of(KV.of(13, 42)))
-        .apply(
-            ParDo.of(
-                new DoFn<KV<Integer, Integer>, Void>() {
-                  @StateId("fizzle")
-                  private final StateSpec<SetState<Void>> voidState = StateSpecs.set();
-
-                  @ProcessElement
-                  public void process() {}
-                }));
-
-    thrown.expectMessage("SetState");
-    thrown.expect(UnsupportedOperationException.class);
-    p.run();
-  }
-
-  @Test
-  public void testSetStateUnsupportedInBatch() throws Exception {
-    PipelineOptions options = buildPipelineOptions();
-    options.as(StreamingOptions.class).setStreaming(false);
-    Pipeline p = Pipeline.create(options);
-    verifySetStateUnsupported(options);
-  }
-
-  @Test
-  public void testSetStateUnsupportedInStreaming() throws Exception {
-    PipelineOptions options = buildPipelineOptions();
-    options.as(StreamingOptions.class).setStreaming(true);
-    verifySetStateUnsupported(options);
-  }
-
   /** Records all the composite transforms visited within the Pipeline. */
   private static class CompositeTransformRecorder extends PipelineVisitor.Defaults {
     private List<PTransform<?, ?>> transforms = new ArrayList<>();
@@ -1136,8 +1047,8 @@ public class DataflowRunnerTest implements Serializable {
   }

   /**
-   * Tests that the {@link DataflowRunner} with {@code --templateLocation} returns normally when the
-   * runner is successfully run.
+   * Tests that the {@link DataflowRunner} with {@code --templateLocation} returns normally
+   * when the runner is successfully run.
    */
   @Test
   public void testTemplateRunnerFullCompletion() throws Exception {
@@ -1216,89 +1127,4 @@ public class DataflowRunnerTest implements Serializable {
     assertThat(
         getContainerImageForJob(options), equalTo("gcr.io/java/foo"));
   }
-
-  @Test
-  public void testStreamingWriteWithNoShardingReturnsNewTransform() {
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
-    options.as(DataflowPipelineWorkerPoolOptions.class).setMaxNumWorkers(10);
-    testStreamingWriteOverride(options, 20);
-  }
-
-  @Test
-  public void testStreamingWriteWithNoShardingReturnsNewTransformMaxWorkersUnset() {
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
-    testStreamingWriteOverride(options, StreamingShardedWriteFactory.DEFAULT_NUM_SHARDS);
-  }
-
-  private void verifyMergingStatefulParDoRejected(PipelineOptions options) throws Exception {
-    Pipeline p = Pipeline.create(options);
-
-    p.apply(Create.of(KV.of(13, 42)))
-        .apply(Window.<KV<Integer, Integer>>into(Sessions.withGapDuration(Duration.millis(1))))
-        .apply(ParDo.of(new DoFn<KV<Integer, Integer>, Void>() {
-          @StateId("fizzle")
-          private final StateSpec<ValueState<Void>> voidState = StateSpecs.value();
-
-          @ProcessElement
-          public void process() {}
-        }));
-
-    thrown.expectMessage("merging");
-    thrown.expect(UnsupportedOperationException.class);
-    p.run();
-  }
-
-  @Test
-  public void testMergingStatefulRejectedInStreaming() throws Exception {
-    PipelineOptions options = buildPipelineOptions();
-    options.as(StreamingOptions.class).setStreaming(true);
-    verifyMergingStatefulParDoRejected(options);
-  }
-
-  @Test
-  public void testMergingStatefulRejectedInBatch() throws Exception {
-    PipelineOptions options = buildPipelineOptions();
-    options.as(StreamingOptions.class).setStreaming(false);
-    verifyMergingStatefulParDoRejected(options);
-  }
-
-  private void testStreamingWriteOverride(PipelineOptions options, int expectedNumShards) {
-    TestPipeline p = TestPipeline.fromOptions(options);
-
-    StreamingShardedWriteFactory<Object, Void, Object> factory =
-        new StreamingShardedWriteFactory<>(p.getOptions());
-    WriteFiles<Object, Void, Object> original =
-        WriteFiles.to(new TestSink(tmpFolder.toString()), SerializableFunctions.identity());
-    PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
-    AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object, Void, Object>>
-        originalApplication =
-            AppliedPTransform.of(
-                "writefiles",
-                objs.expand(),
-                Collections.<TupleTag<?>, PValue>emptyMap(),
-                original,
-                p);
-
-    WriteFiles<Object, Void, Object> replacement =
-        (WriteFiles<Object, Void, Object>)
-            factory.getReplacementTransform(originalApplication).getTransform();
-    assertThat(replacement, not(equalTo((Object) original)));
-    assertThat(replacement.getNumShards().get(), equalTo(expectedNumShards));
-  }
-
-  private static class TestSink extends FileBasedSink<Object, Void> {
-    @Override
-    public void validate(PipelineOptions options) {}
-
-    TestSink(String tmpFolder) {
-      super(
-          StaticValueProvider.of(FileSystems.matchNewResource(tmpFolder, true)),
-          DynamicFileDestinations.constant(null));
-    }
-
-    @Override
-    public WriteOperation<Object, Void> createWriteOperation() {
-      throw new IllegalArgumentException("Should not be used");
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java
index 1ac9fab..e0785d4 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/TimeUtilTest.java
@@ -47,14 +47,8 @@ public final class TimeUtilTest {
     assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001001Z"));
     assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000000Z"));
     assertEquals(new Instant(1), fromCloudTime("1970-01-01T00:00:00.001000001Z"));
-    assertEquals(new Instant(0), fromCloudTime("1970-01-01T00:00:00.0Z"));
-    assertEquals(new Instant(0), fromCloudTime("1970-01-01T00:00:00.00Z"));
-    assertEquals(new Instant(420), fromCloudTime("1970-01-01T00:00:00.42Z"));
-    assertEquals(new Instant(300), fromCloudTime("1970-01-01T00:00:00.3Z"));
-    assertEquals(new Instant(20), fromCloudTime("1970-01-01T00:00:00.02Z"));
     assertNull(fromCloudTime(""));
     assertNull(fromCloudTime("1970-01-01T00:00:00"));
-    assertNull(fromCloudTime("1970-01-01T00:00:00.1e3Z"));
   }

   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index b00ba9c..38aada8 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
   </parent>
"--forceStreaming=true", - "--enableSparkMetricSinks=true" - ] - </beamTestPipelineOptions> - </systemPropertyVariables> - </configuration> - </execution> </executions> </plugin> </plugins> @@ -133,33 +114,31 @@ <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> + <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> + <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-network-common_2.10</artifactId> + <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> <scope>provided</scope> - <exclusions> - <!-- Fix build on JDK-9 --> - <exclusion> - <groupId>jdk.tools</groupId> - <artifactId>jdk.tools</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> <scope>provided</scope> </dependency> <dependency> @@ -216,11 +195,6 @@ <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-text</artifactId> </dependency> <dependency> <groupId>commons-io</groupId> @@ -347,13 +321,6 @@ <type>test-jar</type> <scope>test</scope> </dependency> - - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-runners-core-java</artifactId> - <type>test-jar</type> - <scope>test</scope> - </dependency> </dependencies> <build> @@ -393,6 +360,27 @@ </systemPropertyVariables> </configuration> </execution> + <execution> + <id>streaming-tests</id> + <phase>test</phase> + <goals> + <goal>test</goal> + </goals> + <configuration> + <groups> + org.apache.beam.runners.spark.StreamingTest + </groups> + <systemPropertyVariables> + <beamTestPipelineOptions> + [ + "--runner=TestSparkRunner", + "--forceStreaming=true", + "--enableSparkMetricSinks=true" + ] + </beamTestPipelineOptions> + </systemPropertyVariables> + </configuration> + </execution> </executions> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java index 6972acb..d75c955 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java @@ -35,7 +35,8 @@ import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; -import org.apache.commons.text.WordUtils; +import org.apache.commons.lang3.text.WordUtils; + /** * Pipeline visitor for translating a Beam pipeline into equivalent Spark operations. 
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 595521f..9e2426e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import org.apache.beam.runners.core.construction.TransformInputs; import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.runners.spark.metrics.AggregatorMetricSource; @@ -171,7 +170,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { } // register Watermarks listener to broadcast the advanced WMs. - jssc.addStreamingListener(new JavaStreamingListenerWrapper(new WatermarksListener())); + jssc.addStreamingListener(new JavaStreamingListenerWrapper(new WatermarksListener(jssc))); // The reason we call initAccumulators here even though it is called in // SparkRunnerStreamingContextFactory is because the factory is not called when resuming @@ -360,12 +359,10 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { protected boolean shouldDefer(TransformHierarchy.Node node) { // if the input is not a PCollection, or it is but with non merging windows, don't defer. - Collection<PValue> nonAdditionalInputs = - TransformInputs.nonAdditionalInputs(node.toAppliedPTransform(getPipeline())); - if (nonAdditionalInputs.size() != 1) { + if (node.getInputs().size() != 1) { return false; } - PValue input = Iterables.getOnlyElement(nonAdditionalInputs); + PValue input = Iterables.getOnlyElement(node.getInputs().values()); if (!(input instanceof PCollection) || ((PCollection) input).getWindowingStrategy().getWindowFn().isNonMerging()) { return false; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index a13a3b1..eccee57 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -169,7 +169,7 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> { result.waitUntilFinish(Duration.millis(batchDurationMillis)); do { SparkTimerInternals sparkTimerInternals = - SparkTimerInternals.global(GlobalWatermarkHolder.get(batchDurationMillis)); + SparkTimerInternals.global(GlobalWatermarkHolder.get()); sparkTimerInternals.advanceWatermark(); globalWatermark = sparkTimerInternals.currentInputWatermarkTime(); // let another batch-interval period of execution, just to reason about WM propagation. 
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 1385e07..be4f3f6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -104,15 +104,13 @@ public class SparkGroupAlsoByWindowViaWindowSet {

public static <K, InputT, W extends BoundedWindow>
JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> groupAlsoByWindow(
- final JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> inputDStream,
+ JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> inputDStream,
final Coder<K> keyCoder,
final Coder<WindowedValue<InputT>> wvCoder,
final WindowingStrategy<?, W> windowingStrategy,
final SparkRuntimeContext runtimeContext,
final List<Integer> sourceIds) {

- final long batchDurationMillis =
- runtimeContext.getPipelineOptions().as(SparkPipelineOptions.class).getBatchIntervalMillis();
final IterableCoder<WindowedValue<InputT>> itrWvCoder = IterableCoder.of(wvCoder);
final Coder<InputT> iCoder = ((FullWindowedValueCoder<InputT>) wvCoder).getValueCoder();
final Coder<? extends BoundedWindow> wCoder =
@@ -241,7 +239,7 @@ public class SparkGroupAlsoByWindowViaWindowSet {
SparkStateInternals<K> stateInternals;
SparkTimerInternals timerInternals = SparkTimerInternals.forStreamFromSources(
- sourceIds, GlobalWatermarkHolder.get(batchDurationMillis));
+ sourceIds, GlobalWatermarkHolder.get());
// get state(internals) per key.
if (prevStateAndTimersOpt.isEmpty()) {
// no previous state.

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
index a68da55..107915f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
@@ -34,6 +34,7 @@ import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.spark.broadcast.Broadcast;
import org.joda.time.Instant;

@@ -57,10 +58,10 @@ public class SparkTimerInternals implements TimerInternals {

/** Build the {@link TimerInternals} according to the feeding streams. */
public static SparkTimerInternals forStreamFromSources(
List<Integer> sourceIds,
- Map<Integer, SparkWatermarks> watermarks) {
- // if watermarks are invalid for the specific ids, use defaults.
- if (watermarks == null || watermarks.isEmpty()
- || Collections.disjoint(sourceIds, watermarks.keySet())) {
+ @Nullable Broadcast<Map<Integer, SparkWatermarks>> broadcast) {
+ // if broadcast is invalid for the specific ids, use defaults.
+ if (broadcast == null || broadcast.getValue().isEmpty()
+ || Collections.disjoint(sourceIds, broadcast.getValue().keySet())) {
return new SparkTimerInternals(
BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(0));
}
@@ -70,7 +71,7 @@ public class SparkTimerInternals implements TimerInternals {
// synchronized processing time should clearly be synchronized.
Instant synchronizedProcessingTime = null;
for (Integer sourceId: sourceIds) {
- SparkWatermarks sparkWatermarks = watermarks.get(sourceId);
+ SparkWatermarks sparkWatermarks = broadcast.getValue().get(sourceId);
if (sparkWatermarks != null) {
// keep slowest WMs.
slowestLowWatermark = slowestLowWatermark.isBefore(sparkWatermarks.getLowWatermark())
@@ -93,9 +94,10 @@ public class SparkTimerInternals implements TimerInternals {
}

/** Build a global {@link TimerInternals} for all feeding streams.*/
- public static SparkTimerInternals global(Map<Integer, SparkWatermarks> watermarks) {
- return watermarks == null ? forStreamFromSources(Collections.<Integer>emptyList(), null)
- : forStreamFromSources(Lists.newArrayList(watermarks.keySet()), watermarks);
+ public static SparkTimerInternals global(
+ @Nullable Broadcast<Map<Integer, SparkWatermarks>> broadcast) {
+ return broadcast == null ? forStreamFromSources(Collections.<Integer>emptyList(), null)
+ : forStreamFromSources(Lists.newArrayList(broadcast.getValue().keySet()), broadcast);
}

Collection<TimerData> getTimers() {

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 0c6c4d1..8102926 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -26,7 +26,6 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
-import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.sdk.Pipeline;
@@ -104,8 +103,7 @@ public class EvaluationContext {

public <T extends PValue> T getInput(PTransform<T, ?> transform) {
@SuppressWarnings("unchecked")
- T input =
- (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(getCurrentTransform()));
+ T input = (T) Iterables.getOnlyElement(getInputs(transform).values());
return input;
}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index ac5e0cd..64aa35a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -508,6 +508,50 @@ public final class TransformTranslator {
};
}

+ private static <T> TransformEvaluator<View.AsSingleton<T>> viewAsSingleton() {
+ return new TransformEvaluator<View.AsSingleton<T>>() {
+ @Override
+ public void evaluate(View.AsSingleton<T> transform, EvaluationContext context) {
+ Iterable<? extends WindowedValue<?>> iter =
+ context.getWindowedValues(context.getInput(transform));
+ PCollectionView<T> output = context.getOutput(transform);
+ Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal();
+
+ @SuppressWarnings("unchecked")
+ Iterable<WindowedValue<?>> iterCast = (Iterable<WindowedValue<?>>) iter;
+
+ context.putPView(output, iterCast, coderInternal);
+ }
+
+ @Override
+ public String toNativeString() {
+ return "collect()";
+ }
+ };
+ }
+
+ private static <T> TransformEvaluator<View.AsIterable<T>> viewAsIter() {
+ return new TransformEvaluator<View.AsIterable<T>>() {
+ @Override
+ public void evaluate(View.AsIterable<T> transform, EvaluationContext context) {
+ Iterable<? extends WindowedValue<?>> iter =
+ context.getWindowedValues(context.getInput(transform));
+ PCollectionView<Iterable<T>> output = context.getOutput(transform);
+ Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal();
+
+ @SuppressWarnings("unchecked")
+ Iterable<WindowedValue<?>> iterCast = (Iterable<WindowedValue<?>>) iter;
+
+ context.putPView(output, iterCast, coderInternal);
+ }
+
+ @Override
+ public String toNativeString() {
+ return "collect()";
+ }
+ };
+ }
+
private static <ReadT, WriteT> TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>>
createPCollView() {
return new TransformEvaluator<View.CreatePCollectionView<ReadT, WriteT>>() {
@@ -516,7 +560,7 @@
EvaluationContext context) {
Iterable<? extends WindowedValue<?>> iter =
context.getWindowedValues(context.getInput(transform));
- PCollectionView<WriteT> output = transform.getView();
+ PCollectionView<WriteT> output = context.getOutput(transform);
Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal();

@SuppressWarnings("unchecked")
@@ -601,8 +645,8 @@ public final class TransformTranslator {
EVALUATORS.put(Combine.PerKey.class, combinePerKey());
EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
EVALUATORS.put(Create.Values.class, create());
-// EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
-// EVALUATORS.put(View.AsIterable.class, viewAsIter());
+ EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
+ EVALUATORS.put(View.AsIterable.class, viewAsIter());
EVALUATORS.put(View.CreatePCollectionView.class, createPCollView());
EVALUATORS.put(Window.Assign.class, window());
EVALUATORS.put(Reshuffle.class, reshuffle());
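
The re-enabled EVALUATORS registrations above drive a simple class-keyed dispatch: translation looks up the evaluator registered under the transform's concrete class. A generic sketch of that pattern (EvaluatorRegistry and Evaluator are illustrative names, not the runner's actual types):

    import java.util.HashMap;
    import java.util.Map;

    public class EvaluatorRegistry {
      /** Minimal stand-in for the runner's TransformEvaluator. */
      interface Evaluator<T> {
        void evaluate(T transform);
      }

      private static final Map<Class<?>, Evaluator<?>> EVALUATORS = new HashMap<>();

      static <T> void register(Class<T> clazz, Evaluator<T> evaluator) {
        EVALUATORS.put(clazz, evaluator);
      }

      @SuppressWarnings("unchecked")
      static <T> Evaluator<T> lookup(T transform) {
        // Dispatch on the transform's concrete class, as TransformTranslator does;
        // returns null if no evaluator was registered for it.
        return (Evaluator<T>) EVALUATORS.get(transform.getClass());
      }
    }
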
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
index 2cb6f26..8b384d8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java
@@ -21,43 +21,31 @@ package org.apache.beam.runners.spark.util;
import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Maps;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.spark.SparkEnv;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.storage.BlockId;
-import org.apache.spark.storage.BlockManager;
-import org.apache.spark.storage.BlockResult;
-import org.apache.spark.storage.BlockStore;
-import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
import org.joda.time.Instant;
-import scala.Option;
+

/**
- * A {@link BlockStore} variable to hold the global watermarks for a micro-batch.
+ * A {@link Broadcast} variable to hold the global watermarks for a micro-batch.
 *
 * <p>For each source, holds a queue for the watermarks of each micro-batch that was read,
 * and advances the watermarks according to the queue (first-in-first-out).
 */
public class GlobalWatermarkHolder {
- private static final Map<Integer, Queue<SparkWatermarks>> sourceTimes = new HashMap<>();
- private static final BlockId WATERMARKS_BLOCK_ID = BlockId.apply("broadcast_0WATERMARKS");
-
- private static volatile Map<Integer, SparkWatermarks> driverWatermarks = null;
- private static volatile LoadingCache<String, Map<Integer, SparkWatermarks>> watermarkCache = null;
+ // the broadcast is broadcasted to the workers.
+ private static volatile Broadcast<Map<Integer, SparkWatermarks>> broadcast = null;
+ // this should only live in the driver so transient.
+ private static final transient Map<Integer, Queue<SparkWatermarks>> sourceTimes = new HashMap<>();

public static void add(int sourceId, SparkWatermarks sparkWatermarks) {
Queue<SparkWatermarks> timesQueue = sourceTimes.get(sourceId);
@@ -83,48 +71,22 @@
/**
 * Returns the {@link Broadcast} containing the {@link SparkWatermarks} mapped
 * to their sources.
 */
- @SuppressWarnings("unchecked")
- public static Map<Integer, SparkWatermarks> get(Long cacheInterval) {
- if (driverWatermarks != null) {
- // if we are executing in local mode simply return the local values.
- return driverWatermarks;
- } else {
- if (watermarkCache == null) {
- initWatermarkCache(cacheInterval);
- }
- try {
- return watermarkCache.get("SINGLETON");
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- private static synchronized void initWatermarkCache(Long batchDuration) {
- if (watermarkCache == null) {
- watermarkCache =
- CacheBuilder.newBuilder()
- // expire watermarks every half batch duration to ensure they update in every batch.
- .expireAfterWrite(batchDuration / 2, TimeUnit.MILLISECONDS)
- .build(new WatermarksLoader());
- }
+ public static Broadcast<Map<Integer, SparkWatermarks>> get() {
+ return broadcast;
}

/**
 * Advances the watermarks to the next-in-line watermarks.
 * SparkWatermarks are monotonically increasing.
 */
- @SuppressWarnings("unchecked")
- public static void advance() {
- synchronized (GlobalWatermarkHolder.class) {
- BlockManager blockManager = SparkEnv.get().blockManager();
-
+ public static void advance(JavaSparkContext jsc) {
+ synchronized (GlobalWatermarkHolder.class){
if (sourceTimes.isEmpty()) {
return;
}

// update all sources' watermarks into the new broadcast.
- Map<Integer, SparkWatermarks> newValues = new HashMap<>();
+ Map<Integer, SparkWatermarks> newBroadcast = new HashMap<>();

for (Map.Entry<Integer, Queue<SparkWatermarks>> en: sourceTimes.entrySet()) {
if (en.getValue().isEmpty()) {
@@ -137,22 +99,8 @@
Instant currentLowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
Instant currentHighWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
Instant currentSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
- Option<BlockResult> currentOption = blockManager.getRemote(WATERMARKS_BLOCK_ID);
- Map<Integer, SparkWatermarks> current;
- if (currentOption.isDefined()) {
- current = (Map<Integer, SparkWatermarks>) currentOption.get().data().next();
- } else {
- current = Maps.newHashMap();
- blockManager.putSingle(
- WATERMARKS_BLOCK_ID,
- current,
- StorageLevel.MEMORY_ONLY(),
- true);
- }
-
- if (current.containsKey(sourceId)) {
- SparkWatermarks currentTimes = current.get(sourceId);
+ if (broadcast != null && broadcast.getValue().containsKey(sourceId)) {
+ SparkWatermarks currentTimes = broadcast.getValue().get(sourceId);
currentLowWatermark = currentTimes.getLowWatermark();
currentHighWatermark = currentTimes.getHighWatermark();
currentSynchronizedProcessingTime = currentTimes.getSynchronizedProcessingTime();
@@ -171,21 +119,20 @@
nextLowWatermark, nextHighWatermark));
checkState(nextSynchronizedProcessingTime.isAfter(currentSynchronizedProcessingTime),
"Synchronized processing time must advance.");
- newValues.put(
+ newBroadcast.put(
sourceId,
new SparkWatermarks(
nextLowWatermark, nextHighWatermark, nextSynchronizedProcessingTime));
}

// update the watermarks broadcast only if something has changed.
- if (!newValues.isEmpty()) {
- driverWatermarks = newValues;
- blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
- blockManager.putSingle(
- WATERMARKS_BLOCK_ID,
- newValues,
- StorageLevel.MEMORY_ONLY(),
- true);
+ if (!newBroadcast.isEmpty()) {
+ if (broadcast != null) {
+ // for now this is blocking, we could make this asynchronous
+ // but it could slow down WM propagation.
+ broadcast.destroy();
+ }
+ broadcast = jsc.broadcast(newBroadcast);
}
}
}
@@ -193,12 +140,7 @@
@VisibleForTesting
public static synchronized void clear() {
sourceTimes.clear();
- driverWatermarks = null;
- SparkEnv sparkEnv = SparkEnv.get();
- if (sparkEnv != null) {
- BlockManager blockManager = sparkEnv.blockManager();
- blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
- }
+ broadcast = null;
}

/**
@@ -243,24 +185,15 @@
/** Advance the WMs onBatchCompleted event. */
public static class WatermarksListener extends JavaStreamingListener {
- @Override
- public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
- GlobalWatermarkHolder.advance();
- }
- }
+ private final JavaStreamingContext jssc;

- private static class WatermarksLoader extends CacheLoader<String, Map<Integer, SparkWatermarks>> {
+ public WatermarksListener(JavaStreamingContext jssc) {
+ this.jssc = jssc;
+ }

- @SuppressWarnings("unchecked")
@Override
- public Map<Integer, SparkWatermarks> load(@Nonnull String key) throws Exception {
- Option<BlockResult> blockResultOption =
- SparkEnv.get().blockManager().getRemote(WATERMARKS_BLOCK_ID);
- if (blockResultOption.isDefined()) {
- return (Map<Integer, SparkWatermarks>) blockResultOption.get().data().next();
- } else {
- return Maps.newHashMap();
- }
+ public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
+ GlobalWatermarkHolder.advance(jssc.sparkContext());
}
}
}
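
Taken together, the GlobalWatermarkHolder hunks above replace the BlockManager plumbing with a destroy-and-rebroadcast cycle on the driver: after each batch, the listener folds the queued per-source watermarks into a fresh map, destroys the stale Broadcast, and publishes a new one. A condensed sketch of that cycle, with Map<Integer, Long> standing in for the diff's Map<Integer, SparkWatermarks> to keep it self-contained:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;

    public class RebroadcastSketch {
      private static volatile Broadcast<Map<Integer, Long>> broadcast = null;

      // Driver-side: publish a fresh snapshot; executors read it via getValue().
      static synchronized void publish(JavaSparkContext jsc, Map<Integer, Long> snapshot) {
        if (broadcast != null) {
          // Blocking destroy, as in the diff: simpler to reason about, at the
          // cost of some added latency per batch.
          broadcast.destroy();
        }
        broadcast = jsc.broadcast(new HashMap<>(snapshot));
      }
    }

Destroying the old broadcast before publishing keeps executors from reading stale watermarks; the in-code comment in the diff flags the blocking destroy as a candidate for a later asynchronous optimization.
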
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java
index 1708123..47a6e3f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java
@@ -65,17 +65,17 @@ public class GlobalWatermarkHolderTest {
instant.plus(Duration.millis(5)),
instant.plus(Duration.millis(5)),
instant));
- GlobalWatermarkHolder.advance();
+ GlobalWatermarkHolder.advance(jsc);

// low < high.
GlobalWatermarkHolder.add(1, new SparkWatermarks(
instant.plus(Duration.millis(10)),
instant.plus(Duration.millis(15)),
instant.plus(Duration.millis(100))));
- GlobalWatermarkHolder.advance();
+ GlobalWatermarkHolder.advance(jsc);

// assert watermarks in Broadcast.
- SparkWatermarks currentWatermarks = GlobalWatermarkHolder.get(0L).get(1);
+ SparkWatermarks currentWatermarks = GlobalWatermarkHolder.get().getValue().get(1);
assertThat(currentWatermarks.getLowWatermark(), equalTo(instant.plus(Duration.millis(10))));
assertThat(currentWatermarks.getHighWatermark(), equalTo(instant.plus(Duration.millis(15))));
assertThat(currentWatermarks.getSynchronizedProcessingTime(),
@@ -93,7 +93,7 @@
instant.plus(Duration.millis(25)),
instant.plus(Duration.millis(20)),
instant.plus(Duration.millis(200))));
- GlobalWatermarkHolder.advance();
+ GlobalWatermarkHolder.advance(jsc);
}

@Test
@@ -106,7 +106,7 @@
instant.plus(Duration.millis(5)),
instant.plus(Duration.millis(10)),
instant));
- GlobalWatermarkHolder.advance();
+ GlobalWatermarkHolder.advance(jsc);

thrown.expect(IllegalStateException.class);
thrown.expectMessage("Synchronized processing time must advance.");
@@ -117,7 +117,7 @@
instant.plus(Duration.millis(5)),
instant.plus(Duration.millis(10)),
instant));
- GlobalWatermarkHolder.advance();
+ GlobalWatermarkHolder.advance(jsc);
}

@Test
@@ -136,15 +136,15 @@
instant.plus(Duration.millis(6)),
instant));

- GlobalWatermarkHolder.advance();
+ GlobalWatermarkHolder.advance(jsc);

// assert watermarks for source 1.
- SparkWatermarks watermarksForSource1 = GlobalWatermarkHolder.get(0L).get(1);
+ SparkWatermarks watermarksForSource1 = GlobalWatermarkHolder.get().getValue().get(1);
assertThat(watermarksForSource1.getLowWatermark(), equalTo(instant.plus(Duration.millis(5))));
assertThat(watermarksForSource1.getHighWatermark(), equalTo(instant.plus(Duration.millis(10))));

// assert watermarks for source 2.
- SparkWatermarks watermarksForSource2 = GlobalWatermarkHolder.get(0L).get(2);
+ SparkWatermarks watermarksForSource2 = GlobalWatermarkHolder.get().getValue().get(2);
assertThat(watermarksForSource2.getLowWatermark(), equalTo(instant.plus(Duration.millis(3))));
assertThat(watermarksForSource2.getHighWatermark(), equalTo(instant.plus(Duration.millis(6))));
}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index 246eb81..64ff98c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -52,6 +52,7 @@ import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Test;
+

/**
 * Test {@link SparkRunnerDebugger} with different pipelines.
 */
@@ -84,20 +85,17 @@ public class SparkRunnerDebuggerTest {
.apply(MapElements.via(new WordCount.FormatAsTextFn()))
.apply(TextIO.write().to("!!PLACEHOLDER-OUTPUT-DIR!!").withNumShards(3).withSuffix(".txt"));

- final String expectedPipeline =
- "sparkContext.parallelize(Arrays.asList(...))\n"
- + "_.mapPartitions("
- + "new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"
- + "_.mapPartitions(new org.apache.beam.sdk.transforms.Count$PerElement$1())\n"
- + "_.combineByKey(..., new org.apache.beam.sdk.transforms.Count$CountFn(), ...)\n"
- + "_.groupByKey()\n"
- + "_.map(new org.apache.beam.sdk.transforms.Sum$SumLongFn())\n"
- + "_.mapPartitions(new org.apache.beam.runners.spark"
- + ".SparkRunnerDebuggerTest$PlusOne())\n"
- + "sparkContext.union(...)\n"
- + "_.mapPartitions("
- + "new org.apache.beam.runners.spark.examples.WordCount$FormatAsTextFn())\n"
- + "_.<org.apache.beam.sdk.io.TextIO$Write>";
+ final String expectedPipeline = "sparkContext.parallelize(Arrays.asList(...))\n"
+ + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"
+ + "_.mapPartitions(new org.apache.beam.sdk.transforms.Count$PerElement$1())\n"
+ + "_.combineByKey(..., new org.apache.beam.sdk.transforms.Count$CountFn(), ...)\n"
+ + "_.groupByKey()\n"
+ + "_.map(new org.apache.beam.sdk.transforms.Sum$SumLongFn())\n"
+ + "_.mapPartitions(new org.apache.beam.runners.spark"
+ + ".SparkRunnerDebuggerTest$PlusOne())\n"
+ + "sparkContext.union(...)\n"
+ + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$FormatAsTextFn())\n"
+ + "_.<org.apache.beam.sdk.io.AutoValue_TextIO_Write>";

SparkRunnerDebugger.DebugSparkPipelineResult result =
(SparkRunnerDebugger.DebugSparkPipelineResult) pipeline.run();
