http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TextIOShardedWriteFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TextIOShardedWriteFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TextIOShardedWriteFactory.java deleted file mode 100644 index fac5a40..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TextIOShardedWriteFactory.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.io.TextIO.Write.Bound; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.IOChannelUtils; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; - -class TextIOShardedWriteFactory implements PTransformOverrideFactory { - - @Override - public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override( - PTransform<InputT, OutputT> transform) { - if (transform instanceof TextIO.Write.Bound) { - @SuppressWarnings("unchecked") - TextIO.Write.Bound<InputT> originalWrite = (TextIO.Write.Bound<InputT>) transform; - if (originalWrite.getNumShards() > 1 - || (originalWrite.getNumShards() == 1 - && !"".equals(originalWrite.getShardNameTemplate()))) { - @SuppressWarnings("unchecked") - PTransform<InputT, OutputT> override = - (PTransform<InputT, OutputT>) new TextIOShardedWrite<InputT>(originalWrite); - return override; - } - } - return transform; - } - - private static class TextIOShardedWrite<InputT> extends ShardControlledWrite<InputT> { - private final TextIO.Write.Bound<InputT> initial; - - private TextIOShardedWrite(Bound<InputT> initial) { - this.initial = initial; - } - - @Override - int getNumShards() { - return initial.getNumShards(); - } - - @Override - PTransform<PCollection<InputT>, PDone> getSingleShardTransform(int shardNum) { - String shardName = - IOChannelUtils.constructName( - initial.getFilenamePrefix(), - initial.getShardTemplate(), - initial.getFilenameSuffix(), - shardNum, - getNumShards()); - return TextIO.Write.withCoder(initial.getCoder()).to(shardName).withoutSharding(); - } - - @Override - protected PTransform<PCollection<InputT>, PDone> delegate() { - return initial; - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluator.java deleted file mode 100644 index e002329..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluator.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.util.WindowedValue; - -/** - * An evaluator of a specific application of a transform. Will be used for at least one - * {@link CommittedBundle}. - * - * @param <InputT> the type of elements that will be passed to {@link #processElement} - */ -public interface TransformEvaluator<InputT> { - /** - * Process an element in the input {@link CommittedBundle}. 
- * - * @param element the element to process - */ - void processElement(WindowedValue<InputT> element) throws Exception; - - /** - * Finish processing the bundle of this {@link TransformEvaluator}. - * - * After {@link #finishBundle()} is called, the {@link TransformEvaluator} will not be reused, - * and no more elements will be processed. - * - * @return an {@link InProcessTransformResult} containing the results of this bundle evaluation. - */ - InProcessTransformResult finishBundle() throws Exception; -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorFactory.java deleted file mode 100644 index a9f6759..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorFactory.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; - -import javax.annotation.Nullable; - -/** - * A factory for creating instances of {@link TransformEvaluator} for the application of a - * {@link PTransform}. - */ -public interface TransformEvaluatorFactory { - /** - * Create a new {@link TransformEvaluator} for the application of the {@link PTransform}. - * - * Any work that must be done before input elements are processed (such as calling - * {@link DoFn#startBundle(DoFn.Context)}) must be done before the {@link TransformEvaluator} is - * made available to the caller. - * - * @throws Exception whenever constructing the underlying evaluator throws an exception - */ - <InputT> TransformEvaluator<InputT> forApplication( - AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> inputBundle, - InProcessEvaluationContext evaluationContext) throws Exception; -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorRegistry.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorRegistry.java deleted file mode 100644 index f6542b8..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformEvaluatorRegistry.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
package org.apache.beam.sdk.runners.inprocess;

import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.Window;

import com.google.common.collect.ImmutableMap;

import java.util.Map;

import javax.annotation.Nullable;

/**
 * A {@link TransformEvaluatorFactory} that delegates to primitive {@link TransformEvaluatorFactory}
 * implementations based on the type of {@link PTransform} of the application.
 */
class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
  /**
   * Creates a registry that maps each primitive transform class listed below to the evaluator
   * factory that handles it.
   */
  public static TransformEvaluatorRegistry defaultRegistry() {
    @SuppressWarnings("rawtypes")
    ImmutableMap<Class<? extends PTransform>, TransformEvaluatorFactory> primitives =
        ImmutableMap.<Class<? extends PTransform>, TransformEvaluatorFactory>builder()
            .put(Read.Bounded.class, new BoundedReadEvaluatorFactory())
            .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory())
            .put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory())
            .put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory())
            .put(
                GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly.class,
                new GroupByKeyEvaluatorFactory())
            .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory())
            .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory())
            .put(Window.Bound.class, new WindowEvaluatorFactory())
            .build();
    return new TransformEvaluatorRegistry(primitives);
  }

  // the TransformEvaluatorFactories can construct instances of all generic types of transform,
  // so all instances of a primitive can be handled with the same evaluator factory.
  @SuppressWarnings("rawtypes")
  private final Map<Class<? extends PTransform>, TransformEvaluatorFactory> factories;

  private TransformEvaluatorRegistry(
      @SuppressWarnings("rawtypes")
      Map<Class<? extends PTransform>, TransformEvaluatorFactory> factories) {
    this.factories = factories;
  }

  /**
   * Looks up the factory registered for the exact runtime class of the applied transform and asks
   * it for an evaluator.
   *
   * @param application the transform application to evaluate
   * @param inputBundle the bundle the evaluator will consume; may be {@code null}
   * @param evaluationContext the context the evaluation takes place in
   * @return the evaluator produced by the registered factory
   * @throws NullPointerException if no factory is registered for the transform's class; the
   *     message names the offending transform class and application
   * @throws Exception whenever the delegate factory fails to construct the evaluator
   */
  @Override
  public <InputT> TransformEvaluator<InputT> forApplication(
      AppliedPTransform<?, ?, ?> application,
      @Nullable CommittedBundle<?> inputBundle,
      InProcessEvaluationContext evaluationContext)
      throws Exception {
    Class<?> transformClass = application.getTransform().getClass();
    TransformEvaluatorFactory factory = factories.get(transformClass);
    if (factory == null) {
      // Previously this surfaced as a bare NullPointerException from the call below; the
      // exception type is kept so existing handlers still match, but it now says what is missing.
      throw new NullPointerException(
          "No TransformEvaluatorFactory registered for PTransform type "
              + transformClass.getName()
              + " (application "
              + application.getFullName()
              + ")");
    }
    return factory.forApplication(application, inputBundle, evaluationContext);
  }
}
package org.apache.beam.sdk.runners.inprocess;

import static com.google.common.base.Preconditions.checkState;

import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.util.WindowedValue;

import com.google.common.base.Throwables;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.Nullable;

/**
 * A {@link Callable} that obtains a {@link TransformEvaluator} from a
 * {@link TransformEvaluatorFactory}, runs it over one bundle of input, and reports the outcome
 * through a {@link CompletionCallback}.
 *
 * <p>{@link #getThread()} exposes the thread on which {@link #call()} was entered.
 */
class TransformExecutor<T> implements Callable<InProcessTransformResult> {
  /** Creates a {@link TransformExecutor} from the supplied collaborators. */
  public static <T> TransformExecutor<T> create(
      TransformEvaluatorFactory factory,
      Iterable<? extends ModelEnforcementFactory> modelEnforcements,
      InProcessEvaluationContext evaluationContext,
      CommittedBundle<T> inputBundle,
      AppliedPTransform<?, ?, ?> transform,
      CompletionCallback completionCallback,
      TransformExecutorService transformEvaluationState) {
    return new TransformExecutor<>(
        factory,
        modelEnforcements,
        evaluationContext,
        inputBundle,
        transform,
        completionCallback,
        transformEvaluationState);
  }

  private final TransformEvaluatorFactory evaluatorFactory;
  private final Iterable<? extends ModelEnforcementFactory> modelEnforcements;

  private final InProcessEvaluationContext evaluationContext;

  /** The transform that will be evaluated. */
  private final AppliedPTransform<?, ?, ?> transform;
  /** The inputs this {@link TransformExecutor} will deliver to the transform. */
  private final CommittedBundle<T> inputBundle;

  private final CompletionCallback onComplete;
  private final TransformExecutorService transformEvaluationState;

  /** Holds the thread that entered {@link #call()}; empty until then. */
  private final AtomicReference<Thread> thread;

  private TransformExecutor(
      TransformEvaluatorFactory factory,
      Iterable<? extends ModelEnforcementFactory> modelEnforcements,
      InProcessEvaluationContext evaluationContext,
      CommittedBundle<T> inputBundle,
      AppliedPTransform<?, ?, ?> transform,
      CompletionCallback completionCallback,
      TransformExecutorService transformEvaluationState) {
    this.thread = new AtomicReference<>();
    this.transform = transform;
    this.inputBundle = inputBundle;
    this.evaluatorFactory = factory;
    this.modelEnforcements = modelEnforcements;
    this.evaluationContext = evaluationContext;
    this.onComplete = completionCallback;
    this.transformEvaluationState = transformEvaluationState;
  }

  /**
   * Claims this executor for the calling thread, evaluates the transform over the input bundle and
   * returns the result.
   *
   * <p>Any {@link Throwable} raised along the way is passed to the {@link CompletionCallback} and
   * then propagated. In every case the owning {@link TransformExecutorService} is told that this
   * executor has completed.
   *
   * @throws IllegalStateException if this executor has already been entered by a thread
   */
  @Override
  public InProcessTransformResult call() {
    Thread caller = Thread.currentThread();
    checkState(
        thread.compareAndSet(null, caller),
        "Tried to execute %s for %s on thread %s, but is already executing on thread %s",
        TransformExecutor.class.getSimpleName(),
        transform.getFullName(),
        caller,
        thread.get());
    try {
      // Enforcements are created before the evaluator, one per configured factory.
      Collection<ModelEnforcement<T>> activeEnforcements = new ArrayList<>();
      for (ModelEnforcementFactory enforcementFactory : modelEnforcements) {
        activeEnforcements.add(enforcementFactory.<T>forBundle(inputBundle, transform));
      }
      TransformEvaluator<T> evaluator =
          evaluatorFactory.forApplication(transform, inputBundle, evaluationContext);

      processElements(evaluator, activeEnforcements);

      return finishBundle(evaluator, activeEnforcements);
    } catch (Throwable failure) {
      onComplete.handleThrowable(inputBundle, failure);
      throw Throwables.propagate(failure);
    } finally {
      transformEvaluationState.complete(this);
    }
  }

  /**
   * Feeds every element of the input bundle to the evaluator, calling each
   * {@link ModelEnforcement} before and after the element is processed. Does nothing when there is
   * no input bundle.
   */
  private void processElements(
      TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>> enforcements)
      throws Exception {
    if (inputBundle == null) {
      return;
    }
    for (WindowedValue<T> element : inputBundle.getElements()) {
      for (ModelEnforcement<T> check : enforcements) {
        check.beforeElement(element);
      }

      evaluator.processElement(element);

      for (ModelEnforcement<T> check : enforcements) {
        check.afterElement(element);
      }
    }
  }

  /**
   * Finishes the bundle, hands the result to the {@link CompletionCallback}, and then notifies
   * each {@link ModelEnforcement} of the finished bundle and its committed outputs.
   *
   * @return the {@link InProcessTransformResult} produced by
   *     {@link TransformEvaluator#finishBundle()}
   */
  private InProcessTransformResult finishBundle(
      TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>> enforcements)
      throws Exception {
    InProcessTransformResult bundleResult = evaluator.finishBundle();
    CommittedResult committed = onComplete.handleResult(inputBundle, bundleResult);
    for (ModelEnforcement<T> check : enforcements) {
      check.afterFinish(inputBundle, bundleResult, committed.getOutputs());
    }
    return bundleResult;
  }

  /**
   * Returns the thread that entered {@link #call()}, or {@code null} if {@link #call()} has not
   * been entered yet. This class never clears the reference once it has been set.
   */
  @Nullable
  public Thread getThread() {
    return thread.get();
  }
}

/**
 * Schedules and completes {@link TransformExecutor TransformExecutors}, controlling concurrency as
 * appropriate for the {@link StepAndKey} the executor exists for.
 */
interface TransformExecutorService {
  /**
   * Schedules the provided work so that it is eventually executed.
   */
  void schedule(TransformExecutor<?> work);

  /**
   * Finishes executing the provided work. This may cause additional
   * {@link TransformExecutor TransformExecutors} to be evaluated.
   */
  void complete(TransformExecutor<?> completed);
}
package org.apache.beam.sdk.runners.inprocess;

import com.google.common.base.MoreObjects;

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Static factory methods for constructing instances of {@link TransformExecutorService}.
 */
final class TransformExecutorServices {
  private TransformExecutorServices() {
    // Do not instantiate
  }

  /**
   * Returns a {@link TransformExecutorService} that evaluates
   * {@link TransformExecutor TransformExecutors} in parallel.
   *
   * @param executor the service every scheduled executor is submitted to
   * @param scheduled the map in which submitted-but-not-completed work is recorded
   */
  public static TransformExecutorService parallel(
      ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
    return new ParallelEvaluationState(executor, scheduled);
  }

  /**
   * Returns a {@link TransformExecutorService} that evaluates
   * {@link TransformExecutor TransformExecutors} in serial.
   *
   * @param executor the service work is submitted to, one item at a time
   * @param scheduled the map in which submitted-but-not-completed work is recorded
   */
  public static TransformExecutorService serial(
      ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
    return new SerialEvaluationState(executor, scheduled);
  }

  /**
   * A {@link TransformExecutorService} with unlimited parallelism. Any {@link TransformExecutor}
   * scheduled will be immediately submitted to the {@link ExecutorService}.
   *
   * <p>A principal use of this is for the evaluation of an unkeyed Step. Unkeyed computations are
   * processed in parallel.
   */
  private static class ParallelEvaluationState implements TransformExecutorService {
    private final ExecutorService executor;
    private final Map<TransformExecutor<?>, Boolean> scheduled;

    private ParallelEvaluationState(
        ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
      this.executor = executor;
      this.scheduled = scheduled;
    }

    /** Records the work as scheduled and submits it to the executor. */
    @Override
    public void schedule(TransformExecutor<?> work) {
      // Record before submitting. A submitted task may run to completion (and call
      // complete(), which removes it from the map) before submit() even returns; recording
      // afterwards would then re-insert finished work and leave a stale entry behind.
      scheduled.put(work, true);
      executor.submit(work);
    }

    /** Removes the completed work from the scheduled map. */
    @Override
    public void complete(TransformExecutor<?> completed) {
      scheduled.remove(completed);
    }
  }

  /**
   * A {@link TransformExecutorService} with a single work queue. Any {@link TransformExecutor}
   * scheduled will be placed on the work queue. Only one item of work will be submitted to the
   * {@link ExecutorService} at any time.
   *
   * <p>A principal use of this is for the serial evaluation of a (Step, Key) pair.
   * Keyed computations are processed serially per step.
   */
  private static class SerialEvaluationState implements TransformExecutorService {
    private final ExecutorService executor;
    private final Map<TransformExecutor<?>, Boolean> scheduled;

    /** The single item of work currently submitted to the executor, if any. */
    private final AtomicReference<TransformExecutor<?>> currentlyEvaluating;
    private final Queue<TransformExecutor<?>> workQueue;

    private SerialEvaluationState(
        ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
      this.scheduled = scheduled;
      this.executor = executor;
      this.currentlyEvaluating = new AtomicReference<>();
      this.workQueue = new ConcurrentLinkedQueue<>();
    }

    /**
     * Adds the work to the work queue and then, if nothing is currently being evaluated, submits
     * the next queued item to the executor.
     */
    @Override
    public void schedule(TransformExecutor<?> work) {
      workQueue.offer(work);
      updateCurrentlyEvaluating();
    }

    /**
     * Marks {@code completed} as finished and submits the next queued item, if there is one.
     *
     * @throws IllegalStateException if {@code completed} is not the work currently being evaluated
     */
    @Override
    public void complete(TransformExecutor<?> completed) {
      if (!currentlyEvaluating.compareAndSet(completed, null)) {
        throw new IllegalStateException(
            "Finished work "
                + completed
                + " but could not complete due to unexpected currently executing "
                + currentlyEvaluating.get());
      }
      scheduled.remove(completed);
      updateCurrentlyEvaluating();
    }

    /**
     * If no work is currently being evaluated, takes the next item off the queue, records it as
     * scheduled and submits it. If another caller claimed the evaluation slot first, the item is
     * put back on the queue.
     */
    private void updateCurrentlyEvaluating() {
      if (currentlyEvaluating.get() == null) {
        // Only synchronize if we need to update what's currently evaluating
        synchronized (this) {
          TransformExecutor<?> newWork = workQueue.poll();
          if (newWork != null) {
            if (currentlyEvaluating.compareAndSet(null, newWork)) {
              scheduled.put(newWork, true);
              executor.submit(newWork);
            } else {
              workQueue.offer(newWork);
            }
          }
        }
      }
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper(SerialEvaluationState.class)
          .add("currentlyEvaluating", currentlyEvaluating)
          .add("workQueue", workQueue)
          .toString();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java deleted file mode 100644 index 0cebf43..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.io.Read.Unbounded; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; -import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; - -import java.io.IOException; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; - -import javax.annotation.Nullable; - -/** - * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators} - * for the {@link Unbounded Read.Unbounded} primitive {@link PTransform}. - */ -class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { - /* - * An evaluator for a Source is stateful, to ensure the CheckpointMark is properly persisted. - * Evaluators are cached here to ensure that the checkpoint mark is appropriately reused - * and any splits are honored. - */ - private final ConcurrentMap<EvaluatorKey, Queue<? 
extends UnboundedReadEvaluator<?>>> - sourceEvaluators = new ConcurrentHashMap<>(); - - @SuppressWarnings({"unchecked", "rawtypes"}) - @Override - public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application, - @Nullable CommittedBundle<?> inputBundle, InProcessEvaluationContext evaluationContext) { - return getTransformEvaluator((AppliedPTransform) application, evaluationContext); - } - - private <OutputT> TransformEvaluator<?> getTransformEvaluator( - final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform, - final InProcessEvaluationContext evaluationContext) { - UnboundedReadEvaluator<?> currentEvaluator = - getTransformEvaluatorQueue(transform, evaluationContext).poll(); - if (currentEvaluator == null) { - return EmptyTransformEvaluator.create(transform); - } - return currentEvaluator; - } - - /** - * Get the queue of {@link TransformEvaluator TransformEvaluators} that produce elements for the - * provided application of {@link Unbounded Read.Unbounded}, initializing it if required. - * - * <p>This method is thread-safe, and will only produce new evaluators if no other invocation has - * already done so. - */ - @SuppressWarnings("unchecked") - private <OutputT> Queue<UnboundedReadEvaluator<OutputT>> getTransformEvaluatorQueue( - final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform, - final InProcessEvaluationContext evaluationContext) { - // Key by the application and the context the evaluation is occurring in (which call to - // Pipeline#run). 
- EvaluatorKey key = new EvaluatorKey(transform, evaluationContext); - @SuppressWarnings("unchecked") - Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue = - (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key); - if (evaluatorQueue == null) { - evaluatorQueue = new ConcurrentLinkedQueue<>(); - if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) { - // If no queue existed in the evaluators, add an evaluator to initialize the evaluator - // factory for this transform - UnboundedSource<OutputT, ?> source = transform.getTransform().getSource(); - UnboundedReadEvaluator<OutputT> evaluator = - new UnboundedReadEvaluator<OutputT>( - transform, evaluationContext, source, evaluatorQueue); - evaluatorQueue.offer(evaluator); - } else { - // otherwise return the existing Queue that arrived before us - evaluatorQueue = (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key); - } - } - return evaluatorQueue; - } - - /** - * A {@link UnboundedReadEvaluator} produces elements from an underlying {@link UnboundedSource}, - * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator - * creates the {@link UnboundedReader} and consumes some currently available input. - * - * <p>Calls to {@link UnboundedReadEvaluator} are not internally thread-safe, and should only be - * used by a single thread at a time. Each {@link UnboundedReadEvaluator} maintains its own - * checkpoint, and constructs its reader from the current checkpoint in each call to - * {@link #finishBundle()}. 
- */ - private static class UnboundedReadEvaluator<OutputT> implements TransformEvaluator<Object> { - private static final int ARBITRARY_MAX_ELEMENTS = 10; - private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform; - private final InProcessEvaluationContext evaluationContext; - private final Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue; - /** - * The source being read from by this {@link UnboundedReadEvaluator}. This may not be the same - * source as derived from {@link #transform} due to splitting. - */ - private final UnboundedSource<OutputT, ?> source; - private CheckpointMark checkpointMark; - - public UnboundedReadEvaluator( - AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform, - InProcessEvaluationContext evaluationContext, - UnboundedSource<OutputT, ?> source, - Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue) { - this.transform = transform; - this.evaluationContext = evaluationContext; - this.evaluatorQueue = evaluatorQueue; - this.source = source; - this.checkpointMark = null; - } - - @Override - public void processElement(WindowedValue<Object> element) {} - - @Override - public InProcessTransformResult finishBundle() throws IOException { - UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput()); - try (UnboundedReader<OutputT> reader = - createReader(source, evaluationContext.getPipelineOptions());) { - int numElements = 0; - if (reader.start()) { - do { - output.add( - WindowedValue.timestampedValueInGlobalWindow( - reader.getCurrent(), reader.getCurrentTimestamp())); - numElements++; - } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance()); - } - checkpointMark = reader.getCheckpointMark(); - checkpointMark.finalizeCheckpoint(); - // TODO: When exercising create initial splits, make this the minimum watermark across all - // existing readers - StepTransformResult result = - StepTransformResult.withHold(transform, reader.getWatermark()) - 
.addOutput(output) - .build(); - evaluatorQueue.offer(this); - return result; - } - } - - private <CheckpointMarkT extends CheckpointMark> UnboundedReader<OutputT> createReader( - UnboundedSource<OutputT, CheckpointMarkT> source, PipelineOptions options) { - @SuppressWarnings("unchecked") - CheckpointMarkT mark = (CheckpointMarkT) checkpointMark; - return source.createReader(options, mark); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ViewEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ViewEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ViewEvaluatorFactory.java deleted file mode 100644 index 0b54ba8..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ViewEvaluatorFactory.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.Values; -import org.apache.beam.sdk.transforms.View.CreatePCollectionView; -import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; - -import java.util.ArrayList; -import java.util.List; - -/** - * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the - * {@link CreatePCollectionView} primitive {@link PTransform}. - * - * <p>The {@link ViewEvaluatorFactory} produces {@link TransformEvaluator TransformEvaluators} for - * the {@link WriteView} {@link PTransform}, which is part of the - * {@link InProcessCreatePCollectionView} composite transform. This transform is an override for the - * {@link CreatePCollectionView} transform that applies windowing and triggers before the view is - * written. 
- */ -class ViewEvaluatorFactory implements TransformEvaluatorFactory { - @Override - public <T> TransformEvaluator<T> forApplication( - AppliedPTransform<?, ?, ?> application, - InProcessPipelineRunner.CommittedBundle<?> inputBundle, - InProcessEvaluationContext evaluationContext) { - @SuppressWarnings({"cast", "unchecked", "rawtypes"}) - TransformEvaluator<T> evaluator = createEvaluator( - (AppliedPTransform) application, evaluationContext); - return evaluator; - } - - private <InT, OuT> TransformEvaluator<Iterable<InT>> createEvaluator( - final AppliedPTransform<PCollection<Iterable<InT>>, PCollectionView<OuT>, WriteView<InT, OuT>> - application, - InProcessEvaluationContext context) { - PCollection<Iterable<InT>> input = application.getInput(); - final PCollectionViewWriter<InT, OuT> writer = - context.createPCollectionViewWriter(input, application.getOutput()); - return new TransformEvaluator<Iterable<InT>>() { - private final List<WindowedValue<InT>> elements = new ArrayList<>(); - - @Override - public void processElement(WindowedValue<Iterable<InT>> element) { - for (InT input : element.getValue()) { - elements.add(element.withValue(input)); - } - } - - @Override - public InProcessTransformResult finishBundle() { - writer.add(elements); - return StepTransformResult.withoutHold(application).build(); - } - }; - } - - public static class InProcessViewOverrideFactory implements PTransformOverrideFactory { - @Override - public <InputT extends PInput, OutputT extends POutput> - PTransform<InputT, OutputT> override(PTransform<InputT, OutputT> transform) { - if (transform instanceof CreatePCollectionView) { - - } - @SuppressWarnings({"rawtypes", "unchecked"}) - PTransform<InputT, OutputT> createView = - (PTransform<InputT, OutputT>) - new InProcessCreatePCollectionView<>((CreatePCollectionView) transform); - return createView; - } - } - - /** - * An in-process override for {@link CreatePCollectionView}. 
- */ - private static class InProcessCreatePCollectionView<ElemT, ViewT> - extends ForwardingPTransform<PCollection<ElemT>, PCollectionView<ViewT>> { - private final CreatePCollectionView<ElemT, ViewT> og; - - private InProcessCreatePCollectionView(CreatePCollectionView<ElemT, ViewT> og) { - this.og = og; - } - - @Override - public PCollectionView<ViewT> apply(PCollection<ElemT> input) { - return input.apply(WithKeys.<Void, ElemT>of((Void) null)) - .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder())) - .apply(GroupByKey.<Void, ElemT>create()) - .apply(Values.<Iterable<ElemT>>create()) - .apply(new WriteView<ElemT, ViewT>(og)); - } - - @Override - protected PTransform<PCollection<ElemT>, PCollectionView<ViewT>> delegate() { - return og; - } - } - - /** - * An in-process implementation of the {@link CreatePCollectionView} primitive. - * - * This implementation requires the input {@link PCollection} to be an iterable, which is provided - * to {@link PCollectionView#fromIterableInternal(Iterable)}. 
- */ - public static final class WriteView<ElemT, ViewT> - extends PTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> { - private final CreatePCollectionView<ElemT, ViewT> og; - - WriteView(CreatePCollectionView<ElemT, ViewT> og) { - this.og = og; - } - - @Override - public PCollectionView<ViewT> apply(PCollection<Iterable<ElemT>> input) { - return og.getView(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WatermarkCallbackExecutor.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WatermarkCallbackExecutor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WatermarkCallbackExecutor.java deleted file mode 100644 index 3e4aca6..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WatermarkCallbackExecutor.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowingStrategy; - -import com.google.common.collect.ComparisonChain; -import com.google.common.collect.Ordering; - -import org.joda.time.Instant; - -import java.util.PriorityQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -/** - * Executes callbacks that occur based on the progression of the watermark per-step. - * - * <p>Callbacks are registered by calls to - * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)}, - * and are executed after a call to {@link #fireForWatermark(AppliedPTransform, Instant)} with the - * same {@link AppliedPTransform} and a watermark sufficient to ensure that the trigger for the - * windowing strategy would have been produced. - * - * <p>NOTE: {@link WatermarkCallbackExecutor} does not track the latest observed watermark for any - * {@link AppliedPTransform} - any call to - * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)} - * that could have potentially already fired should be followed by a call to - * {@link #fireForWatermark(AppliedPTransform, Instant)} for the same transform with the current - * value of the watermark. - */ -class WatermarkCallbackExecutor { - /** - * Create a new {@link WatermarkCallbackExecutor}. 
- */ - public static WatermarkCallbackExecutor create() { - return new WatermarkCallbackExecutor(); - } - - private final ConcurrentMap<AppliedPTransform<?, ?, ?>, PriorityQueue<WatermarkCallback>> - callbacks; - private final ExecutorService executor; - - private WatermarkCallbackExecutor() { - this.callbacks = new ConcurrentHashMap<>(); - this.executor = Executors.newSingleThreadExecutor(); - } - - /** - * Execute the provided {@link Runnable} after the next call to - * {@link #fireForWatermark(AppliedPTransform, Instant)} where the window is guaranteed to have - * produced output. - */ - public void callOnGuaranteedFiring( - AppliedPTransform<?, ?, ?> step, - BoundedWindow window, - WindowingStrategy<?, ?> windowingStrategy, - Runnable runnable) { - WatermarkCallback callback = - WatermarkCallback.onGuaranteedFiring(window, windowingStrategy, runnable); - - PriorityQueue<WatermarkCallback> callbackQueue = callbacks.get(step); - if (callbackQueue == null) { - callbackQueue = new PriorityQueue<>(11, new CallbackOrdering()); - if (callbacks.putIfAbsent(step, callbackQueue) != null) { - callbackQueue = callbacks.get(step); - } - } - - synchronized (callbackQueue) { - callbackQueue.offer(callback); - } - } - - /** - * Schedule all pending callbacks that must have produced output by the time of the provided - * watermark. 
- */ - public void fireForWatermark(AppliedPTransform<?, ?, ?> step, Instant watermark) { - PriorityQueue<WatermarkCallback> callbackQueue = callbacks.get(step); - if (callbackQueue == null) { - return; - } - synchronized (callbackQueue) { - while (!callbackQueue.isEmpty() && callbackQueue.peek().shouldFire(watermark)) { - executor.submit(callbackQueue.poll().getCallback()); - } - } - } - - private static class WatermarkCallback { - public static <W extends BoundedWindow> WatermarkCallback onGuaranteedFiring( - BoundedWindow window, WindowingStrategy<?, W> strategy, Runnable callback) { - @SuppressWarnings("unchecked") - Instant firingAfter = - strategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring((W) window); - return new WatermarkCallback(firingAfter, callback); - } - - private final Instant fireAfter; - private final Runnable callback; - - private WatermarkCallback(Instant fireAfter, Runnable callback) { - this.fireAfter = fireAfter; - this.callback = callback; - } - - public boolean shouldFire(Instant currentWatermark) { - return currentWatermark.isAfter(fireAfter) - || currentWatermark.equals(BoundedWindow.TIMESTAMP_MAX_VALUE); - } - - public Runnable getCallback() { - return callback; - } - } - - private static class CallbackOrdering extends Ordering<WatermarkCallback> { - @Override - public int compare(WatermarkCallback left, WatermarkCallback right) { - return ComparisonChain.start() - .compare(left.fireAfter, right.fireAfter) - .compare(left.callback, right.callback, Ordering.arbitrary()) - .result(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WindowEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WindowEvaluatorFactory.java 
deleted file mode 100644 index 4cdacec..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/WindowEvaluatorFactory.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.Window.Bound; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; - -import org.joda.time.Instant; - -import java.util.Collection; - -import javax.annotation.Nullable; - -/** - * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the - * {@link Bound Window.Bound} primitive {@link PTransform}. 
- */ -class WindowEvaluatorFactory implements TransformEvaluatorFactory { - - @Override - public <InputT> TransformEvaluator<InputT> forApplication( - AppliedPTransform<?, ?, ?> application, - @Nullable CommittedBundle<?> inputBundle, - InProcessEvaluationContext evaluationContext) - throws Exception { - return createTransformEvaluator( - (AppliedPTransform) application, inputBundle, evaluationContext); - } - - private <InputT> TransformEvaluator<InputT> createTransformEvaluator( - AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform, - CommittedBundle<?> inputBundle, - InProcessEvaluationContext evaluationContext) { - WindowFn<? super InputT, ?> fn = transform.getTransform().getWindowFn(); - UncommittedBundle<InputT> outputBundle = - evaluationContext.createBundle(inputBundle, transform.getOutput()); - if (fn == null) { - return PassthroughTransformEvaluator.create(transform, outputBundle); - } - return new WindowIntoEvaluator<>(transform, fn, outputBundle); - } - - private static class WindowIntoEvaluator<InputT> implements TransformEvaluator<InputT> { - private final AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> - transform; - private final WindowFn<InputT, ?> windowFn; - private final UncommittedBundle<InputT> outputBundle; - - @SuppressWarnings("unchecked") - public WindowIntoEvaluator( - AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform, - WindowFn<? super InputT, ?> windowFn, - UncommittedBundle<InputT> outputBundle) { - this.outputBundle = outputBundle; - this.transform = transform; - // Safe contravariant cast - this.windowFn = (WindowFn<InputT, ?>) windowFn; - } - - @Override - public void processElement(WindowedValue<InputT> element) throws Exception { - Collection<? 
extends BoundedWindow> windows = assignWindows(windowFn, element); - outputBundle.add( - WindowedValue.<InputT>of( - element.getValue(), element.getTimestamp(), windows, PaneInfo.NO_FIRING)); - } - - private <W extends BoundedWindow> Collection<? extends BoundedWindow> assignWindows( - WindowFn<InputT, W> windowFn, WindowedValue<InputT> element) throws Exception { - WindowFn<InputT, W>.AssignContext assignContext = - new InProcessAssignContext<>(windowFn, element); - Collection<? extends BoundedWindow> windows = windowFn.assignWindows(assignContext); - return windows; - } - - @Override - public InProcessTransformResult finishBundle() throws Exception { - return StepTransformResult.withoutHold(transform).addOutput(outputBundle).build(); - } - } - - private static class InProcessAssignContext<InputT, W extends BoundedWindow> - extends WindowFn<InputT, W>.AssignContext { - private final WindowedValue<InputT> value; - - public InProcessAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) { - fn.super(); - this.value = value; - } - - @Override - public InputT element() { - return value.getValue(); - } - - @Override - public Instant timestamp() { - return value.getTimestamp(); - } - - @Override - public Collection<? 
extends BoundedWindow> windows() { - return value.getWindows(); - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactoryTest.java deleted file mode 100644 index 43367dd..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactoryTest.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.theInstance; -import static org.junit.Assert.assertThat; - -import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.io.AvroIOTest; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; - -import org.hamcrest.Matchers; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.File; - -/** - * Tests for {@link AvroIOShardedWriteFactory}. - */ -@RunWith(JUnit4.class) -public class AvroIOShardedWriteFactoryTest { - - @Rule public TemporaryFolder tmp = new TemporaryFolder(); - private AvroIOShardedWriteFactory factory; - - @Before - public void setup() { - factory = new AvroIOShardedWriteFactory(); - } - - @Test - public void originalWithoutShardingReturnsOriginal() throws Exception { - File file = tmp.newFile("foo"); - PTransform<PCollection<String>, PDone> original = - AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withoutSharding(); - PTransform<PCollection<String>, PDone> overridden = factory.override(original); - - assertThat(overridden, theInstance(original)); - } - - @Test - public void originalShardingNotSpecifiedReturnsOriginal() throws Exception { - File file = tmp.newFile("foo"); - PTransform<PCollection<String>, PDone> original = - AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()); - PTransform<PCollection<String>, PDone> overridden = factory.override(original); - - assertThat(overridden, theInstance(original)); - } - - @Test - public void originalShardedToOneReturnsExplicitlySharded() throws Exception { - File file = tmp.newFile("foo"); - 
AvroIO.Write.Bound<String> original = - AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withNumShards(1); - PTransform<PCollection<String>, PDone> overridden = factory.override(original); - - assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original))); - - TestPipeline p = TestPipeline.create(); - String[] elems = new String[] {"foo", "bar", "baz"}; - p.apply(Create.<String>of(elems)).apply(overridden); - - file.delete(); - - p.run(); - AvroIOTest.assertTestOutputs(elems, 1, file.getAbsolutePath(), original.getShardNameTemplate()); - } - - @Test - public void originalShardedToManyReturnsExplicitlySharded() throws Exception { - File file = tmp.newFile("foo"); - AvroIO.Write.Bound<String> original = - AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withNumShards(3); - PTransform<PCollection<String>, PDone> overridden = factory.override(original); - - assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original))); - - TestPipeline p = TestPipeline.create(); - String[] elems = new String[] {"foo", "bar", "baz", "spam", "ham", "eggs"}; - p.apply(Create.<String>of(elems)).apply(overridden); - - file.delete(); - p.run(); - AvroIOTest.assertTestOutputs(elems, 3, file.getAbsolutePath(), original.getShardNameTemplate()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java deleted file mode 100644 index 146dd98..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java +++ /dev/null @@ -1,290 +0,0 @@ 
-/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.emptyIterable; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.coders.BigEndianLongCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.BoundedSource.BoundedReader; -import org.apache.beam.sdk.io.CountingSource; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.Read.Bounded; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; - -import 
com.google.common.collect.ImmutableList; - -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.NoSuchElementException; - -/** - * Tests for {@link BoundedReadEvaluatorFactory}. - */ -@RunWith(JUnit4.class) -public class BoundedReadEvaluatorFactoryTest { - private BoundedSource<Long> source; - private PCollection<Long> longs; - private TransformEvaluatorFactory factory; - @Mock private InProcessEvaluationContext context; - private BundleFactory bundleFactory; - - @Before - public void setup() { - MockitoAnnotations.initMocks(this); - source = CountingSource.upTo(10L); - TestPipeline p = TestPipeline.create(); - longs = p.apply(Read.from(source)); - - factory = new BoundedReadEvaluatorFactory(); - bundleFactory = InProcessBundleFactory.create(); - } - - @Test - public void boundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception { - UncommittedBundle<Long> output = bundleFactory.createRootBundle(longs); - when(context.createRootBundle(longs)).thenReturn(output); - - TransformEvaluator<?> evaluator = - factory.forApplication(longs.getProducingTransformInternal(), null, context); - InProcessTransformResult result = evaluator.finishBundle(); - assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); - assertThat( - output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements(), - containsInAnyOrder( - gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L))); - } - - /** - * Demonstrate that acquiring multiple {@link TransformEvaluator TransformEvaluators} for the same - * {@link Bounded Read.Bounded} application with the same evaluation context only produces the - * elements once. 
- */ - @Test - public void boundedSourceInMemoryTransformEvaluatorAfterFinishIsEmpty() throws Exception { - UncommittedBundle<Long> output = bundleFactory.createRootBundle(longs); - when(context.createRootBundle(longs)).thenReturn(output); - - TransformEvaluator<?> evaluator = - factory.forApplication(longs.getProducingTransformInternal(), null, context); - InProcessTransformResult result = evaluator.finishBundle(); - assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); - Iterable<? extends WindowedValue<Long>> outputElements = - output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements(); - assertThat( - outputElements, - containsInAnyOrder( - gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L))); - - UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs); - when(context.createRootBundle(longs)).thenReturn(secondOutput); - TransformEvaluator<?> secondEvaluator = - factory.forApplication(longs.getProducingTransformInternal(), null, context); - InProcessTransformResult secondResult = secondEvaluator.finishBundle(); - assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE)); - assertThat(secondResult.getOutputBundles(), emptyIterable()); - assertThat( - secondOutput.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements(), emptyIterable()); - assertThat( - outputElements, - containsInAnyOrder( - gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L))); - } - - /** - * Demonstrates that acquiring multiple evaluators from the factory are independent, but - * the elements in the source are only produced once. 
- */ - @Test - public void boundedSourceEvaluatorSimultaneousEvaluations() throws Exception { - UncommittedBundle<Long> output = bundleFactory.createRootBundle(longs); - UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs); - when(context.createRootBundle(longs)).thenReturn(output).thenReturn(secondOutput); - - // create both evaluators before finishing either. - TransformEvaluator<?> evaluator = - factory.forApplication(longs.getProducingTransformInternal(), null, context); - TransformEvaluator<?> secondEvaluator = - factory.forApplication(longs.getProducingTransformInternal(), null, context); - - InProcessTransformResult secondResult = secondEvaluator.finishBundle(); - - InProcessTransformResult result = evaluator.finishBundle(); - assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); - Iterable<? extends WindowedValue<Long>> outputElements = - output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements(); - - assertThat( - outputElements, - containsInAnyOrder( - gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L))); - assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE)); - assertThat(secondResult.getOutputBundles(), emptyIterable()); - assertThat( - secondOutput.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements(), emptyIterable()); - assertThat( - outputElements, - containsInAnyOrder( - gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L))); - } - - @Test - public void boundedSourceEvaluatorClosesReader() throws Exception { - TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L); - - TestPipeline p = TestPipeline.create(); - PCollection<Long> pcollection = p.apply(Read.from(source)); - AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); - - UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection); - 
when(context.createRootBundle(pcollection)).thenReturn(output); - - TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context); - evaluator.finishBundle(); - CommittedBundle<Long> committed = output.commit(Instant.now()); - assertThat(committed.getElements(), containsInAnyOrder(gw(2L), gw(3L), gw(1L))); - assertThat(TestSource.readerClosed, is(true)); - } - - @Test - public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception { - TestSource<Long> source = new TestSource<>(BigEndianLongCoder.of()); - - TestPipeline p = TestPipeline.create(); - PCollection<Long> pcollection = p.apply(Read.from(source)); - AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); - - UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection); - when(context.createRootBundle(pcollection)).thenReturn(output); - - TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context); - evaluator.finishBundle(); - CommittedBundle<Long> committed = output.commit(Instant.now()); - assertThat(committed.getElements(), emptyIterable()); - assertThat(TestSource.readerClosed, is(true)); - } - - private static class TestSource<T> extends BoundedSource<T> { - private static boolean readerClosed; - private final Coder<T> coder; - private final T[] elems; - - public TestSource(Coder<T> coder, T... elems) { - this.elems = elems; - this.coder = coder; - readerClosed = false; - } - - @Override - public List<? 
extends BoundedSource<T>> splitIntoBundles( - long desiredBundleSizeBytes, PipelineOptions options) throws Exception { - return ImmutableList.of(this); - } - - @Override - public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { - return 0; - } - - @Override - public boolean producesSortedKeys(PipelineOptions options) throws Exception { - return false; - } - - @Override - public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException { - return new TestReader<>(this, elems); - } - - @Override - public void validate() { - } - - @Override - public Coder<T> getDefaultOutputCoder() { - return coder; - } - } - - private static class TestReader<T> extends BoundedReader<T> { - private final BoundedSource<T> source; - private final List<T> elems; - private int index; - - public TestReader(BoundedSource<T> source, T... elems) { - this.source = source; - this.elems = Arrays.asList(elems); - this.index = -1; - } - - @Override - public BoundedSource<T> getCurrentSource() { - return source; - } - - @Override - public boolean start() throws IOException { - return advance(); - } - - @Override - public boolean advance() throws IOException { - if (elems.size() > index + 1) { - index++; - return true; - } - return false; - } - - @Override - public T getCurrent() throws NoSuchElementException { - return elems.get(index); - } - - @Override - public void close() throws IOException { - TestSource.readerClosed = true; - } - } - - private static WindowedValue<Long> gw(Long elem) { - return WindowedValue.valueInGlobalWindow(elem); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java deleted file mode 100644 index c888a65..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.runners.inprocess; - -import static org.junit.Assert.assertThat; - -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; - -import com.google.common.collect.ImmutableList; - -import org.hamcrest.Matchers; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.Serializable; -import java.util.Collections; -import java.util.List; - -/** - * Tests for {@link CommittedResult}. 
- */ -@RunWith(JUnit4.class) -public class CommittedResultTest implements Serializable { - private transient TestPipeline p = TestPipeline.create(); - private transient AppliedPTransform<?, ?, ?> transform = - AppliedPTransform.of("foo", p.begin(), PDone.in(p), new PTransform<PBegin, PDone>() { - }); - private transient BundleFactory bundleFactory = InProcessBundleFactory.create(); - - @Test - public void getTransformExtractsFromResult() { - CommittedResult result = - CommittedResult.create(StepTransformResult.withoutHold(transform).build(), - Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList()); - - assertThat(result.getTransform(), Matchers.<AppliedPTransform<?, ?, ?>>equalTo(transform)); - } - - @Test - public void getOutputsEqualInput() { - List<? extends InProcessPipelineRunner.CommittedBundle<?>> outputs = - ImmutableList.of(bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p, - WindowingStrategy.globalDefault(), - PCollection.IsBounded.BOUNDED)).commit(Instant.now()), - bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p, - WindowingStrategy.globalDefault(), - PCollection.IsBounded.UNBOUNDED)).commit(Instant.now())); - CommittedResult result = - CommittedResult.create(StepTransformResult.withoutHold(transform).build(), outputs); - - assertThat(result.getOutputs(), Matchers.containsInAnyOrder(outputs.toArray())); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java deleted file mode 100644 index aef4845..0000000 --- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java +++ /dev/null @@ -1,272 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.emptyIterable; -import static org.junit.Assert.assertThat; - -import org.apache.beam.sdk.io.CountingInput; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.PValue; - -import org.hamcrest.Matchers; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import 
org.junit.runners.JUnit4; - -import java.io.Serializable; -import java.util.List; - -/** - * Tests for {@link ConsumerTrackingPipelineVisitor}. - */ -@RunWith(JUnit4.class) -public class ConsumerTrackingPipelineVisitorTest implements Serializable { - @Rule public transient ExpectedException thrown = ExpectedException.none(); - - private transient TestPipeline p = TestPipeline.create(); - private transient ConsumerTrackingPipelineVisitor visitor = new ConsumerTrackingPipelineVisitor(); - - @Test - public void getViewsReturnsViews() { - PCollectionView<List<String>> listView = - p.apply("listCreate", Create.of("foo", "bar")) - .apply( - ParDo.of( - new DoFn<String, String>() { - @Override - public void processElement(DoFn<String, String>.ProcessContext c) - throws Exception { - c.output(Integer.toString(c.element().length())); - } - })) - .apply(View.<String>asList()); - PCollectionView<Object> singletonView = - p.apply("singletonCreate", Create.<Object>of(1, 2, 3)).apply(View.<Object>asSingleton()); - p.traverseTopologically(visitor); - assertThat( - visitor.getViews(), - Matchers.<PCollectionView<?>>containsInAnyOrder(listView, singletonView)); - } - - @Test - public void getRootTransformsContainsPBegins() { - PCollection<String> created = p.apply(Create.of("foo", "bar")); - PCollection<Long> counted = p.apply(CountingInput.upTo(1234L)); - PCollection<Long> unCounted = p.apply(CountingInput.unbounded()); - p.traverseTopologically(visitor); - assertThat( - visitor.getRootTransforms(), - Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder( - created.getProducingTransformInternal(), - counted.getProducingTransformInternal(), - unCounted.getProducingTransformInternal())); - } - - @Test - public void getRootTransformsContainsEmptyFlatten() { - PCollection<String> empty = - PCollectionList.<String>empty(p).apply(Flatten.<String>pCollections()); - p.traverseTopologically(visitor); - assertThat( - visitor.getRootTransforms(), - Matchers.<AppliedPTransform<?, ?, 
?>>containsInAnyOrder( - empty.getProducingTransformInternal())); - } - - @Test - public void getValueToConsumersSucceeds() { - PCollection<String> created = p.apply(Create.of("1", "2", "3")); - PCollection<String> transformed = - created.apply( - ParDo.of( - new DoFn<String, String>() { - @Override - public void processElement(DoFn<String, String>.ProcessContext c) - throws Exception { - c.output(Integer.toString(c.element().length())); - } - })); - - PCollection<String> flattened = - PCollectionList.of(created).and(transformed).apply(Flatten.<String>pCollections()); - - p.traverseTopologically(visitor); - - assertThat( - visitor.getValueToConsumers().get(created), - Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder( - transformed.getProducingTransformInternal(), - flattened.getProducingTransformInternal())); - assertThat( - visitor.getValueToConsumers().get(transformed), - Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder( - flattened.getProducingTransformInternal())); - assertThat(visitor.getValueToConsumers().get(flattened), emptyIterable()); - } - - @Test - public void getUnfinalizedPValuesContainsDanglingOutputs() { - PCollection<String> created = p.apply(Create.of("1", "2", "3")); - PCollection<String> transformed = - created.apply( - ParDo.of( - new DoFn<String, String>() { - @Override - public void processElement(DoFn<String, String>.ProcessContext c) - throws Exception { - c.output(Integer.toString(c.element().length())); - } - })); - - p.traverseTopologically(visitor); - assertThat(visitor.getUnfinalizedPValues(), Matchers.<PValue>contains(transformed)); - } - - @Test - public void getUnfinalizedPValuesEmpty() { - p.apply(Create.of("1", "2", "3")) - .apply( - ParDo.of( - new DoFn<String, String>() { - @Override - public void processElement(DoFn<String, String>.ProcessContext c) - throws Exception { - c.output(Integer.toString(c.element().length())); - } - })) - .apply( - new PTransform<PInput, PDone>() { - @Override - public PDone 
apply(PInput input) { - return PDone.in(input.getPipeline()); - } - }); - - p.traverseTopologically(visitor); - assertThat(visitor.getUnfinalizedPValues(), emptyIterable()); - } - - @Test - public void getStepNamesContainsAllTransforms() { - PCollection<String> created = p.apply(Create.of("1", "2", "3")); - PCollection<String> transformed = - created.apply( - ParDo.of( - new DoFn<String, String>() { - @Override - public void processElement(DoFn<String, String>.ProcessContext c) - throws Exception { - c.output(Integer.toString(c.element().length())); - } - })); - PDone finished = - transformed.apply( - new PTransform<PInput, PDone>() { - @Override - public PDone apply(PInput input) { - return PDone.in(input.getPipeline()); - } - }); - - p.traverseTopologically(visitor); - assertThat( - visitor.getStepNames(), - Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry( - created.getProducingTransformInternal(), "s0")); - assertThat( - visitor.getStepNames(), - Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry( - transformed.getProducingTransformInternal(), "s1")); - assertThat( - visitor.getStepNames(), - Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry( - finished.getProducingTransformInternal(), "s2")); - } - - @Test - public void traverseMultipleTimesThrows() { - p.apply(Create.of(1, 2, 3)); - - p.traverseTopologically(visitor); - thrown.expect(IllegalStateException.class); - thrown.expectMessage(ConsumerTrackingPipelineVisitor.class.getSimpleName()); - thrown.expectMessage("is finalized"); - p.traverseTopologically(visitor); - } - - @Test - public void traverseIndependentPathsSucceeds() { - p.apply("left", Create.of(1, 2, 3)); - p.apply("right", Create.of("foo", "bar", "baz")); - - p.traverseTopologically(visitor); - } - - @Test - public void getRootTransformsWithoutVisitingThrows() { - thrown.expect(IllegalStateException.class); - thrown.expectMessage("completely traversed"); - thrown.expectMessage("getRootTransforms"); - visitor.getRootTransforms(); - } - 
@Test - public void getStepNamesWithoutVisitingThrows() { - thrown.expect(IllegalStateException.class); - thrown.expectMessage("completely traversed"); - thrown.expectMessage("getStepNames"); - visitor.getStepNames(); - } - @Test - public void getUnfinalizedPValuesWithoutVisitingThrows() { - thrown.expect(IllegalStateException.class); - thrown.expectMessage("completely traversed"); - thrown.expectMessage("getUnfinalizedPValues"); - visitor.getUnfinalizedPValues(); - } - - @Test - public void getValueToConsumersWithoutVisitingThrows() { - thrown.expect(IllegalStateException.class); - thrown.expectMessage("completely traversed"); - thrown.expectMessage("getValueToConsumers"); - visitor.getValueToConsumers(); - } - - @Test - public void getViewsWithoutVisitingThrows() { - thrown.expect(IllegalStateException.class); - thrown.expectMessage("completely traversed"); - thrown.expectMessage("getViews"); - visitor.getViews(); - } -}
