Deduplicates ParDo{Single,Multi}EvaluatorFactory. This is in preparation for adding a third one for a future ParDo-like primitive transform to be introduced inside SplittableParDo.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/726998ae Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/726998ae Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/726998ae Branch: refs/heads/master Commit: 726998ae68ee99eb10bf43ff8aae1a5f121728a4 Parents: 7e0cfe5 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Wed Oct 26 16:34:47 2016 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Tue Nov 1 16:45:07 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/ParDoEvaluator.java | 18 +- .../runners/direct/ParDoEvaluatorFactory.java | 126 ++++++ .../direct/ParDoMultiEvaluatorFactory.java | 106 ----- .../direct/ParDoMultiEvaluatorHooks.java | 54 +++ .../direct/ParDoSingleEvaluatorFactory.java | 109 ----- .../direct/ParDoSingleEvaluatorHooks.java | 57 +++ .../direct/TransformEvaluatorRegistry.java | 10 +- .../beam/runners/direct/ParDoEvaluatorTest.java | 10 +- .../direct/ParDoMultiEvaluatorFactoryTest.java | 439 ------------------- .../direct/ParDoMultiEvaluatorHooksTest.java | 439 +++++++++++++++++++ .../direct/ParDoSingleEvaluatorFactoryTest.java | 335 -------------- .../direct/ParDoSingleEvaluatorHooksTest.java | 335 ++++++++++++++ 12 files changed, 1030 insertions(+), 1008 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/726998ae/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index ff49b60..5913379 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -39,8 +39,8 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; -class ParDoEvaluator<T> implements TransformEvaluator<T> { - public static <InputT, OutputT> ParDoEvaluator<InputT> create( +class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> { + public static <InputT, OutputT> ParDoEvaluator<InputT, OutputT> create( EvaluationContext evaluationContext, DirectStepContext stepContext, AppliedPTransform<PCollection<InputT>, ?, ?> application, @@ -86,17 +86,17 @@ class ParDoEvaluator<T> implements TransformEvaluator<T> { //////////////////////////////////////////////////////////////////////////////////////////////// - private final PushbackSideInputDoFnRunner<T, ?> fnRunner; - private final AppliedPTransform<PCollection<T>, ?, ?> transform; + private final PushbackSideInputDoFnRunner<InputT, ?> fnRunner; + private final AppliedPTransform<PCollection<InputT>, ?, ?> transform; private final AggregatorContainer.Mutator aggregatorChanges; private final Collection<UncommittedBundle<?>> outputBundles; private final DirectStepContext stepContext; - private final ImmutableList.Builder<WindowedValue<T>> unprocessedElements; + private final ImmutableList.Builder<WindowedValue<InputT>> unprocessedElements; private ParDoEvaluator( - PushbackSideInputDoFnRunner<T, ?> fnRunner, - AppliedPTransform<PCollection<T>, ?, ?> transform, + PushbackSideInputDoFnRunner<InputT, ?> fnRunner, + AppliedPTransform<PCollection<InputT>, ?, ?> transform, AggregatorContainer.Mutator aggregatorChanges, Collection<UncommittedBundle<?>> outputBundles, DirectStepContext stepContext) { @@ -109,9 +109,9 @@ class ParDoEvaluator<T> implements TransformEvaluator<T> { } @Override - public void processElement(WindowedValue<T> element) { + public void processElement(WindowedValue<InputT> element) { try { - 
Iterable<WindowedValue<T>> unprocessed = fnRunner.processElementInReadyWindows(element); + Iterable<WindowedValue<InputT>> unprocessed = fnRunner.processElementInReadyWindows(element); unprocessedElements.addAll(unprocessed); } catch (Exception e) { throw UserCodeException.wrap(e); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/726998ae/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java new file mode 100644 index 0000000..ee4987f --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.POutput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link TransformEvaluatorFactory} for {@link ParDo}-like primitive {@link PTransform + * PTransforms}, parameterized by some {@link TransformHooks transform-specific handling}. + */ +final class ParDoEvaluatorFactory< + InputT, + OutputT, + TransformOutputT extends POutput, + TransformT extends PTransform<PCollection<? extends InputT>, TransformOutputT>> + implements TransformEvaluatorFactory { + interface TransformHooks< + InputT, + OutputT, + TransformOutputT extends POutput, + TransformT extends PTransform<PCollection<? extends InputT>, TransformOutputT>> { + /** Returns the {@link DoFn} contained in the given {@link ParDo} transform. */ + DoFn<InputT, OutputT> getDoFn(TransformT transform); + + /** Configures and creates a {@link ParDoEvaluator} for the given {@link DoFn}. 
*/ + ParDoEvaluator<InputT, OutputT> createParDoEvaluator( + EvaluationContext evaluationContext, + AppliedPTransform<PCollection<InputT>, TransformOutputT, TransformT> application, + DirectStepContext stepContext, + DoFn<InputT, OutputT> fnLocal); + } + + private static final Logger LOG = LoggerFactory.getLogger(ParDoEvaluatorFactory.class); + private final LoadingCache<DoFn<?, ?>, DoFnLifecycleManager> fnClones; + private final EvaluationContext evaluationContext; + private final TransformHooks<InputT, OutputT, TransformOutputT, TransformT> hooks; + + ParDoEvaluatorFactory( + EvaluationContext evaluationContext, + TransformHooks<InputT, OutputT, TransformOutputT, TransformT> hooks) { + this.evaluationContext = evaluationContext; + this.hooks = hooks; + fnClones = + CacheBuilder.newBuilder() + .build( + new CacheLoader<DoFn<?, ?>, DoFnLifecycleManager>() { + @Override + public DoFnLifecycleManager load(DoFn<?, ?> key) throws Exception { + return DoFnLifecycleManager.of(key); + } + }); + } + + @Override + public <T> TransformEvaluator<T> forApplication( + AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws Exception { + @SuppressWarnings({"unchecked", "rawtypes"}) + TransformEvaluator<T> evaluator = + (TransformEvaluator<T>) + createEvaluator((AppliedPTransform) application, (CommittedBundle) inputBundle); + return evaluator; + } + + @Override + public void cleanup() throws Exception { + DoFnLifecycleManagers.removeAllFromManagers(fnClones.asMap().values()); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private TransformEvaluator<InputT> createEvaluator( + AppliedPTransform<PCollection<InputT>, TransformOutputT, TransformT> application, + CommittedBundle<InputT> inputBundle) + throws Exception { + String stepName = evaluationContext.getStepName(application); + DirectStepContext stepContext = + evaluationContext + .getExecutionContext(application, inputBundle.getKey()) + .getOrCreateStepContext(stepName, stepName); + + 
DoFnLifecycleManager fnManager = + fnClones.getUnchecked(hooks.getDoFn(application.getTransform())); + try { + return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping( + hooks.createParDoEvaluator( + evaluationContext, application, stepContext, (DoFn<InputT, OutputT>) fnManager.get()), + fnManager); + } catch (Exception e) { + try { + fnManager.remove(); + } catch (Exception removalException) { + LOG.error( + "Exception encountered while cleaning up in ParDo evaluator construction", + removalException); + e.addSuppressed(removalException); + } + throw e; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/726998ae/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java deleted file mode 100644 index ccda0e2..0000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java +++ /dev/null @@ -1,106 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import java.util.Map; -import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo.BoundMulti; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the - * {@link BoundMulti} primitive {@link PTransform}. 
- */ -class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory { - private static final Logger LOG = LoggerFactory.getLogger(ParDoMultiEvaluatorFactory.class); - private final LoadingCache<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager> fnClones; - private final EvaluationContext evaluationContext; - - public ParDoMultiEvaluatorFactory(EvaluationContext evaluationContext) { - this.evaluationContext = evaluationContext; - fnClones = CacheBuilder.newBuilder() - .build(new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>() { - @Override - public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> key) - throws Exception { - BoundMulti<?, ?> bound = (BoundMulti<?, ?>) key.getTransform(); - return DoFnLifecycleManager.of(bound.getNewFn()); - } - }); - } - - @Override - public <T> TransformEvaluator<T> forApplication( - AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws Exception { - @SuppressWarnings({"unchecked", "rawtypes"}) - TransformEvaluator<T> evaluator = - createMultiEvaluator((AppliedPTransform) application, inputBundle); - return evaluator; - } - - @Override - public void cleanup() throws Exception { - DoFnLifecycleManagers.removeAllFromManagers(fnClones.asMap().values()); - } - - private <InT, OuT> TransformEvaluator<InT> createMultiEvaluator( - AppliedPTransform<PCollection<InT>, PCollectionTuple, BoundMulti<InT, OuT>> application, - CommittedBundle<InT> inputBundle) throws Exception { - Map<TupleTag<?>, PCollection<?>> outputs = application.getOutput().getAll(); - - DoFnLifecycleManager fnLocal = fnClones.getUnchecked((AppliedPTransform) application); - String stepName = evaluationContext.getStepName(application); - DirectStepContext stepContext = - evaluationContext.getExecutionContext(application, inputBundle.getKey()) - .getOrCreateStepContext(stepName, stepName); - try { - @SuppressWarnings({"unchecked", "rawtypes"}) - TransformEvaluator<InT> parDoEvaluator = - ParDoEvaluator.create( - 
evaluationContext, - stepContext, - application, - (DoFn) fnLocal.get(), - application.getTransform().getSideInputs(), - application.getTransform().getMainOutputTag(), - application.getTransform().getSideOutputTags().getAll(), - outputs); - return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(parDoEvaluator, fnLocal); - } catch (Exception e) { - try { - fnLocal.remove(); - } catch (Exception removalException) { - LOG.error("Exception encountered while cleaning up in ParDo evaluator construction", - removalException); - e.addSuppressed(removalException); - } - throw e; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/726998ae/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java new file mode 100644 index 0000000..a566154 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; + +/** Support for {@link ParDo.BoundMulti} in {@link ParDoEvaluatorFactory}. */ +class ParDoMultiEvaluatorHooks<InputT, OutputT> + implements ParDoEvaluatorFactory.TransformHooks< + InputT, OutputT, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>> { + @Override + public DoFn<InputT, OutputT> getDoFn(ParDo.BoundMulti<InputT, OutputT> transform) { + return transform.getNewFn(); + } + + @Override + public ParDoEvaluator<InputT, OutputT> createParDoEvaluator( + EvaluationContext evaluationContext, + AppliedPTransform<PCollection<InputT>, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>> + application, + DirectStepContext stepContext, + DoFn<InputT, OutputT> fnLocal) { + ParDo.BoundMulti<InputT, OutputT> transform = application.getTransform(); + return ParDoEvaluator.create( + evaluationContext, + stepContext, + application, + fnLocal, + transform.getSideInputs(), + transform.getMainOutputTag(), + transform.getSideOutputTags().getAll(), + application.getOutput().getAll()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/726998ae/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java deleted file mode 100644 index d2a678d..0000000 --- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.ImmutableMap; -import java.util.Collections; -import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo.Bound; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TupleTag; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the - * {@link Bound ParDo.Bound} primitive {@link PTransform}. 
- */ -class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory { - private static final Logger LOG = LoggerFactory.getLogger(ParDoSingleEvaluatorFactory.class); - private final LoadingCache<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager> fnClones; - private final EvaluationContext evaluationContext; - - public ParDoSingleEvaluatorFactory(EvaluationContext evaluationContext) { - this.evaluationContext = evaluationContext; - fnClones = - CacheBuilder.newBuilder() - .build( - new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>() { - @Override - public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> key) - throws Exception { - Bound<?, ?> bound = (Bound<?, ?>) key.getTransform(); - return DoFnLifecycleManager.of(bound.getNewFn()); - } - }); - } - - @Override - public <T> TransformEvaluator<T> forApplication( - final AppliedPTransform<?, ?, ?> application, - CommittedBundle<?> inputBundle) throws Exception { - @SuppressWarnings({"unchecked", "rawtypes"}) - TransformEvaluator<T> evaluator = - createSingleEvaluator((AppliedPTransform) application, inputBundle); - return evaluator; - } - - @Override - public void cleanup() throws Exception { - DoFnLifecycleManagers.removeAllFromManagers(fnClones.asMap().values()); - } - - private <InputT, OutputT> TransformEvaluator<InputT> createSingleEvaluator( - AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, Bound<InputT, OutputT>> - application, - CommittedBundle<InputT> inputBundle) - throws Exception { - TupleTag<OutputT> mainOutputTag = new TupleTag<>("out"); - String stepName = evaluationContext.getStepName(application); - DirectStepContext stepContext = - evaluationContext.getExecutionContext(application, inputBundle.getKey()) - .getOrCreateStepContext(stepName, stepName); - - DoFnLifecycleManager fnLocal = fnClones.getUnchecked((AppliedPTransform) application); - try { - @SuppressWarnings({"unchecked", "rawtypes"}) - ParDoEvaluator<InputT> parDoEvaluator = - ParDoEvaluator.create( 
- evaluationContext, - stepContext, - application, - fnLocal.get(), - application.getTransform().getSideInputs(), - mainOutputTag, - Collections.<TupleTag<?>>emptyList(), - ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput())); - return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(parDoEvaluator, fnLocal); - } catch (Exception e) { - try { - fnLocal.remove(); - } catch (Exception removalException) { - LOG.error("Exception encountered constructing ParDo evaluator", removalException); - e.addSuppressed(removalException); - } - throw e; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/726998ae/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java new file mode 100644 index 0000000..b554f41 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; + +/** Support for {@link ParDo.Bound} in {@link ParDoEvaluatorFactory}. */ +class ParDoSingleEvaluatorHooks<InputT, OutputT> + implements ParDoEvaluatorFactory.TransformHooks< + InputT, OutputT, PCollection<OutputT>, ParDo.Bound<InputT, OutputT>> { + @Override + public DoFn<InputT, OutputT> getDoFn(ParDo.Bound<InputT, OutputT> transform) { + return transform.getNewFn(); + } + + @Override + public ParDoEvaluator<InputT, OutputT> createParDoEvaluator( + EvaluationContext evaluationContext, + AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, ParDo.Bound<InputT, OutputT>> + application, + DirectStepContext stepContext, + DoFn<InputT, OutputT> fnLocal) { + TupleTag<OutputT> mainOutputTag = new TupleTag<>("out"); + ParDo.Bound<InputT, OutputT> transform = application.getTransform(); + return ParDoEvaluator.create( + evaluationContext, + stepContext, + application, + fnLocal, + transform.getSideInputs(), + mainOutputTag, + Collections.<TupleTag<?>>emptyList(), + ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/726998ae/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index 3dd44a7..f384a14 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -45,13 +45,17 @@ import org.slf4j.LoggerFactory; class TransformEvaluatorRegistry implements TransformEvaluatorFactory { private static final Logger LOG = LoggerFactory.getLogger(TransformEvaluatorRegistry.class); public static TransformEvaluatorRegistry defaultRegistry(EvaluationContext ctxt) { - @SuppressWarnings("rawtypes") + @SuppressWarnings({"rawtypes"}) ImmutableMap<Class<? extends PTransform>, TransformEvaluatorFactory> primitives = ImmutableMap.<Class<? extends PTransform>, TransformEvaluatorFactory>builder() .put(Read.Bounded.class, new BoundedReadEvaluatorFactory(ctxt)) .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt)) - .put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory(ctxt)) - .put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory(ctxt)) + .put( + ParDo.Bound.class, + new ParDoEvaluatorFactory<>(ctxt, new ParDoSingleEvaluatorHooks<>())) + .put( + ParDo.BoundMulti.class, + new ParDoEvaluatorFactory<>(ctxt, new ParDoMultiEvaluatorHooks<>())) .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory(ctxt)) .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory(ctxt)) .put(Window.Bound.class, new WindowEvaluatorFactory(ctxt)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/726998ae/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java index 89f9bfb..8254413 100644 --- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -31,7 +31,6 @@ import java.util.Collection; import java.util.List; import javax.annotation.Nullable; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.sdk.testing.TestPipeline; @@ -93,13 +92,11 @@ public class ParDoEvaluatorTest { RecorderFn fn = new RecorderFn(singletonView); PCollection<Integer> output = inputPc.apply(ParDo.of(fn).withSideInputs(singletonView)); - CommittedBundle<Integer> inputBundle = - bundleFactory.createBundle(inputPc).commit(Instant.now()); UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(output); when(evaluationContext.createBundle(output)).thenReturn(outputBundle); - ParDoEvaluator<Integer> evaluator = - createEvaluator(singletonView, fn, inputBundle, output); + ParDoEvaluator<Integer, Integer> evaluator = + createEvaluator(singletonView, fn, output); IntervalWindow nonGlobalWindow = new IntervalWindow(new Instant(0), new Instant(10_000L)); WindowedValue<Integer> first = WindowedValue.valueInGlobalWindow(3); @@ -130,10 +127,9 @@ public class ParDoEvaluatorTest { WindowedValue.timestampedValueInGlobalWindow(6, new Instant(2468L)))); } - private ParDoEvaluator<Integer> createEvaluator( + private ParDoEvaluator<Integer, Integer> createEvaluator( PCollectionView<Integer> singletonView, RecorderFn fn, - DirectRunner.CommittedBundle<Integer> inputBundle, PCollection<Integer> output) { when( evaluationContext.createSideInputReader( 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/726998ae/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java deleted file mode 100644 index cc83323..0000000 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java +++ /dev/null @@ -1,439 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.direct; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.Serializable; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; -import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.ParDo.BoundMulti; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.Timer; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.TimerSpec; -import org.apache.beam.sdk.util.TimerSpecs; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.StateNamespace; -import org.apache.beam.sdk.util.state.StateNamespaces; -import org.apache.beam.sdk.util.state.StateSpec; -import org.apache.beam.sdk.util.state.StateSpecs; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; -import org.apache.beam.sdk.util.state.WatermarkHoldState; -import org.apache.beam.sdk.values.KV; -import 
org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TupleTagList; -import org.hamcrest.Matchers; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link ParDoMultiEvaluatorFactory}. - */ -@RunWith(JUnit4.class) -public class ParDoMultiEvaluatorFactoryTest implements Serializable { - private transient BundleFactory bundleFactory = ImmutableListBundleFactory.create(); - - @Test - public void testParDoMultiInMemoryTransformEvaluator() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); - - TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {}; - final TupleTag<String> elementTag = new TupleTag<>(); - final TupleTag<Integer> lengthTag = new TupleTag<>(); - - BoundMulti<String, KV<String, Integer>> pardo = - ParDo.of( - new DoFn<String, KV<String, Integer>>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(KV.<String, Integer>of(c.element(), c.element().length())); - c.sideOutput(elementTag, c.element()); - c.sideOutput(lengthTag, c.element().length()); - } - }) - .withOutputTags(mainOutputTag, TupleTagList.of(elementTag).and(lengthTag)); - PCollectionTuple outputTuple = input.apply(pardo); - - CommittedBundle<String> inputBundle = - bundleFactory.createBundle(input).commit(Instant.now()); - - PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); - PCollection<String> elementOutput = outputTuple.get(elementTag); - PCollection<Integer> lengthOutput = outputTuple.get(lengthTag); - - EvaluationContext evaluationContext = mock(EvaluationContext.class); - UncommittedBundle<KV<String, Integer>> mainOutputBundle = - 
bundleFactory.createBundle(mainOutput); - UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput); - UncommittedBundle<Integer> lengthOutputBundle = bundleFactory.createBundle(lengthOutput); - - when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle); - when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle); - when(evaluationContext.createBundle(lengthOutput)).thenReturn(lengthOutputBundle); - - DirectExecutionContext executionContext = - new DirectExecutionContext(null, null, null, null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), - inputBundle.getKey())).thenReturn(executionContext); - AggregatorContainer container = AggregatorContainer.create(); - AggregatorContainer.Mutator mutator = container.createMutator(); - when(evaluationContext.getAggregatorContainer()).thenReturn(container); - when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); - - TransformEvaluator<String> evaluator = - new ParDoMultiEvaluatorFactory(evaluationContext) - .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - evaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); - evaluator.processElement( - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - - TransformResult result = evaluator.finishBundle(); - assertThat( - result.getOutputBundles(), - Matchers.<UncommittedBundle<?>>containsInAnyOrder( - lengthOutputBundle, mainOutputBundle, elementOutputBundle)); - assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); - assertThat(result.getAggregatorChanges(), equalTo(mutator)); - - assertThat( - mainOutputBundle.commit(Instant.now()).getElements(), - Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder( - 
WindowedValue.valueInGlobalWindow(KV.of("foo", 3)), - WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)), - WindowedValue.valueInGlobalWindow( - KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING))); - assertThat( - elementOutputBundle.commit(Instant.now()).getElements(), - Matchers.<WindowedValue<String>>containsInAnyOrder( - WindowedValue.valueInGlobalWindow("foo"), - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)), - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING))); - assertThat( - lengthOutputBundle.commit(Instant.now()).getElements(), - Matchers.<WindowedValue<Integer>>containsInAnyOrder( - WindowedValue.valueInGlobalWindow(3), - WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)), - WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING))); - } - - @Test - public void testParDoMultiUndeclaredSideOutput() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); - - TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {}; - final TupleTag<String> elementTag = new TupleTag<>(); - final TupleTag<Integer> lengthTag = new TupleTag<>(); - - BoundMulti<String, KV<String, Integer>> pardo = - ParDo.of( - new DoFn<String, KV<String, Integer>>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(KV.<String, Integer>of(c.element(), c.element().length())); - c.sideOutput(elementTag, c.element()); - c.sideOutput(lengthTag, c.element().length()); - } - }) - .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); - PCollectionTuple outputTuple = input.apply(pardo); - - CommittedBundle<String> inputBundle = - bundleFactory.createBundle(input).commit(Instant.now()); - - PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); - PCollection<String> elementOutput = outputTuple.get(elementTag); - - 
EvaluationContext evaluationContext = mock(EvaluationContext.class); - UncommittedBundle<KV<String, Integer>> mainOutputBundle = - bundleFactory.createBundle(mainOutput); - UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput); - - when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle); - when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle); - - DirectExecutionContext executionContext = - new DirectExecutionContext(null, null, null, null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), - inputBundle.getKey())).thenReturn(executionContext); - AggregatorContainer container = AggregatorContainer.create(); - AggregatorContainer.Mutator mutator = container.createMutator(); - when(evaluationContext.getAggregatorContainer()).thenReturn(container); - when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); - - TransformEvaluator<String> evaluator = - new ParDoMultiEvaluatorFactory(evaluationContext) - .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - evaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); - evaluator.processElement( - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - - TransformResult result = evaluator.finishBundle(); - assertThat( - result.getOutputBundles(), - Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle)); - assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); - assertThat(result.getAggregatorChanges(), equalTo(mutator)); - - assertThat( - mainOutputBundle.commit(Instant.now()).getElements(), - Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder( - WindowedValue.valueInGlobalWindow(KV.of("foo", 3)), - 
WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)), - WindowedValue.valueInGlobalWindow( - KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING))); - assertThat( - elementOutputBundle.commit(Instant.now()).getElements(), - Matchers.<WindowedValue<String>>containsInAnyOrder( - WindowedValue.valueInGlobalWindow("foo"), - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)), - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING))); - } - - /** - * This test ignored, as today testing of GroupByKey is all the state that needs testing. - * This should be ported to state when ready. - */ - @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.") - @Test - public void finishBundleWithStatePutsStateInResult() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); - - TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {}; - final TupleTag<String> elementTag = new TupleTag<>(); - - final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag = - StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEndOfWindow()); - final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of()); - final StateNamespace windowNs = - StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE); - BoundMulti<String, KV<String, Integer>> pardo = - ParDo.of( - new DoFn<String, KV<String, Integer>>() { - private static final String STATE_ID = "my-state-id"; - - @StateId(STATE_ID) - private final StateSpec<Object, BagState<String>> bagSpec = - StateSpecs.bag(StringUtf8Coder.of()); - - @ProcessElement - public void processElement( - ProcessContext c, @StateId(STATE_ID) BagState<String> bagState) { - bagState.add(c.element()); - } - }) - .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); - PCollectionTuple 
outputTuple = input.apply(pardo); - - CommittedBundle<String> inputBundle = - bundleFactory.createBundle(input).commit(Instant.now()); - - PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); - PCollection<String> elementOutput = outputTuple.get(elementTag); - - EvaluationContext evaluationContext = mock(EvaluationContext.class); - UncommittedBundle<KV<String, Integer>> mainOutputBundle = - bundleFactory.createBundle(mainOutput); - UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput); - - when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle); - when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle); - - DirectExecutionContext executionContext = new DirectExecutionContext(null, - StructuralKey.of("myKey", StringUtf8Coder.of()), - null, - null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), - inputBundle.getKey())).thenReturn(executionContext); - AggregatorContainer container = AggregatorContainer.create(); - AggregatorContainer.Mutator mutator = container.createMutator(); - when(evaluationContext.getAggregatorContainer()).thenReturn(container); - when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); - - TransformEvaluator<String> evaluator = - new ParDoMultiEvaluatorFactory(evaluationContext) - .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - evaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); - evaluator.processElement( - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - - TransformResult result = evaluator.finishBundle(); - assertThat( - result.getOutputBundles(), - Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle)); - assertThat(result.getWatermarkHold(), equalTo(new 
Instant(20205L))); - assertThat(result.getState(), not(nullValue())); - assertThat( - result.getState().state(StateNamespaces.global(), watermarkTag).read(), - equalTo(new Instant(20205L))); - assertThat( - result.getState().state(windowNs, bagTag).read(), - containsInAnyOrder("foo", "bara", "bazam")); - } - - /** - * This test ignored, as today testing of GroupByKey is all the state that needs testing. - * This should be ported to state when ready. - */ - @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.") - @Test - public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); - - TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {}; - final TupleTag<String> elementTag = new TupleTag<>(); - - final TimerData addedTimer = - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0).plus(Duration.standardMinutes(5)), - new Instant(1) - .plus(Duration.standardMinutes(5)) - .plus(Duration.standardHours(1)))), - new Instant(54541L), - TimeDomain.EVENT_TIME); - final TimerData deletedTimer = - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))), - new Instant(3400000), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - - BoundMulti<String, KV<String, Integer>> pardo = - ParDo.of( - new DoFn<String, KV<String, Integer>>() { - private static final String EVENT_TIME_TIMER = "event-time-timer"; - private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer"; - - @TimerId(EVENT_TIME_TIMER) - TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); - - @TimerId(SYNC_PROC_TIME_TIMER) - TimerSpec syncProcTimerSpec = - TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - - @ProcessElement - public 
void processElement( - ProcessContext c, - @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer, - @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) { - - eventTimeTimer.setForNowPlus(Duration.standardMinutes(5)); - syncProcTimeTimer.cancel(); - } - }) - .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); - PCollectionTuple outputTuple = input.apply(pardo); - - CommittedBundle<String> inputBundle = - bundleFactory.createBundle(input).commit(Instant.now()); - - PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); - PCollection<String> elementOutput = outputTuple.get(elementTag); - - EvaluationContext evaluationContext = mock(EvaluationContext.class); - UncommittedBundle<KV<String, Integer>> mainOutputBundle = - bundleFactory.createBundle(mainOutput); - UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput); - - when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle); - when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle); - - DirectExecutionContext executionContext = new DirectExecutionContext(null, - StructuralKey.of("myKey", StringUtf8Coder.of()), - null, null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), - inputBundle.getKey())).thenReturn(executionContext); - AggregatorContainer container = AggregatorContainer.create(); - AggregatorContainer.Mutator mutator = container.createMutator(); - when(evaluationContext.getAggregatorContainer()).thenReturn(container); - when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); - - TransformEvaluator<String> evaluator = - new ParDoMultiEvaluatorFactory(evaluationContext) - .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - evaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); - evaluator.processElement( 
- WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - - TransformResult result = evaluator.finishBundle(); - assertThat( - result.getTimerUpdate(), - equalTo( - TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of())) - .setTimer(addedTimer) - .setTimer(addedTimer) - .setTimer(addedTimer) - .deletedTimer(deletedTimer) - .deletedTimer(deletedTimer) - .deletedTimer(deletedTimer) - .build())); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/726998ae/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java new file mode 100644 index 0000000..6302d37 --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooksTest.java @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.Serializable; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; +import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.BoundMulti; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.Timer; +import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.TimerSpec; +import org.apache.beam.sdk.util.TimerSpecs; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateNamespaces; +import org.apache.beam.sdk.util.state.StateSpec; +import org.apache.beam.sdk.util.state.StateSpecs; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTags; +import org.apache.beam.sdk.util.state.WatermarkHoldState; +import org.apache.beam.sdk.values.KV; +import 
org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.hamcrest.Matchers; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link ParDoMultiEvaluatorHooks}. + */ +@RunWith(JUnit4.class) +public class ParDoMultiEvaluatorHooksTest implements Serializable { + private transient BundleFactory bundleFactory = ImmutableListBundleFactory.create(); + + @Test + public void testParDoMultiInMemoryTransformEvaluator() throws Exception { + TestPipeline p = TestPipeline.create(); + + PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); + + TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {}; + final TupleTag<String> elementTag = new TupleTag<>(); + final TupleTag<Integer> lengthTag = new TupleTag<>(); + + BoundMulti<String, KV<String, Integer>> pardo = + ParDo.of( + new DoFn<String, KV<String, Integer>>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(KV.<String, Integer>of(c.element(), c.element().length())); + c.sideOutput(elementTag, c.element()); + c.sideOutput(lengthTag, c.element().length()); + } + }) + .withOutputTags(mainOutputTag, TupleTagList.of(elementTag).and(lengthTag)); + PCollectionTuple outputTuple = input.apply(pardo); + + CommittedBundle<String> inputBundle = + bundleFactory.createBundle(input).commit(Instant.now()); + + PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); + PCollection<String> elementOutput = outputTuple.get(elementTag); + PCollection<Integer> lengthOutput = outputTuple.get(lengthTag); + + EvaluationContext evaluationContext = mock(EvaluationContext.class); + UncommittedBundle<KV<String, Integer>> mainOutputBundle = + 
bundleFactory.createBundle(mainOutput); + UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput); + UncommittedBundle<Integer> lengthOutputBundle = bundleFactory.createBundle(lengthOutput); + + when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle); + when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle); + when(evaluationContext.createBundle(lengthOutput)).thenReturn(lengthOutputBundle); + + DirectExecutionContext executionContext = + new DirectExecutionContext(null, null, null, null); + when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), + inputBundle.getKey())).thenReturn(executionContext); + AggregatorContainer container = AggregatorContainer.create(); + AggregatorContainer.Mutator mutator = container.createMutator(); + when(evaluationContext.getAggregatorContainer()).thenReturn(container); + when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); + + TransformEvaluator<String> evaluator = + new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>()) + .forApplication( + mainOutput.getProducingTransformInternal(), inputBundle); + + evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); + evaluator.processElement( + WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); + evaluator.processElement( + WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); + + TransformResult result = evaluator.finishBundle(); + assertThat( + result.getOutputBundles(), + Matchers.<UncommittedBundle<?>>containsInAnyOrder( + lengthOutputBundle, mainOutputBundle, elementOutputBundle)); + assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); + assertThat(result.getAggregatorChanges(), equalTo(mutator)); + + assertThat( + mainOutputBundle.commit(Instant.now()).getElements(), + Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder( + 
WindowedValue.valueInGlobalWindow(KV.of("foo", 3)), + WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)), + WindowedValue.valueInGlobalWindow( + KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING))); + assertThat( + elementOutputBundle.commit(Instant.now()).getElements(), + Matchers.<WindowedValue<String>>containsInAnyOrder( + WindowedValue.valueInGlobalWindow("foo"), + WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)), + WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING))); + assertThat( + lengthOutputBundle.commit(Instant.now()).getElements(), + Matchers.<WindowedValue<Integer>>containsInAnyOrder( + WindowedValue.valueInGlobalWindow(3), + WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)), + WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING))); + } + + @Test + public void testParDoMultiUndeclaredSideOutput() throws Exception { + TestPipeline p = TestPipeline.create(); + + PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); + + TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {}; + final TupleTag<String> elementTag = new TupleTag<>(); + final TupleTag<Integer> lengthTag = new TupleTag<>(); + + BoundMulti<String, KV<String, Integer>> pardo = + ParDo.of( + new DoFn<String, KV<String, Integer>>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(KV.<String, Integer>of(c.element(), c.element().length())); + c.sideOutput(elementTag, c.element()); + c.sideOutput(lengthTag, c.element().length()); + } + }) + .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); + PCollectionTuple outputTuple = input.apply(pardo); + + CommittedBundle<String> inputBundle = + bundleFactory.createBundle(input).commit(Instant.now()); + + PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); + PCollection<String> elementOutput = outputTuple.get(elementTag); + + 
EvaluationContext evaluationContext = mock(EvaluationContext.class); + UncommittedBundle<KV<String, Integer>> mainOutputBundle = + bundleFactory.createBundle(mainOutput); + UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput); + + when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle); + when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle); + + DirectExecutionContext executionContext = + new DirectExecutionContext(null, null, null, null); + when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), + inputBundle.getKey())).thenReturn(executionContext); + AggregatorContainer container = AggregatorContainer.create(); + AggregatorContainer.Mutator mutator = container.createMutator(); + when(evaluationContext.getAggregatorContainer()).thenReturn(container); + when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); + + TransformEvaluator<String> evaluator = + new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>()) + .forApplication( + mainOutput.getProducingTransformInternal(), inputBundle); + + evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); + evaluator.processElement( + WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); + evaluator.processElement( + WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); + + TransformResult result = evaluator.finishBundle(); + assertThat( + result.getOutputBundles(), + Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle)); + assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); + assertThat(result.getAggregatorChanges(), equalTo(mutator)); + + assertThat( + mainOutputBundle.commit(Instant.now()).getElements(), + Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder( + WindowedValue.valueInGlobalWindow(KV.of("foo", 3)), + 
WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)), + WindowedValue.valueInGlobalWindow( + KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING))); + assertThat( + elementOutputBundle.commit(Instant.now()).getElements(), + Matchers.<WindowedValue<String>>containsInAnyOrder( + WindowedValue.valueInGlobalWindow("foo"), + WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)), + WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING))); + } + + /** + * This test ignored, as today testing of GroupByKey is all the state that needs testing. + * This should be ported to state when ready. + */ + @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.") + @Test + public void finishBundleWithStatePutsStateInResult() throws Exception { + TestPipeline p = TestPipeline.create(); + + PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); + + TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {}; + final TupleTag<String> elementTag = new TupleTag<>(); + + final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag = + StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEndOfWindow()); + final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of()); + final StateNamespace windowNs = + StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE); + BoundMulti<String, KV<String, Integer>> pardo = + ParDo.of( + new DoFn<String, KV<String, Integer>>() { + private static final String STATE_ID = "my-state-id"; + + @StateId(STATE_ID) + private final StateSpec<Object, BagState<String>> bagSpec = + StateSpecs.bag(StringUtf8Coder.of()); + + @ProcessElement + public void processElement( + ProcessContext c, @StateId(STATE_ID) BagState<String> bagState) { + bagState.add(c.element()); + } + }) + .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); + PCollectionTuple 
outputTuple = input.apply(pardo); + + CommittedBundle<String> inputBundle = + bundleFactory.createBundle(input).commit(Instant.now()); + + PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); + PCollection<String> elementOutput = outputTuple.get(elementTag); + + EvaluationContext evaluationContext = mock(EvaluationContext.class); + UncommittedBundle<KV<String, Integer>> mainOutputBundle = + bundleFactory.createBundle(mainOutput); + UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput); + + when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle); + when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle); + + DirectExecutionContext executionContext = new DirectExecutionContext(null, + StructuralKey.of("myKey", StringUtf8Coder.of()), + null, + null); + when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), + inputBundle.getKey())).thenReturn(executionContext); + AggregatorContainer container = AggregatorContainer.create(); + AggregatorContainer.Mutator mutator = container.createMutator(); + when(evaluationContext.getAggregatorContainer()).thenReturn(container); + when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); + + TransformEvaluator<String> evaluator = + new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>()) + .forApplication( + mainOutput.getProducingTransformInternal(), inputBundle); + + evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); + evaluator.processElement( + WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); + evaluator.processElement( + WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); + + TransformResult result = evaluator.finishBundle(); + assertThat( + result.getOutputBundles(), + Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle)); + 
assertThat(result.getWatermarkHold(), equalTo(new Instant(20205L))); + assertThat(result.getState(), not(nullValue())); + assertThat( + result.getState().state(StateNamespaces.global(), watermarkTag).read(), + equalTo(new Instant(20205L))); + assertThat( + result.getState().state(windowNs, bagTag).read(), + containsInAnyOrder("foo", "bara", "bazam")); + } + + /** + * This test is ignored because, for now, the GroupByKey tests cover all of the state behavior + * that needs testing. It should be ported to state when ready. + * + * <p>As written, it applies a multi-output ParDo whose DoFn, for each element, sets an + * event-time timer for now plus five minutes and cancels a synchronized-processing-time timer. + * After three elements are processed and the bundle is finished, the result's timer update is + * expected to equal one built with {@code addedTimer} set three times and {@code deletedTimer} + * deleted three times, keyed by "myKey". + */ + @Ignore("State is not supported until BEAM-25. GroupByKey tests the needed functionality.") + @Test + public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception { + TestPipeline p = TestPipeline.create(); + + PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); + + TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {}; + final TupleTag<String> elementTag = new TupleTag<>(); + + final TimerData addedTimer = + TimerData.of( + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow( + new Instant(0).plus(Duration.standardMinutes(5)), + new Instant(1) + .plus(Duration.standardMinutes(5)) + .plus(Duration.standardHours(1)))), + new Instant(54541L), + TimeDomain.EVENT_TIME); + final TimerData deletedTimer = + TimerData.of( + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))), + new Instant(3400000), + TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + + BoundMulti<String, KV<String, Integer>> pardo = + ParDo.of( + new DoFn<String, KV<String, Integer>>() { + private static final String EVENT_TIME_TIMER = "event-time-timer"; + private static final String SYNC_PROC_TIME_TIMER = "sync-proc-time-timer"; + + @TimerId(EVENT_TIME_TIMER) + TimerSpec myTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @TimerId(SYNC_PROC_TIME_TIMER) + TimerSpec syncProcTimerSpec = + 
TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + + @ProcessElement + public void processElement( + ProcessContext c, + @TimerId(EVENT_TIME_TIMER) Timer eventTimeTimer, + @TimerId(SYNC_PROC_TIME_TIMER) Timer syncProcTimeTimer) { + + eventTimeTimer.setForNowPlus(Duration.standardMinutes(5)); + syncProcTimeTimer.cancel(); + } + }) + .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); + PCollectionTuple outputTuple = input.apply(pardo); + + CommittedBundle<String> inputBundle = + bundleFactory.createBundle(input).commit(Instant.now()); + + PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); + PCollection<String> elementOutput = outputTuple.get(elementTag); + + EvaluationContext evaluationContext = mock(EvaluationContext.class); + /* The mocked context is stubbed below to hand back these pre-built output bundles. */ UncommittedBundle<KV<String, Integer>> mainOutputBundle = + bundleFactory.createBundle(mainOutput); + UncommittedBundle<String> elementOutputBundle = bundleFactory.createBundle(elementOutput); + + when(evaluationContext.createBundle(mainOutput)).thenReturn(mainOutputBundle); + when(evaluationContext.createBundle(elementOutput)).thenReturn(elementOutputBundle); + + DirectExecutionContext executionContext = new DirectExecutionContext(null, + StructuralKey.of("myKey", StringUtf8Coder.of()), + null, null); + when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), + inputBundle.getKey())).thenReturn(executionContext); + AggregatorContainer container = AggregatorContainer.create(); + AggregatorContainer.Mutator mutator = container.createMutator(); + when(evaluationContext.getAggregatorContainer()).thenReturn(container); + when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); + + TransformEvaluator<String> evaluator = + new ParDoEvaluatorFactory<>(evaluationContext, new ParDoMultiEvaluatorHooks<>()) + .forApplication( + mainOutput.getProducingTransformInternal(), inputBundle); + + evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); + 
evaluator.processElement( + WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); + evaluator.processElement( + WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); + + TransformResult result = evaluator.finishBundle(); + /* Three elements were processed above; the expected update repeats each timer operation three times. */ assertThat( + result.getTimerUpdate(), + equalTo( + TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of())) + .setTimer(addedTimer) + .setTimer(addedTimer) + .setTimer(addedTimer) + .deletedTimer(deletedTimer) + .deletedTimer(deletedTimer) + .deletedTimer(deletedTimer) + .build())); + } +}