DirectRunner: Expand ParDo.Bound into ParDo.BoundMulti
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f86e98c9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f86e98c9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f86e98c9 Branch: refs/heads/gearpump-runner Commit: f86e98c91cedbb1d9fd54e3268dfd8f014ac2f27 Parents: 34e2a35 Author: Kenneth Knowles <[email protected]> Authored: Wed Nov 16 15:43:47 2016 -0800 Committer: Kenneth Knowles <[email protected]> Committed: Fri Nov 18 15:09:43 2016 -0800 ---------------------------------------------------------------------- .../beam/runners/direct/DirectRunner.java | 3 +- .../runners/direct/ParDoEvaluatorFactory.java | 56 +-- .../direct/ParDoMultiEvaluatorHooks.java | 55 --- .../direct/ParDoMultiOverrideFactory.java | 51 +++ .../runners/direct/ParDoOverrideFactory.java | 53 --- .../direct/ParDoSingleEvaluatorHooks.java | 58 --- .../ParDoSingleViaMultiOverrideFactory.java | 66 +++ .../direct/TransformEvaluatorRegistry.java | 7 +- .../direct/ParDoMultiEvaluatorHooksTest.java | 439 ------------------- .../direct/ParDoSingleEvaluatorHooksTest.java | 335 -------------- 10 files changed, 139 insertions(+), 984 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index cce73c3..0060e84 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -87,7 +87,8 @@ public class DirectRunner .put(GroupByKey.class, new DirectGroupByKeyOverrideFactory()) .put(TestStream.class, new DirectTestStreamFactory()) .put(Write.Bound.class, new WriteWithShardingFactory()) - .put(ParDo.Bound.class, new ParDoOverrideFactory()) + .put(ParDo.Bound.class, new ParDoSingleViaMultiOverrideFactory()) + .put(ParDo.BoundMulti.class, new ParDoMultiOverrideFactory()) .put( GBKIntoKeyedWorkItems.class, new DirectGBKIntoKeyedWorkItemsOverrideFactory()) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java index ee4987f..f126000 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java @@ -24,49 +24,22 @@ import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.BoundMulti; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PCollectionTuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * A {@link TransformEvaluatorFactory} for {@link ParDo}-like primitive {@link PTransform - * PTransforms}, parameterized by some {@link TransformHooks transform-specific handling}. - */ -final class ParDoEvaluatorFactory< - InputT, - OutputT, - TransformOutputT extends POutput, - TransformT extends PTransform<PCollection<? extends InputT>, TransformOutputT>> - implements TransformEvaluatorFactory { - interface TransformHooks< - InputT, - OutputT, - TransformOutputT extends POutput, - TransformT extends PTransform<PCollection<? extends InputT>, TransformOutputT>> { - /** Returns the {@link DoFn} contained in the given {@link ParDo} transform. */ - DoFn<InputT, OutputT> getDoFn(TransformT transform); - - /** Configures and creates a {@link ParDoEvaluator} for the given {@link DoFn}. */ - ParDoEvaluator<InputT, OutputT> createParDoEvaluator( - EvaluationContext evaluationContext, - AppliedPTransform<PCollection<InputT>, TransformOutputT, TransformT> application, - DirectStepContext stepContext, - DoFn<InputT, OutputT> fnLocal); - } +/** A {@link TransformEvaluatorFactory} for {@link ParDo.BoundMulti}. */ +final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluatorFactory { private static final Logger LOG = LoggerFactory.getLogger(ParDoEvaluatorFactory.class); private final LoadingCache<DoFn<?, ?>, DoFnLifecycleManager> fnClones; private final EvaluationContext evaluationContext; - private final TransformHooks<InputT, OutputT, TransformOutputT, TransformT> hooks; - ParDoEvaluatorFactory( - EvaluationContext evaluationContext, - TransformHooks<InputT, OutputT, TransformOutputT, TransformT> hooks) { + ParDoEvaluatorFactory(EvaluationContext evaluationContext) { this.evaluationContext = evaluationContext; - this.hooks = hooks; fnClones = CacheBuilder.newBuilder() .build( @@ -95,7 +68,8 @@ final class ParDoEvaluatorFactory< @SuppressWarnings({"unchecked", "rawtypes"}) private TransformEvaluator<InputT> createEvaluator( - AppliedPTransform<PCollection<InputT>, TransformOutputT, TransformT> application, + AppliedPTransform<PCollection<InputT>, PCollectionTuple, BoundMulti<InputT, OutputT>> + application, CommittedBundle<InputT> inputBundle) throws Exception { String stepName = evaluationContext.getStepName(application); @@ -104,12 +78,20 @@ final class ParDoEvaluatorFactory< .getExecutionContext(application, inputBundle.getKey()) .getOrCreateStepContext(stepName, stepName); - DoFnLifecycleManager fnManager = - fnClones.getUnchecked(hooks.getDoFn(application.getTransform())); + DoFnLifecycleManager fnManager = fnClones.getUnchecked(application.getTransform().getNewFn()); try { + ParDo.BoundMulti<InputT, OutputT> transform = application.getTransform(); return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping( - hooks.createParDoEvaluator( - evaluationContext, application, stepContext, (DoFn<InputT, OutputT>) fnManager.get()), + ParDoEvaluator.create( + evaluationContext, + stepContext, + application, + application.getInput().getWindowingStrategy(), + fnManager.get(), + transform.getSideInputs(), + transform.getMainOutputTag(), + transform.getSideOutputTags().getAll(), + application.getOutput().getAll()), fnManager); } catch (Exception e) { try { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java deleted file mode 100644 index f30f209..0000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; - -/** Support for {@link ParDo.BoundMulti} in {@link ParDoEvaluatorFactory}. */ -class ParDoMultiEvaluatorHooks<InputT, OutputT> - implements ParDoEvaluatorFactory.TransformHooks< - InputT, OutputT, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>> { - @Override - public DoFn<InputT, OutputT> getDoFn(ParDo.BoundMulti<InputT, OutputT> transform) { - return transform.getNewFn(); - } - - @Override - public ParDoEvaluator<InputT, OutputT> createParDoEvaluator( - EvaluationContext evaluationContext, - AppliedPTransform<PCollection<InputT>, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>> - application, - DirectStepContext stepContext, - DoFn<InputT, OutputT> fnLocal) { - ParDo.BoundMulti<InputT, OutputT> transform = application.getTransform(); - return ParDoEvaluator.create( - evaluationContext, - stepContext, - application, - application.getInput().getWindowingStrategy(), - fnLocal, - transform.getSideInputs(), - transform.getMainOutputTag(), - transform.getSideOutputTags().getAll(), - application.getOutput().getAll()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java new file mode 100644 index 0000000..6cc3e6e --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.core.SplittableParDo; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; + +/** + * A {@link PTransformOverrideFactory} that provides overrides for applications of a {@link ParDo} + * in the direct runner. Currently overrides applications of <a + * href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a>. + */ +class ParDoMultiOverrideFactory<InputT, OutputT> + implements PTransformOverrideFactory< + PCollection<? extends InputT>, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>> { + + @Override + @SuppressWarnings("unchecked") + public PTransform<PCollection<? extends InputT>, PCollectionTuple> override( + ParDo.BoundMulti<InputT, OutputT> transform) { + + DoFn<InputT, OutputT> fn = transform.getNewFn(); + DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass()); + if (!signature.processElement().isSplittable()) { + return transform; + } else { + return new SplittableParDo(fn); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java deleted file mode 100644 index 27941f8..0000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import org.apache.beam.runners.core.SplittableParDo; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnAdapters; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; -import org.apache.beam.sdk.values.PCollection; - -/** - * A {@link PTransformOverrideFactory} that provides overrides for applications of a {@link ParDo} - * in the direct runner. Currently overrides applications of <a - * href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a>. - */ -class ParDoOverrideFactory<InputT, OutputT> - implements PTransformOverrideFactory< - PCollection<? extends InputT>, PCollection<OutputT>, ParDo.Bound<InputT, OutputT>> { - @Override - @SuppressWarnings("unchecked") - public PTransform<PCollection<? extends InputT>, PCollection<OutputT>> override( - ParDo.Bound<InputT, OutputT> transform) { - ParDo.Bound<InputT, OutputT> that = (ParDo.Bound<InputT, OutputT>) transform; - DoFn<InputT, OutputT> fn = DoFnAdapters.getDoFn(that.getFn()); - if (fn == null) { - // This is an OldDoFn, hence not splittable. - return transform; - } - DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass()); - if (!signature.processElement().isSplittable()) { - return transform; - } - return new SplittableParDo(fn); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java deleted file mode 100644 index 6d284c2..0000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import com.google.common.collect.ImmutableMap; -import java.util.Collections; -import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TupleTag; - -/** Support for {@link ParDo.Bound} in {@link ParDoEvaluatorFactory}. */ -class ParDoSingleEvaluatorHooks<InputT, OutputT> - implements ParDoEvaluatorFactory.TransformHooks< - InputT, OutputT, PCollection<OutputT>, ParDo.Bound<InputT, OutputT>> { - @Override - public DoFn<InputT, OutputT> getDoFn(ParDo.Bound<InputT, OutputT> transform) { - return transform.getNewFn(); - } - - @Override - public ParDoEvaluator<InputT, OutputT> createParDoEvaluator( - EvaluationContext evaluationContext, - AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, ParDo.Bound<InputT, OutputT>> - application, - DirectStepContext stepContext, - DoFn<InputT, OutputT> fnLocal) { - TupleTag<OutputT> mainOutputTag = new TupleTag<>("out"); - ParDo.Bound<InputT, OutputT> transform = application.getTransform(); - return ParDoEvaluator.create( - evaluationContext, - stepContext, - application, - application.getInput().getWindowingStrategy(), - fnLocal, - transform.getSideInputs(), - mainOutputTag, - Collections.<TupleTag<?>>emptyList(), - ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput())); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java new file mode 100644 index 0000000..ee3dfc5 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; + +/** + * A {@link PTransformOverrideFactory} that overrides single-output {@link ParDo} to implement + * it in terms of multi-output {@link ParDo}. + */ +class ParDoSingleViaMultiOverrideFactory<InputT, OutputT> + implements PTransformOverrideFactory< + PCollection<? extends InputT>, PCollection<OutputT>, ParDo.Bound<InputT, OutputT>> { + @Override + @SuppressWarnings("unchecked") + public PTransform<PCollection<? extends InputT>, PCollection<OutputT>> override( + ParDo.Bound<InputT, OutputT> transform) { + return new ParDoSingleViaMulti(transform); + } + + static class ParDoSingleViaMulti<InputT, OutputT> + extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> { + private static final String MAIN_OUTPUT_TAG = "main"; + + private final ParDo.Bound<InputT, OutputT> underlyingParDo; + + public ParDoSingleViaMulti(ParDo.Bound<InputT, OutputT> underlyingParDo) { + this.underlyingParDo = underlyingParDo; + } + + @Override + public PCollection<OutputT> apply(PCollection<? extends InputT> input) { + + // Output tags for ParDo need only be unique up to applied transform + TupleTag<OutputT> mainOutputTag = new TupleTag<OutputT>(MAIN_OUTPUT_TAG); + + PCollectionTuple output = + input.apply( + ParDo.of(underlyingParDo.getNewFn()) + .withSideInputs(underlyingParDo.getSideInputs()) + .withOutputTags(mainOutputTag, TupleTagList.empty())); + + return output.get(mainOutputTag); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index 51502f7..0514c3a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -49,12 +49,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory { ImmutableMap.<Class<? extends PTransform>, TransformEvaluatorFactory>builder() .put(Read.Bounded.class, new BoundedReadEvaluatorFactory(ctxt)) .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt)) - .put( - ParDo.Bound.class, - new ParDoEvaluatorFactory<>(ctxt, new ParDoSingleEvaluatorHooks<>())) - .put( - ParDo.BoundMulti.class, - new ParDoEvaluatorFactory<>(ctxt, new ParDoMultiEvaluatorHooks<>())) + .put(ParDo.BoundMulti.class, new ParDoEvaluatorFactory<>(ctxt)) .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory(ctxt)) .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory(ctxt)) .put(Window.Bound.class, new WindowEvaluatorFactory(ctxt)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java deleted file mode 100644 index 6302d37..0000000 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java +++ /dev/null @@ -1,439 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.Serializable; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; -import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.ParDo.BoundMulti; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.Timer; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.TimerSpec; -import org.apache.beam.sdk.util.TimerSpecs; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.StateNamespace; -import org.apache.beam.sdk.util.state.StateNamespaces; -import org.apache.beam.sdk.util.state.StateSpec; -import org.apache.beam.sdk.util.state.StateSpecs; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; -import org.apache.beam.sdk.util.state.WatermarkHoldState; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TupleTagList; -import org.hamcrest.Matchers; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link ParDoMultiEvaluatorHooks}. - */ -@RunWith(JUnit4.class) -public class ParDoMultiEvaluatorHooksTest implements Serializable { - private transient BundleFactory bundleFactory = ImmutableListBundleFactory.create(); - - @Test - public void testParDoMultiInMemoryTransformEvaluator() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); - - TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {}; - final TupleTag<String> elementTag = new TupleTag<>(); - final TupleTag<Integer> lengthTag = new TupleTag<>(); - - BoundMulti<String, KV<String, Integer>> pardo = - ParDo.of( - new DoFn<String, KV<String, Integer>>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(KV.<String, Integer>of(c.element(), c.element().length())); - c.sideOutput(elementTag, c.element()); - c.sideOutput(lengthTag, c.element().length()); - } - }) - .withOutputTags(mainOutputTag, TupleTagList.of(elementTag).and(lengthTag)); - PCollectionTuple outputTuple = input.apply(pardo); - - CommittedBundle<String> inputBundle = - bundleFactory.createBundle(input).commit(Instant.now()); - - PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); - PCollection<String> elementOutput = outputTuple.get(elementTag); - PCollection<Integer> lengthOutput = outputTuple.get(lengthTag); - - EvaluationContext evaluationContext = mock(EvaluationContext.class); - UncommittedBundle<KV<String, Integer>> mainOutputBundle = - bundleFactory.createBundle(mainOutput); - UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput); - UncommittedBundle<Integer> lengthOutputBundle = bundleFactory.createBundle(lengthOutput); - - when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle); - when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle); - when(evaluationContext.createBundle(lengthOutput)).thenReturn(lengthOutputBundle); - - DirectExecutionContext executionContext = - new DirectExecutionContext(null, null, null, null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), - inputBundle.getKey())).thenReturn(executionContext); - AggregatorContainer container = AggregatorContainer.create(); - AggregatorContainer.Mutator mutator = container.createMutator(); - when(evaluationContext.getAggregatorContainer()).thenReturn(container); - when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); - - TransformEvaluator<String> evaluator = - new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>()) - .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - evaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); - evaluator.processElement( - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - - TransformResult result = evaluator.finishBundle(); - assertThat( - result.getOutputBundles(), - Matchers.<UncommittedBundle<?>>containsInAnyOrder( - lengthOutputBundle, mainOutputBundle, elementOutputBundle)); - assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); - assertThat(result.getAggregatorChanges(), equalTo(mutator)); - - assertThat( - mainOutputBundle.commit(Instant.now()).getElements(), - Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder( - WindowedValue.valueInGlobalWindow(KV.of("foo", 3)), - WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)), - WindowedValue.valueInGlobalWindow( - KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING))); - assertThat( - elementOutputBundle.commit(Instant.now()).getElements(), - Matchers.<WindowedValue<String>>containsInAnyOrder( - WindowedValue.valueInGlobalWindow("foo"), - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)), - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING))); - assertThat( - lengthOutputBundle.commit(Instant.now()).getElements(), - Matchers.<WindowedValue<Integer>>containsInAnyOrder( - WindowedValue.valueInGlobalWindow(3), - WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)), - WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING))); - } - - @Test - public void testParDoMultiUndeclaredSideOutput() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); - - TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {}; - final TupleTag<String> elementTag = new TupleTag<>(); - final TupleTag<Integer> lengthTag = new TupleTag<>(); - - BoundMulti<String, KV<String, Integer>> pardo = - ParDo.of( - new DoFn<String, KV<String, Integer>>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(KV.<String, Integer>of(c.element(), c.element().length())); - c.sideOutput(elementTag, c.element()); - c.sideOutput(lengthTag, c.element().length()); - } - }) - .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); - PCollectionTuple outputTuple = input.apply(pardo); - - CommittedBundle<String> inputBundle = - bundleFactory.createBundle(input).commit(Instant.now()); - - PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); - PCollection<String> elementOutput = outputTuple.get(elementTag); - - EvaluationContext evaluationContext = mock(EvaluationContext.class); - UncommittedBundle<KV<String, Integer>> mainOutputBundle = - bundleFactory.createBundle(mainOutput); - UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput); - - when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle); - when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle); - - DirectExecutionContext executionContext = - new DirectExecutionContext(null, null, null, null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), - inputBundle.getKey())).thenReturn(executionContext); - AggregatorContainer container = AggregatorContainer.create(); - AggregatorContainer.Mutator mutator = container.createMutator(); - when(evaluationContext.getAggregatorContainer()).thenReturn(container); - when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); - - TransformEvaluator<String> evaluator = - new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>()) - .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - evaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); - evaluator.processElement( - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - - TransformResult result = evaluator.finishBundle(); - assertThat( - result.getOutputBundles(), - Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle)); - assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); - assertThat(result.getAggregatorChanges(), equalTo(mutator)); - - assertThat( - mainOutputBundle.commit(Instant.now()).getElements(), - Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder( - WindowedValue.valueInGlobalWindow(KV.of("foo", 3)), - WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)), - WindowedValue.valueInGlobalWindow( - KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING))); - assertThat( - elementOutputBundle.commit(Instant.now()).getElements(), - Matchers.<WindowedValue<String>>containsInAnyOrder( - WindowedValue.valueInGlobalWindow("foo"), - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)), - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING))); - } - - /** - * This test ignored, as today testing of GroupByKey is all the state that needs testing. - * This should be ported to state when ready. - */ - @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.") - @Test - public void finishBundleWithStatePutsStateInResult() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); - - TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {}; - final TupleTag<String> elementTag = new TupleTag<>(); - - final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag = - StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEndOfWindow()); - final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of()); - final StateNamespace windowNs = - StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE); - BoundMulti<String, KV<String, Integer>> pardo = - ParDo.of( - new DoFn<String, KV<String, Integer>>() { - private static final String STATE_ID = "my-state-id"; - - @StateId(STATE_ID) - private final StateSpec<Object, BagState<String>> bagSpec = - StateSpecs.bag(StringUtf8Coder.of()); - - @ProcessElement - public void processElement( - ProcessContext c, @StateId(STATE_ID) BagState<String> bagState) { - bagState.add(c.element()); - } - }) - .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); - PCollectionTuple outputTuple = input.apply(pardo); - - CommittedBundle<String> inputBundle = - bundleFactory.createBundle(input).commit(Instant.now()); - - PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); - PCollection<String> elementOutput = outputTuple.get(elementTag); - - EvaluationContext evaluationContext = mock(EvaluationContext.class); - UncommittedBundle<KV<String, Integer>> mainOutputBundle = - bundleFactory.createBundle(mainOutput); - UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput); - - when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle); - when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle); - - DirectExecutionContext executionContext = new DirectExecutionContext(null, - StructuralKey.of("myKey", StringUtf8Coder.of()), - null, - null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), - inputBundle.getKey())).thenReturn(executionContext); - AggregatorContainer container = AggregatorContainer.create(); - AggregatorContainer.Mutator mutator = container.createMutator(); - when(evaluationContext.getAggregatorContainer()).thenReturn(container); - when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); - - TransformEvaluator<String> evaluator = - new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>()) - .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - evaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); - evaluator.processElement( - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - - TransformResult result = evaluator.finishBundle(); - assertThat( - result.getOutputBundles(), - Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle)); - assertThat(result.getWatermarkHold(), equalTo(new Instant(20205L))); - assertThat(result.getState(), not(nullValue())); - assertThat( - result.getState().state(StateNamespaces.global(), watermarkTag).read(), - equalTo(new Instant(20205L))); - assertThat( - result.getState().state(windowNs, bagTag).read(), - containsInAnyOrder("foo", "bara", "bazam")); - } - - /** - * This test ignored, as today testing of GroupByKey is all the state that needs testing. - * This should be ported to state when ready. - */ - @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.") - @Test - public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); - - TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {}; - final TupleTag<String> elementTag = new TupleTag<>(); - - final TimerData addedTimer = - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0).plus(Duration.standardMinutes(5)), - new Instant(1) - .plus(Duration.standardMinutes(5)) - .plus(Duration.standardHours(1)))), - new Instant(54541L), - TimeDomain.EVENT_TIME); - final TimerData deletedTimer = - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))), - new Instant(3400000), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - - BoundMulti<String, KV<String, Integer>> pardo = - ParDo.of( - new DoFn<String, KV<String, Integer>>() { - private static final String EVENT_TIME_TIMER = "event-time-timer"; - private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer"; - - @TimerId(EVENT_TIME_TIMER) - TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); - - @TimerId(SYNC_PROC_TIME_TIMER) - TimerSpec syncProcTimerSpec = - TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - - @ProcessElement - public void processElement( - ProcessContext c, - @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer, - @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) { - - eventTimeTimer.setForNowPlus(Duration.standardMinutes(5)); - syncProcTimeTimer.cancel(); - } - }) - .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); - PCollectionTuple outputTuple = input.apply(pardo); - - CommittedBundle<String> inputBundle = - bundleFactory.createBundle(input).commit(Instant.now()); - - PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); - PCollection<String> elementOutput = outputTuple.get(elementTag); - - EvaluationContext evaluationContext = mock(EvaluationContext.class); - UncommittedBundle<KV<String, Integer>> mainOutputBundle = - bundleFactory.createBundle(mainOutput); - UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput); - - when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle); - when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle); - - DirectExecutionContext executionContext = new DirectExecutionContext(null, - StructuralKey.of("myKey", StringUtf8Coder.of()), - null, null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), - inputBundle.getKey())).thenReturn(executionContext); - AggregatorContainer container = AggregatorContainer.create(); - AggregatorContainer.Mutator mutator = container.createMutator(); - when(evaluationContext.getAggregatorContainer()).thenReturn(container); - when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); - - TransformEvaluator<String> evaluator = - new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>()) - .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - evaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); - evaluator.processElement( - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - - TransformResult result = evaluator.finishBundle(); - assertThat( - result.getTimerUpdate(), - equalTo( - TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of())) - .setTimer(addedTimer) - .setTimer(addedTimer) - .setTimer(addedTimer) - .deletedTimer(deletedTimer) - .deletedTimer(deletedTimer) - .deletedTimer(deletedTimer) - .build())); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f86e98c9/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooksTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooksTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooksTest.java deleted file mode 100644 index 10cd7c5..0000000 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooksTest.java +++ /dev/null @@ -1,335 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.Serializable; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; -import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.Timer; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.TimerSpec; -import org.apache.beam.sdk.util.TimerSpecs; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.StateNamespace; -import org.apache.beam.sdk.util.state.StateNamespaces; -import org.apache.beam.sdk.util.state.StateSpec; -import org.apache.beam.sdk.util.state.StateSpecs; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TupleTag; -import org.hamcrest.Matchers; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link ParDoSingleEvaluatorHooks}. - */ -@RunWith(JUnit4.class) -public class ParDoSingleEvaluatorHooksTest implements Serializable { - private transient BundleFactory bundleFactory = ImmutableListBundleFactory.create(); - - @Test - public void testParDoInMemoryTransformEvaluator() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); - PCollection<Integer> collection = - input.apply( - ParDo.of( - new DoFn<String, Integer>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element().length()); - } - })); - CommittedBundle<String> inputBundle = - bundleFactory.createBundle(input).commit(Instant.now()); - - EvaluationContext evaluationContext = mock(EvaluationContext.class); - UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(collection); - when(evaluationContext.createBundle(collection)).thenReturn(outputBundle); - DirectExecutionContext executionContext = - new DirectExecutionContext(null, null, null, null); - when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(), - inputBundle.getKey())).thenReturn(executionContext); - AggregatorContainer container = AggregatorContainer.create(); - AggregatorContainer.Mutator mutator = container.createMutator(); - when(evaluationContext.getAggregatorContainer()).thenReturn(container); - when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); - - org.apache.beam.runners.direct.TransformEvaluator<String> evaluator = - new ParDoEvaluatorFactory<>(evaluationContext, new ParDoSingleEvaluatorHooks<>()) - .forApplication( - collection.getProducingTransformInternal(), inputBundle); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - evaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); - evaluator.processElement( - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - - TransformResult result = evaluator.finishBundle(); - assertThat(result.getOutputBundles(), Matchers.<UncommittedBundle<?>>contains(outputBundle)); - assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); - assertThat(result.getAggregatorChanges(), equalTo(mutator)); - - assertThat( - outputBundle.commit(Instant.now()).getElements(), - Matchers.<WindowedValue<Integer>>containsInAnyOrder( - WindowedValue.valueInGlobalWindow(3), - WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)), - WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING))); - } - - @Test - public void testSideOutputToUndeclaredSideOutputSucceeds() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); - final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>() {}; - PCollection<Integer> collection = - input.apply( - ParDo.of( - new DoFn<String, Integer>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.sideOutput(sideOutputTag, c.element().length()); - } - })); - CommittedBundle<String> inputBundle = - bundleFactory.createBundle(input).commit(Instant.now()); - - EvaluationContext evaluationContext = mock(EvaluationContext.class); - UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(collection); - when(evaluationContext.createBundle(collection)).thenReturn(outputBundle); - DirectExecutionContext executionContext = - new DirectExecutionContext(null, null, null, null); - when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(), - inputBundle.getKey())).thenReturn(executionContext); - AggregatorContainer container = AggregatorContainer.create(); - AggregatorContainer.Mutator mutator = container.createMutator(); - when(evaluationContext.getAggregatorContainer()).thenReturn(container); - when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); - - TransformEvaluator<String> evaluator = - new ParDoEvaluatorFactory<>(evaluationContext, new ParDoSingleEvaluatorHooks<>()) - .forApplication( - collection.getProducingTransformInternal(), inputBundle); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - evaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); - evaluator.processElement( - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - - TransformResult result = evaluator.finishBundle(); - assertThat( - result.getOutputBundles(), Matchers.<UncommittedBundle<?>>containsInAnyOrder(outputBundle)); - assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); - assertThat(result.getAggregatorChanges(), equalTo(mutator)); - } - - /** - * This test ignored, as today testing of GroupByKey is all the state that needs testing. - * This should be ported to state when ready. - */ - @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.") - @Test - public void finishBundleWithStatePutsStateInResult() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); - - final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of()); - final StateNamespace windowNs = - StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE); - ParDo.Bound<String, KV<String, Integer>> pardo = - ParDo.of( - new DoFn<String, KV<String, Integer>>() { - private static final String STATE_ID = "my-state-id"; - - @StateId(STATE_ID) - private final StateSpec<Object, BagState<String>> bagSpec = - StateSpecs.bag(StringUtf8Coder.of()); - - @ProcessElement - public void processElement( - ProcessContext c, @StateId(STATE_ID) BagState<String> bagState) { - bagState.add(c.element()); - } - }); - PCollection<KV<String, Integer>> mainOutput = input.apply(pardo); - - CommittedBundle<String> inputBundle = - bundleFactory.createBundle(input).commit(Instant.now()); - - EvaluationContext evaluationContext = mock(EvaluationContext.class); - UncommittedBundle<KV<String, Integer>> mainOutputBundle = - bundleFactory.createBundle(mainOutput); - - when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle); - - DirectExecutionContext executionContext = new DirectExecutionContext(null, - StructuralKey.of("myKey", StringUtf8Coder.of()), - null, null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), - inputBundle.getKey())) - .thenReturn(executionContext); - AggregatorContainer container = AggregatorContainer.create(); - AggregatorContainer.Mutator mutator = container.createMutator(); - when(evaluationContext.getAggregatorContainer()).thenReturn(container); - when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); - - org.apache.beam.runners.direct.TransformEvaluator<String> evaluator = - new ParDoEvaluatorFactory<>(evaluationContext, new ParDoSingleEvaluatorHooks<>()) - .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - evaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); - evaluator.processElement( - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - - TransformResult result = evaluator.finishBundle(); - assertThat(result.getWatermarkHold(), equalTo(new Instant(124438L))); - assertThat(result.getState(), not(nullValue())); - assertThat( - result.getState().state(windowNs, bagTag).read(), - containsInAnyOrder("foo", "bara", "bazam")); - } - - /** - * This test ignored, as today testing of GroupByKey is all the state that needs testing. - * This should be ported to state when ready. - */ - @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.") - @Test - public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); - - // TODO: this timer data is absolute, but the new API only support relative settings. - // It will require adjustments when @Ignore is removed - final TimerData addedTimer = - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0).plus(Duration.standardMinutes(5)), - new Instant(1) - .plus(Duration.standardMinutes(5)) - .plus(Duration.standardHours(1)))), - new Instant(54541L), - TimeDomain.EVENT_TIME); - final TimerData deletedTimer = - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))), - new Instant(3400000), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - - ParDo.Bound<String, KV<String, Integer>> pardo = - ParDo.of( - new DoFn<String, KV<String, Integer>>() { - private static final String EVENT_TIME_TIMER = "event-time-timer"; - private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer"; - - @TimerId(EVENT_TIME_TIMER) - TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); - - @TimerId(SYNC_PROC_TIME_TIMER) - TimerSpec syncProcTimerSpec = - TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - - @ProcessElement - public void processElement( - ProcessContext c, - @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer, - @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) { - eventTimeTimer.setForNowPlus(Duration.standardMinutes(5)); - syncProcTimeTimer.cancel(); - } - }); - PCollection<KV<String, Integer>> mainOutput = input.apply(pardo); - - StructuralKey<?> key = StructuralKey.of("myKey", StringUtf8Coder.of()); - CommittedBundle<String> inputBundle = - bundleFactory.createBundle(input).commit(Instant.now()); - - EvaluationContext evaluationContext = mock(EvaluationContext.class); - UncommittedBundle<KV<String, Integer>> mainOutputBundle = - bundleFactory.createBundle(mainOutput); - - when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle); - - DirectExecutionContext executionContext = new DirectExecutionContext(null, - key, - null, - null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), - inputBundle.getKey())) - .thenReturn(executionContext); - AggregatorContainer container = AggregatorContainer.create(); - AggregatorContainer.Mutator mutator = container.createMutator(); - when(evaluationContext.getAggregatorContainer()).thenReturn(container); - when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); - - TransformEvaluator<String> evaluator = - new ParDoEvaluatorFactory<>(evaluationContext, new ParDoSingleEvaluatorHooks<>()) - .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - - TransformResult result = evaluator.finishBundle(); - assertThat(result.getTimerUpdate(), - equalTo(TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of())) - .setTimer(addedTimer) - .deletedTimer(deletedTimer) - .build())); - } -}
