http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineOptions.java deleted file mode 100644 index bdc525a..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineOptions.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.ApplicationNameOptions; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.Hidden; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.Validation.Required; -import org.apache.beam.sdk.transforms.PTransform; - -import com.fasterxml.jackson.annotation.JsonIgnore; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -/** - * Options that can be used to configure the {@link InProcessPipelineRunner}. - */ -public interface InProcessPipelineOptions extends PipelineOptions, ApplicationNameOptions { - /** - * Gets the {@link ExecutorServiceFactory} to use to create instances of {@link ExecutorService} - * to execute {@link PTransform PTransforms}. - * - * <p>Note that {@link ExecutorService ExecutorServices} returned by the factory must ensure that - * it cannot enter a state in which it will not schedule additional pending work unless currently - * scheduled work completes, as this may cause the {@link Pipeline} to cease processing. - * - * <p>Defaults to a {@link CachedThreadPoolExecutorServiceFactory}, which produces instances of - * {@link Executors#newCachedThreadPool()}. - */ - @JsonIgnore - @Required - @Hidden - @Default.InstanceFactory(CachedThreadPoolExecutorServiceFactory.class) - ExecutorServiceFactory getExecutorServiceFactory(); - - void setExecutorServiceFactory(ExecutorServiceFactory executorService); - - /** - * Gets the {@link Clock} used by this pipeline. The clock is used in place of accessing the - * system time when time values are required by the evaluator. - */ - @Default.InstanceFactory(NanosOffsetClock.Factory.class) - @JsonIgnore - @Required - @Hidden - @Description( - "The processing time source used by the pipeline. 
When the current time is " - + "needed by the evaluator, the result of clock#now() is used.") - Clock getClock(); - - void setClock(Clock clock); - - @Default.Boolean(false) - @Description( - "If the pipeline should shut down producers which have reached the maximum " - + "representable watermark. If this is set to true, a pipeline in which all PTransforms " - + "have reached the maximum watermark will be shut down, even if there are unbounded " - + "sources that could produce additional (late) data. By default, if the pipeline " - + "contains any unbounded PCollections, it will run until explicitly shut down.") - boolean isShutdownUnboundedProducersWithMaxWatermark(); - - void setShutdownUnboundedProducersWithMaxWatermark(boolean shutdown); - - @Default.Boolean(true) - @Description( - "If the pipeline should block awaiting completion of the pipeline. If set to true, " - + "a call to Pipeline#run() will block until all PTransforms are complete. Otherwise, " - + "the Pipeline will execute asynchronously. If set to false, the completion of the " - + "pipeline can be awaited on by use of InProcessPipelineResult#awaitCompletion().") - boolean isBlockOnRun(); - - void setBlockOnRun(boolean b); - - @Default.Boolean(true) - @Description( - "Controls whether the runner should ensure that all of the elements of every " - + "PCollection are not mutated. PTransforms are not permitted to mutate input elements " - + "at any point, or output elements after they are output.") - boolean isTestImmutability(); - - void setTestImmutability(boolean test); -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java deleted file mode 100644 index 7897f2e..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunner.java +++ /dev/null @@ -1,370 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.Pipeline.PipelineExecutionException; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.runners.AggregatorPipelineExtractor; -import org.apache.beam.sdk.runners.AggregatorRetrievalException; -import org.apache.beam.sdk.runners.AggregatorValues; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly; -import org.apache.beam.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOverrideFactory; -import org.apache.beam.sdk.runners.inprocess.ViewEvaluatorFactory.InProcessViewOverrideFactory; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.View.CreatePCollectionView; -import org.apache.beam.sdk.util.MapAggregatorValues; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.common.Counter; -import org.apache.beam.sdk.util.common.CounterSet; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; -import org.apache.beam.sdk.values.PValue; - -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; 
-import com.google.common.collect.ImmutableSet; - -import org.joda.time.Instant; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutorService; - -import javax.annotation.Nullable; - -/** - * An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded - * {@link PCollection PCollections}. - */ -@Experimental -public class InProcessPipelineRunner - extends PipelineRunner<InProcessPipelineRunner.InProcessPipelineResult> { - /** - * The default set of transform overrides to use in the {@link InProcessPipelineRunner}. - * - * <p>A transform override must have a single-argument constructor that takes an instance of the - * type of transform it is overriding. - */ - @SuppressWarnings("rawtypes") - private static Map<Class<? extends PTransform>, PTransformOverrideFactory> - defaultTransformOverrides = - ImmutableMap.<Class<? extends PTransform>, PTransformOverrideFactory>builder() - .put(GroupByKey.class, new InProcessGroupByKeyOverrideFactory()) - .put(CreatePCollectionView.class, new InProcessViewOverrideFactory()) - .put(AvroIO.Write.Bound.class, new AvroIOShardedWriteFactory()) - .put(TextIO.Write.Bound.class, new TextIOShardedWriteFactory()) - .build(); - - /** - * Part of a {@link PCollection}. Elements are output to a bundle, which will cause them to be - * executed by {@link PTransform PTransforms} that consume the {@link PCollection} this bundle is - * a part of at a later point. This is an uncommitted bundle and can have elements added to it. - * - * @param <T> the type of elements that can be added to this bundle - */ - public static interface UncommittedBundle<T> { - /** - * Returns the PCollection that the elements of this {@link UncommittedBundle} belong to. - */ - PCollection<T> getPCollection(); - - /** - * Outputs an element to this bundle. 
- * - * @param element the element to add to this bundle - * @return this bundle - */ - UncommittedBundle<T> add(WindowedValue<T> element); - - /** - * Commits this {@link UncommittedBundle}, returning an immutable {@link CommittedBundle} - * containing all of the elements that were added to it. The {@link #add(WindowedValue)} method - * will throw an {@link IllegalStateException} if called after a call to commit. - * @param synchronizedProcessingTime the synchronized processing time at which this bundle was - * committed - */ - CommittedBundle<T> commit(Instant synchronizedProcessingTime); - } - - /** - * Part of a {@link PCollection}. Elements are output to an {@link UncommittedBundle}, which will - * eventually be committed. Committed elements are executed by the {@link PTransform PTransforms} - * that consume the {@link PCollection} this bundle is - * a part of at a later point. - * @param <T> the type of elements contained within this bundle - */ - public static interface CommittedBundle<T> { - /** - * Returns the PCollection that the elements of this bundle belong to. - */ - PCollection<T> getPCollection(); - - /** - * Returns the (possibly null) key that was output in the most recent {@link GroupByKey} in the - * execution of this bundle. - */ - @Nullable - Object getKey(); - - /** - * Returns an {@link Iterable} containing all of the elements that have been added to this - * {@link CommittedBundle}. - */ - Iterable<WindowedValue<T>> getElements(); - - /** - * Returns the processing time output watermark at the time the producing {@link PTransform} - * committed this bundle. Downstream synchronized processing time watermarks cannot progress - * past this point before consuming this bundle. - * - * <p>This value is no greater than the earliest incomplete processing time or synchronized - * processing time {@link TimerData timer} at the time this bundle was committed, including any - * timers that fired to produce this bundle. 
- */ - Instant getSynchronizedProcessingOutputWatermark(); - - /** - * Return a new {@link CommittedBundle} that is like this one, except calls to - * {@link #getElements()} will return the provided elements. This bundle is unchanged. - * - * <p> - * The value of the {@link #getSynchronizedProcessingOutputWatermark() synchronized processing - * output watermark} of the returned {@link CommittedBundle} is equal to the value returned from - * the current bundle. This is used to ensure a {@link PTransform} that could not complete - * processing on input elements properly holds the synchronized processing time to the - * appropriate value. - */ - CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements); - } - - /** - * A {@link PCollectionViewWriter} is responsible for writing contents of a {@link PCollection} to - * a storage mechanism that can be read from while constructing a {@link PCollectionView}. - * @param <ElemT> the type of elements the input {@link PCollection} contains. - * @param <ViewT> the type of the PCollectionView this writer writes to. - */ - public static interface PCollectionViewWriter<ElemT, ViewT> { - void add(Iterable<WindowedValue<ElemT>> values); - } - - //////////////////////////////////////////////////////////////////////////////////////////////// - private final InProcessPipelineOptions options; - - public static InProcessPipelineRunner fromOptions(PipelineOptions options) { - return new InProcessPipelineRunner(options.as(InProcessPipelineOptions.class)); - } - - private InProcessPipelineRunner(InProcessPipelineOptions options) { - this.options = options; - } - - /** - * Returns the {@link PipelineOptions} used to create this {@link InProcessPipelineRunner}. 
- */ - public InProcessPipelineOptions getPipelineOptions() { - return options; - } - - @Override - public <OutputT extends POutput, InputT extends PInput> OutputT apply( - PTransform<InputT, OutputT> transform, InputT input) { - PTransformOverrideFactory overrideFactory = defaultTransformOverrides.get(transform.getClass()); - if (overrideFactory != null) { - PTransform<InputT, OutputT> customTransform = overrideFactory.override(transform); - - return super.apply(customTransform, input); - } - // If there is no override, or we should not apply the override, apply the original transform - return super.apply(transform, input); - } - - @Override - public InProcessPipelineResult run(Pipeline pipeline) { - ConsumerTrackingPipelineVisitor consumerTrackingVisitor = new ConsumerTrackingPipelineVisitor(); - pipeline.traverseTopologically(consumerTrackingVisitor); - for (PValue unfinalized : consumerTrackingVisitor.getUnfinalizedPValues()) { - unfinalized.finishSpecifying(); - } - @SuppressWarnings("rawtypes") - KeyedPValueTrackingVisitor keyedPValueVisitor = - KeyedPValueTrackingVisitor.create( - ImmutableSet.<Class<? 
extends PTransform>>of( - GroupByKey.class, InProcessGroupByKeyOnly.class)); - pipeline.traverseTopologically(keyedPValueVisitor); - - InProcessEvaluationContext context = - InProcessEvaluationContext.create( - getPipelineOptions(), - createBundleFactory(getPipelineOptions()), - consumerTrackingVisitor.getRootTransforms(), - consumerTrackingVisitor.getValueToConsumers(), - consumerTrackingVisitor.getStepNames(), - consumerTrackingVisitor.getViews()); - - // independent executor service for each run - ExecutorService executorService = - context.getPipelineOptions().getExecutorServiceFactory().create(); - InProcessExecutor executor = - ExecutorServiceParallelExecutor.create( - executorService, - consumerTrackingVisitor.getValueToConsumers(), - keyedPValueVisitor.getKeyedPValues(), - TransformEvaluatorRegistry.defaultRegistry(), - defaultModelEnforcements(options), - context); - executor.start(consumerTrackingVisitor.getRootTransforms()); - - Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps = - new AggregatorPipelineExtractor(pipeline).getAggregatorSteps(); - InProcessPipelineResult result = - new InProcessPipelineResult(executor, context, aggregatorSteps); - if (options.isBlockOnRun()) { - try { - result.awaitCompletion(); - } catch (UserCodeException userException) { - throw new PipelineExecutionException(userException.getCause()); - } catch (Throwable t) { - Throwables.propagate(t); - } - } - return result; - } - - private Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> - defaultModelEnforcements(InProcessPipelineOptions options) { - ImmutableMap.Builder<Class<? 
extends PTransform>, Collection<ModelEnforcementFactory>> - enforcements = ImmutableMap.builder(); - Collection<ModelEnforcementFactory> parDoEnforcements = createParDoEnforcements(options); - enforcements.put(ParDo.Bound.class, parDoEnforcements); - enforcements.put(ParDo.BoundMulti.class, parDoEnforcements); - return enforcements.build(); - } - - private Collection<ModelEnforcementFactory> createParDoEnforcements( - InProcessPipelineOptions options) { - ImmutableList.Builder<ModelEnforcementFactory> enforcements = ImmutableList.builder(); - if (options.isTestImmutability()) { - enforcements.add(ImmutabilityEnforcementFactory.create()); - } - return enforcements.build(); - } - - private BundleFactory createBundleFactory(InProcessPipelineOptions pipelineOptions) { - BundleFactory bundleFactory = InProcessBundleFactory.create(); - if (pipelineOptions.isTestImmutability()) { - bundleFactory = ImmutabilityCheckingBundleFactory.create(bundleFactory); - } - return bundleFactory; - } - - /** - * The result of running a {@link Pipeline} with the {@link InProcessPipelineRunner}. - * - * Reports the current {@link State} and aggregator values of the pipeline, and supports blocking - * until execution finishes via {@link #awaitCompletion()}. - */ - public static class InProcessPipelineResult implements PipelineResult { - private final InProcessExecutor executor; - private final InProcessEvaluationContext evaluationContext; - private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps; - private State state; - - private InProcessPipelineResult( - InProcessExecutor executor, - InProcessEvaluationContext evaluationContext, - Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps) { - this.executor = executor; - this.evaluationContext = evaluationContext; - this.aggregatorSteps = aggregatorSteps; - // Only ever constructed after the executor has started. 
- this.state = State.RUNNING; - } - - @Override - public State getState() { - return state; - } - - @Override - public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) - throws AggregatorRetrievalException { - CounterSet counters = evaluationContext.getCounters(); - Collection<PTransform<?, ?>> steps = aggregatorSteps.get(aggregator); - Map<String, T> stepValues = new HashMap<>(); - for (AppliedPTransform<?, ?, ?> transform : evaluationContext.getSteps()) { - if (steps.contains(transform.getTransform())) { - String stepName = - String.format( - "user-%s-%s", evaluationContext.getStepName(transform), aggregator.getName()); - Counter<T> counter = (Counter<T>) counters.getExistingCounter(stepName); - if (counter != null) { - stepValues.put(transform.getFullName(), counter.getAggregate()); - } - } - } - return new MapAggregatorValues<>(stepValues); - } - - /** - * Blocks until the {@link Pipeline} execution represented by this - * {@link InProcessPipelineResult} is complete, returning the terminal state. - * - * <p>If the pipeline terminates abnormally by throwing an exception, this will rethrow the - * exception. Future calls to {@link #getState()} will return - * {@link org.apache.beam.sdk.PipelineResult.State#FAILED}. - * - * <p>NOTE: if the {@link Pipeline} contains an {@link IsBounded#UNBOUNDED unbounded} - * {@link PCollection}, and the {@link PipelineRunner} was created with - * {@link InProcessPipelineOptions#isShutdownUnboundedProducersWithMaxWatermark()} set to false, - * this method will never return. - * - * See also {@link InProcessExecutor#awaitCompletion()}. 
- */ - public State awaitCompletion() throws Throwable { - if (!state.isTerminal()) { - try { - executor.awaitCompletion(); - state = State.DONE; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw e; - } catch (Throwable t) { - state = State.FAILED; - throw t; - } - } - return state; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessRegistrar.java deleted file mode 100644 index 8d29d01..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessRegistrar.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsRegistrar; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; - -import com.google.auto.service.AutoService; -import com.google.common.collect.ImmutableList; - -/** - * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the - * {@link InProcessPipelineRunner}. - */ -public class InProcessRegistrar { - private InProcessRegistrar() {} - /** - * Registers the {@link InProcessPipelineRunner}. - */ - @AutoService(PipelineRunnerRegistrar.class) - public static class InProcessRunner implements PipelineRunnerRegistrar { - @Override - public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() { - return ImmutableList.<Class<? extends PipelineRunner<?>>>of(InProcessPipelineRunner.class); - } - } - - /** - * Registers the {@link InProcessPipelineOptions}. - */ - @AutoService(PipelineOptionsRegistrar.class) - public static class InProcessOptions implements PipelineOptionsRegistrar { - @Override - public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { - return ImmutableList.<Class<? 
extends PipelineOptions>>of(InProcessPipelineOptions.class); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java deleted file mode 100644 index fda78fc..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import static com.google.common.base.Preconditions.checkArgument; - -import org.apache.beam.sdk.runners.inprocess.InProcessEvaluationContext.ReadyCheckingSideInputReader; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.PCollectionViewWindow; -import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.PCollectionView; - -import com.google.common.base.MoreObjects; -import com.google.common.base.Throwables; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.SettableFuture; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -import javax.annotation.Nullable; - -/** - * An in-process container for {@link PCollectionView PCollectionViews}, which provides methods for - * constructing {@link SideInputReader SideInputReaders} which block until a side input is - * available and writing to a {@link PCollectionView}. - */ -class InProcessSideInputContainer { - private final InProcessEvaluationContext evaluationContext; - private final Collection<PCollectionView<?>> containedViews; - private final LoadingCache<PCollectionViewWindow<?>, - SettableFuture<Iterable<? extends WindowedValue<?>>>> viewByWindows; - - /** - * Create a new {@link InProcessSideInputContainer} with the provided views and the provided - * context. 
- */ - public static InProcessSideInputContainer create( - InProcessEvaluationContext context, Collection<PCollectionView<?>> containedViews) { - CacheLoader<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>> - loader = new CacheLoader<PCollectionViewWindow<?>, - SettableFuture<Iterable<? extends WindowedValue<?>>>>() { - @Override - public SettableFuture<Iterable<? extends WindowedValue<?>>> load( - PCollectionViewWindow<?> view) { - return SettableFuture.create(); - } - }; - LoadingCache<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>> - viewByWindows = CacheBuilder.newBuilder().build(loader); - return new InProcessSideInputContainer(context, containedViews, viewByWindows); - } - - private InProcessSideInputContainer(InProcessEvaluationContext context, - Collection<PCollectionView<?>> containedViews, - LoadingCache<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>> - viewByWindows) { - this.evaluationContext = context; - this.containedViews = ImmutableSet.copyOf(containedViews); - this.viewByWindows = viewByWindows; - } - - /** - * Return a view of this {@link InProcessSideInputContainer} that contains only the views in the - * provided argument. The returned {@link ReadyCheckingSideInputReader} is unmodifiable without - * casting, but will change as this {@link InProcessSideInputContainer} is modified. 
- */ - public ReadyCheckingSideInputReader createReaderForViews( - Collection<PCollectionView<?>> newContainedViews) { - if (!containedViews.containsAll(newContainedViews)) { - Set<PCollectionView<?>> currentlyContained = ImmutableSet.copyOf(containedViews); - Set<PCollectionView<?>> newRequested = ImmutableSet.copyOf(newContainedViews); - throw new IllegalArgumentException("Can't create a SideInputReader with unknown views " - + Sets.difference(newRequested, currentlyContained)); - } - return new SideInputContainerSideInputReader(newContainedViews); - } - - /** - * Write the provided values to the provided view. - * - * <p>The windowed values are first exploded, then for each window the pane is determined. For - * each window, if the pane is later than the current pane stored within this container, write - * all of the values to the container as the new values of the {@link PCollectionView}. - * - * <p>The provided iterable is expected to contain only a single window and pane. - */ - public void write(PCollectionView<?> view, Iterable<? extends WindowedValue<?>> values) { - Map<BoundedWindow, Collection<WindowedValue<?>>> valuesPerWindow = - indexValuesByWindow(values); - for (Map.Entry<BoundedWindow, Collection<WindowedValue<?>>> windowValues : - valuesPerWindow.entrySet()) { - updatePCollectionViewWindowValues(view, windowValues.getKey(), windowValues.getValue()); - } - } - - /** - * Index the provided values by all {@link BoundedWindow windows} in which they appear. - */ - private Map<BoundedWindow, Collection<WindowedValue<?>>> indexValuesByWindow( - Iterable<? 
extends WindowedValue<?>> values) { - Map<BoundedWindow, Collection<WindowedValue<?>>> valuesPerWindow = new HashMap<>(); - for (WindowedValue<?> value : values) { - for (BoundedWindow window : value.getWindows()) { - Collection<WindowedValue<?>> windowValues = valuesPerWindow.get(window); - if (windowValues == null) { - windowValues = new ArrayList<>(); - valuesPerWindow.put(window, windowValues); - } - windowValues.add(value); - } - } - return valuesPerWindow; - } - - /** - * Set the value of the {@link PCollectionView} in the {@link BoundedWindow} to be based on the - * specified values, if the values are part of a later pane than currently exists within the - * {@link PCollectionViewWindow}. - */ - private void updatePCollectionViewWindowValues( - PCollectionView<?> view, BoundedWindow window, Collection<WindowedValue<?>> windowValues) { - PCollectionViewWindow<?> windowedView = PCollectionViewWindow.of(view, window); - SettableFuture<Iterable<? extends WindowedValue<?>>> future = null; - try { - future = viewByWindows.get(windowedView); - if (future.isDone()) { - Iterator<? extends WindowedValue<?>> existingValues = future.get().iterator(); - PaneInfo newPane = windowValues.iterator().next().getPane(); - // The current value may have no elements, if no elements were produced for the window, - // but we are receiving late data. 
- if (!existingValues.hasNext() - || newPane.getIndex() > existingValues.next().getPane().getIndex()) { - viewByWindows.invalidate(windowedView); - viewByWindows.get(windowedView).set(windowValues); - } - } else { - future.set(windowValues); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - if (future != null && !future.isDone()) { - future.set(Collections.<WindowedValue<?>>emptyList()); - } - } catch (ExecutionException e) { - Throwables.propagate(e.getCause()); - } - } - - private final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader { - private final Collection<PCollectionView<?>> readerViews; - - private SideInputContainerSideInputReader(Collection<PCollectionView<?>> readerViews) { - this.readerViews = ImmutableSet.copyOf(readerViews); - } - - @Override - public boolean isReady(final PCollectionView<?> view, final BoundedWindow window) { - checkArgument( - readerViews.contains(view), - "Tried to check if view %s was ready in a SideInputReader that does not contain it. " - + "Contained views; %s", - view, - readerViews); - return getViewFuture(view, window).isDone(); - } - - @Override - @Nullable - public <T> T get(final PCollectionView<T> view, final BoundedWindow window) { - checkArgument( - readerViews.contains(view), "calling get(PCollectionView) with unknown view: " + view); - try { - final Future<Iterable<? 
extends WindowedValue<?>>> future = getViewFuture(view, window); - // Safe covariant cast - @SuppressWarnings("unchecked") - Iterable<WindowedValue<?>> values = (Iterable<WindowedValue<?>>) future.get(); - return view.fromIterableInternal(values); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return null; - } catch (ExecutionException e) { - throw new RuntimeException(e); - } - } - - /** - * Gets the future containing the contents of the provided {@link PCollectionView} in the - * provided {@link BoundedWindow}, setting up a callback to populate the future with empty - * contents if necessary. - */ - private <T> Future<Iterable<? extends WindowedValue<?>>> getViewFuture( - final PCollectionView<T> view, final BoundedWindow window) { - PCollectionViewWindow<T> windowedView = PCollectionViewWindow.of(view, window); - final SettableFuture<Iterable<? extends WindowedValue<?>>> future = - viewByWindows.getUnchecked(windowedView); - - WindowingStrategy<?, ?> windowingStrategy = view.getWindowingStrategyInternal(); - evaluationContext.scheduleAfterOutputWouldBeProduced( - view, window, windowingStrategy, new WriteEmptyViewContents(view, window, future)); - return future; - } - - @Override - public <T> boolean contains(PCollectionView<T> view) { - return readerViews.contains(view); - } - - @Override - public boolean isEmpty() { - return readerViews.isEmpty(); - } - } - - private static class WriteEmptyViewContents implements Runnable { - private final PCollectionView<?> view; - private final BoundedWindow window; - private final SettableFuture<Iterable<? extends WindowedValue<?>>> future; - - private WriteEmptyViewContents(PCollectionView<?> view, BoundedWindow window, - SettableFuture<Iterable<? 
extends WindowedValue<?>>> future) { - this.future = future; - this.view = view; - this.window = window; - } - - @Override - public void run() { - // The requested window has closed without producing elements, so reflect that in - // the PCollectionView. If set has already been called, will do nothing. - future.set(Collections.<WindowedValue<?>>emptyList()); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("view", view) - .add("window", window) - .toString(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessTimerInternals.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessTimerInternals.java deleted file mode 100644 index 3422efd..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessTimerInternals.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate; -import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder; -import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TransformWatermarks; -import org.apache.beam.sdk.util.TimerInternals; - -import org.joda.time.Instant; - -import javax.annotation.Nullable; - -/** - * An implementation of {@link TimerInternals} where all relevant data exists in memory. - */ -public class InProcessTimerInternals implements TimerInternals { - private final Clock processingTimeClock; - private final TransformWatermarks watermarks; - private final TimerUpdateBuilder timerUpdateBuilder; - - public static InProcessTimerInternals create( - Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder timerUpdateBuilder) { - return new InProcessTimerInternals(clock, watermarks, timerUpdateBuilder); - } - - private InProcessTimerInternals( - Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder timerUpdateBuilder) { - this.processingTimeClock = clock; - this.watermarks = watermarks; - this.timerUpdateBuilder = timerUpdateBuilder; - } - - @Override - public void setTimer(TimerData timerKey) { - timerUpdateBuilder.setTimer(timerKey); - } - - @Override - public void deleteTimer(TimerData timerKey) { - timerUpdateBuilder.deletedTimer(timerKey); - } - - public TimerUpdate getTimerUpdate() { - return timerUpdateBuilder.build(); - } - - @Override - public Instant currentProcessingTime() { - return processingTimeClock.now(); - } - - @Override - @Nullable - public Instant currentSynchronizedProcessingTime() { - return watermarks.getSynchronizedProcessingInputTime(); - } - - @Override - public Instant currentInputWatermarkTime() { - return watermarks.getInputWatermark(); - } - - @Override - @Nullable - public Instant currentOutputWatermarkTime() { - return watermarks.getOutputWatermark(); - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessTransformResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessTransformResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessTransformResult.java deleted file mode 100644 index ed77f70..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessTransformResult.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.common.CounterSet; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; - -import org.joda.time.Instant; - -import javax.annotation.Nullable; - -/** - * The result of evaluating an {@link AppliedPTransform} with a {@link TransformEvaluator}. - */ -public interface InProcessTransformResult { - /** - * Returns the {@link AppliedPTransform} that produced this result. - */ - AppliedPTransform<?, ?, ?> getTransform(); - - /** - * Returns the {@link UncommittedBundle (uncommitted) Bundles} output by this transform. These - * will be committed by the evaluation context as part of completing this result. - */ - Iterable<? extends UncommittedBundle<?>> getOutputBundles(); - - /** - * Returns the {@link CounterSet} used by this {@link PTransform}, or null if this transform did - * not use a {@link CounterSet}. - */ - @Nullable CounterSet getCounters(); - - /** - * Returns the Watermark Hold for the transform at the time this result was produced. - * - * If the transform does not set any watermark hold, returns - * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. - */ - Instant getWatermarkHold(); - - /** - * Returns the State used by the transform. - * - * If this evaluation did not access state, this may return null. - */ - @Nullable - CopyOnAccessInMemoryStateInternals<?> getState(); - - /** - * Returns a TimerUpdateBuilder that was produced as a result of this evaluation. If the - * evaluation was triggered due to the delivery of one or more timers, those timers must be added - * to the builder before it is complete. 
- * - * <p>If this evaluation did not add or remove any timers, returns an empty TimerUpdate. - */ - TimerUpdate getTimerUpdate(); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/KeyedPValueTrackingVisitor.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/KeyedPValueTrackingVisitor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/KeyedPValueTrackingVisitor.java deleted file mode 100644 index 0e6b7e8..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/KeyedPValueTrackingVisitor.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import static com.google.common.base.Preconditions.checkState; - -import org.apache.beam.sdk.Pipeline.PipelineVisitor; -import org.apache.beam.sdk.runners.TransformTreeNode; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PValue; - -import java.util.HashSet; -import java.util.Set; - -/** - * A pipeline visitor that tracks all keyed {@link PValue PValues}. A {@link PValue} is keyed if it - * is the result of a {@link PTransform} that produces keyed outputs. A {@link PTransform} that - * produces keyed outputs is assumed to colocate output elements that share a key. - * - * <p>All {@link GroupByKey} transforms, or their runner-specific implementation primitive, produce - * keyed output. - */ -// TODO: Handle Key-preserving transforms when appropriate and more aggressively make PTransforms -// unkeyed -class KeyedPValueTrackingVisitor implements PipelineVisitor { - @SuppressWarnings("rawtypes") - private final Set<Class<? extends PTransform>> producesKeyedOutputs; - private final Set<PValue> keyedValues; - private boolean finalized; - - public static KeyedPValueTrackingVisitor create( - @SuppressWarnings("rawtypes") Set<Class<? extends PTransform>> producesKeyedOutputs) { - return new KeyedPValueTrackingVisitor(producesKeyedOutputs); - } - - private KeyedPValueTrackingVisitor( - @SuppressWarnings("rawtypes") Set<Class<? 
extends PTransform>> producesKeyedOutputs) { - this.producesKeyedOutputs = producesKeyedOutputs; - this.keyedValues = new HashSet<>(); - } - - @Override - public void enterCompositeTransform(TransformTreeNode node) { - checkState( - !finalized, - "Attempted to use a %s that has already been finalized on a pipeline (visiting node %s)", - KeyedPValueTrackingVisitor.class.getSimpleName(), - node); - } - - @Override - public void leaveCompositeTransform(TransformTreeNode node) { - checkState( - !finalized, - "Attempted to use a %s that has already been finalized on a pipeline (visiting node %s)", - KeyedPValueTrackingVisitor.class.getSimpleName(), - node); - if (node.isRootNode()) { - finalized = true; - } else if (producesKeyedOutputs.contains(node.getTransform().getClass())) { - keyedValues.addAll(node.getExpandedOutputs()); - } - } - - @Override - public void visitTransform(TransformTreeNode node) {} - - @Override - public void visitValue(PValue value, TransformTreeNode producer) { - if (producesKeyedOutputs.contains(producer.getTransform().getClass())) { - keyedValues.addAll(value.expand()); - } - } - - public Set<PValue> getKeyedPValues() { - checkState( - finalized, "can't call getKeyedPValues before a Pipeline has been completely traversed"); - return keyedValues; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ModelEnforcement.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ModelEnforcement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ModelEnforcement.java deleted file mode 100644 index 4a3d17a..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ModelEnforcement.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; - -/** - * Enforcement tools that verify that executing code conforms to the model. - * - * <p>ModelEnforcement is performed on a per-element and per-bundle basis. The - * {@link ModelEnforcement} is provided with the input bundle as part of - * {@link ModelEnforcementFactory#forBundle(CommittedBundle, AppliedPTransform)}, each element - * before and after that element is provided to an underlying {@link TransformEvaluator}, and the - * output {@link InProcessTransformResult} and committed output bundles after the - * {@link TransformEvaluator} has completed. - * - * <p>Typically, {@link ModelEnforcement} will obtain required metadata (such as the {@link Coder} - * of the input {@link PCollection} on construction, and then enforce per-element behavior - * (such as the immutability of input elements). When the element is output or the bundle is - * completed, the required conditions can be enforced across all elements. 
- */ -public interface ModelEnforcement<T> { - /** - * Called before a call to {@link TransformEvaluator#processElement(WindowedValue)} on the - * provided {@link WindowedValue}. - */ - void beforeElement(WindowedValue<T> element); - - /** - * Called after a call to {@link TransformEvaluator#processElement(WindowedValue)} on the - * provided {@link WindowedValue}. - */ - void afterElement(WindowedValue<T> element); - - /** - * Called after a bundle has been completed and {@link TransformEvaluator#finishBundle()} has been - * called, producing the provided {@link InProcessTransformResult} and - * {@link CommittedBundle output bundles}. - */ - void afterFinish( - CommittedBundle<T> input, - InProcessTransformResult result, - Iterable<? extends CommittedBundle<?>> outputs); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ModelEnforcementFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ModelEnforcementFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ModelEnforcementFactory.java deleted file mode 100644 index 1fa36d6..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ModelEnforcementFactory.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.transforms.AppliedPTransform; - -/** - * Creates {@link ModelEnforcement} instances for an {@link AppliedPTransform} on an input - * {@link CommittedBundle bundle}. {@link ModelEnforcement} instances are created before the - * {@link TransformEvaluator} is created. - */ -public interface ModelEnforcementFactory { - <T> ModelEnforcement<T> forBundle(CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/NanosOffsetClock.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/NanosOffsetClock.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/NanosOffsetClock.java deleted file mode 100644 index 71039fa..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/NanosOffsetClock.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.options.DefaultValueFactory; -import org.apache.beam.sdk.options.PipelineOptions; - -import org.joda.time.Instant; - -import java.util.concurrent.TimeUnit; - -/** - * A {@link Clock} that uses {@link System#nanoTime()} to track the progress of time. - */ -public class NanosOffsetClock implements Clock { - private final long baseMillis; - private final long nanosAtBaseMillis; - - public static NanosOffsetClock create() { - return new NanosOffsetClock(); - } - - private NanosOffsetClock() { - baseMillis = System.currentTimeMillis(); - nanosAtBaseMillis = System.nanoTime(); - } - - @Override - public Instant now() { - return new Instant( - baseMillis + (TimeUnit.MILLISECONDS.convert( - System.nanoTime() - nanosAtBaseMillis, TimeUnit.NANOSECONDS))); - } - - /** - * Creates instances of {@link NanosOffsetClock}. 
- */ - public static class Factory implements DefaultValueFactory<Clock> { - @Override - public Clock create(PipelineOptions options) { - return new NanosOffsetClock(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/PTransformOverrideFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/PTransformOverrideFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/PTransformOverrideFactory.java deleted file mode 100644 index 2b4bf09..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/PTransformOverrideFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; - -interface PTransformOverrideFactory { - /** - * Create a {@link PTransform} override for the provided {@link PTransform} if applicable. 
- * Otherwise, return the input {@link PTransform}. - * - * <p>The returned PTransform must be semantically equivalent to the input {@link PTransform}. - */ - <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override( - PTransform<InputT, OutputT> transform); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java deleted file mode 100644 index 35639bd..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.util.DoFnRunner; -import org.apache.beam.sdk.util.DoFnRunners; -import org.apache.beam.sdk.util.DoFnRunners.OutputManager; -import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.common.CounterSet; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> { - public static <InputT, OutputT> ParDoInProcessEvaluator<InputT> create( - InProcessEvaluationContext evaluationContext, - CommittedBundle<InputT> inputBundle, - AppliedPTransform<PCollection<InputT>, ?, ?> application, - DoFn<InputT, OutputT> fn, - List<PCollectionView<?>> sideInputs, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> sideOutputTags, - Map<TupleTag<?>, PCollection<?>> outputs) { - InProcessExecutionContext executionContext = - evaluationContext.getExecutionContext(application, inputBundle.getKey()); - String stepName = evaluationContext.getStepName(application); - InProcessStepContext stepContext = - executionContext.getOrCreateStepContext(stepName, stepName); - - CounterSet counters = evaluationContext.createCounterSet(); - - Map<TupleTag<?>, 
UncommittedBundle<?>> outputBundles = new HashMap<>(); - for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) { - outputBundles.put( - outputEntry.getKey(), - evaluationContext.createBundle(inputBundle, outputEntry.getValue())); - } - - DoFnRunner<InputT, OutputT> runner = - DoFnRunners.createDefault( - evaluationContext.getPipelineOptions(), - SerializableUtils.clone(fn), - evaluationContext.createSideInputReader(sideInputs), - BundleOutputManager.create(outputBundles), - mainOutputTag, - sideOutputTags, - stepContext, - counters.getAddCounterMutator(), - application.getInput().getWindowingStrategy()); - - try { - runner.startBundle(); - } catch (Exception e) { - throw UserCodeException.wrap(e); - } - - return new ParDoInProcessEvaluator<>( - runner, application, counters, outputBundles.values(), stepContext); - } - - //////////////////////////////////////////////////////////////////////////////////////////////// - - private final DoFnRunner<T, ?> fnRunner; - private final AppliedPTransform<PCollection<T>, ?, ?> transform; - private final CounterSet counters; - private final Collection<UncommittedBundle<?>> outputBundles; - private final InProcessStepContext stepContext; - - private ParDoInProcessEvaluator( - DoFnRunner<T, ?> fnRunner, - AppliedPTransform<PCollection<T>, ?, ?> transform, - CounterSet counters, - Collection<UncommittedBundle<?>> outputBundles, - InProcessStepContext stepContext) { - this.fnRunner = fnRunner; - this.transform = transform; - this.counters = counters; - this.outputBundles = outputBundles; - this.stepContext = stepContext; - } - - @Override - public void processElement(WindowedValue<T> element) { - try { - fnRunner.processElement(element); - } catch (Exception e) { - throw UserCodeException.wrap(e); - } - } - - @Override - public InProcessTransformResult finishBundle() { - try { - fnRunner.finishBundle(); - } catch (Exception e) { - throw UserCodeException.wrap(e); - } - StepTransformResult.Builder 
resultBuilder; - CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState(); - if (state != null) { - resultBuilder = - StepTransformResult.withHold(transform, state.getEarliestWatermarkHold()) - .withState(state); - } else { - resultBuilder = StepTransformResult.withoutHold(transform); - } - return resultBuilder - .addOutput(outputBundles) - .withTimerUpdate(stepContext.getTimerUpdate()) - .withCounters(counters) - .build(); - } - - static class BundleOutputManager implements OutputManager { - private final Map<TupleTag<?>, UncommittedBundle<?>> bundles; - private final Map<TupleTag<?>, List<?>> undeclaredOutputs; - - public static BundleOutputManager create(Map<TupleTag<?>, UncommittedBundle<?>> outputBundles) { - return new BundleOutputManager(outputBundles); - } - - private BundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) { - this.bundles = bundles; - undeclaredOutputs = new HashMap<>(); - } - - @SuppressWarnings("unchecked") - @Override - public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { - @SuppressWarnings("rawtypes") - UncommittedBundle bundle = bundles.get(tag); - if (bundle == null) { - List undeclaredContents = undeclaredOutputs.get(tag); - if (undeclaredContents == null) { - undeclaredContents = new ArrayList<T>(); - undeclaredOutputs.put(tag, undeclaredContents); - } - undeclaredContents.add(output); - } else { - bundle.add(output); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java deleted file mode 100644 index 299d3a8..0000000 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo.BoundMulti; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; - -import java.util.Map; - -/** - * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the - * {@link BoundMulti} primitive {@link PTransform}. 
- */ -class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory { - @Override - public <T> TransformEvaluator<T> forApplication( - AppliedPTransform<?, ?, ?> application, - CommittedBundle<?> inputBundle, - InProcessEvaluationContext evaluationContext) { - @SuppressWarnings({"unchecked", "rawtypes"}) - TransformEvaluator<T> evaluator = - createMultiEvaluator((AppliedPTransform) application, inputBundle, evaluationContext); - return evaluator; - } - - private static <InT, OuT> ParDoInProcessEvaluator<InT> createMultiEvaluator( - AppliedPTransform<PCollection<InT>, PCollectionTuple, BoundMulti<InT, OuT>> application, - CommittedBundle<InT> inputBundle, - InProcessEvaluationContext evaluationContext) { - Map<TupleTag<?>, PCollection<?>> outputs = application.getOutput().getAll(); - DoFn<InT, OuT> fn = application.getTransform().getFn(); - - return ParDoInProcessEvaluator.create( - evaluationContext, - inputBundle, - application, - fn, - application.getTransform().getSideInputs(), - application.getTransform().getMainOutputTag(), - application.getTransform().getSideOutputTags().getAll(), - outputs); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java deleted file mode 100644 index 4d38448..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo.Bound; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TupleTag; - -import com.google.common.collect.ImmutableMap; - -import java.util.Collections; - -/** - * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the - * {@link Bound ParDo.Bound} primitive {@link PTransform}. 
- */ -class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory { - @Override - public <T> TransformEvaluator<T> forApplication( - final AppliedPTransform<?, ?, ?> application, - CommittedBundle<?> inputBundle, - InProcessEvaluationContext evaluationContext) { - @SuppressWarnings({"unchecked", "rawtypes"}) - TransformEvaluator<T> evaluator = - createSingleEvaluator((AppliedPTransform) application, inputBundle, evaluationContext); - return evaluator; - } - - private static <InputT, OutputT> ParDoInProcessEvaluator<InputT> createSingleEvaluator( - @SuppressWarnings("rawtypes") AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, - Bound<InputT, OutputT>> application, - CommittedBundle<InputT> inputBundle, InProcessEvaluationContext evaluationContext) { - TupleTag<OutputT> mainOutputTag = new TupleTag<>("out"); - - return ParDoInProcessEvaluator.create( - evaluationContext, - inputBundle, - application, - application.getTransform().getFn(), - application.getTransform().getSideInputs(), - mainOutputTag, - Collections.<TupleTag<?>>emptyList(), - ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput())); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/PassthroughTransformEvaluator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/PassthroughTransformEvaluator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/PassthroughTransformEvaluator.java deleted file mode 100644 index a90cd7b..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/PassthroughTransformEvaluator.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.util.WindowedValue; - -class PassthroughTransformEvaluator<InputT> implements TransformEvaluator<InputT> { - public static <InputT> PassthroughTransformEvaluator<InputT> create( - AppliedPTransform<?, ?, ?> transform, UncommittedBundle<InputT> output) { - return new PassthroughTransformEvaluator<>(transform, output); - } - - private final AppliedPTransform<?, ?, ?> transform; - private final UncommittedBundle<InputT> output; - - private PassthroughTransformEvaluator( - AppliedPTransform<?, ?, ?> transform, UncommittedBundle<InputT> output) { - this.transform = transform; - this.output = output; - } - - @Override - public void processElement(WindowedValue<InputT> element) throws Exception { - output.add(element); - } - - @Override - public InProcessTransformResult finishBundle() throws Exception { - return StepTransformResult.withoutHold(transform).addOutput(output).build(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ShardControlledWrite.java 
---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ShardControlledWrite.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ShardControlledWrite.java deleted file mode 100644 index 88630ad..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ShardControlledWrite.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import static com.google.common.base.Preconditions.checkArgument; - -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.Partition; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; -import org.apache.beam.sdk.values.PDone; - -import java.util.concurrent.ThreadLocalRandom; - -/** - * A write that explicitly controls its number of output shards. 
- */ -abstract class ShardControlledWrite<InputT> - extends ForwardingPTransform<PCollection<InputT>, PDone> { - @Override - public PDone apply(PCollection<InputT> input) { - int numShards = getNumShards(); - checkArgument( - numShards >= 1, - "%s should only be applied if the output has a controlled number of shards (> 1); got %s", - getClass().getSimpleName(), - getNumShards()); - PCollectionList<InputT> shards = - input.apply( - "PartitionInto" + numShards + "Shards", - Partition.of(getNumShards(), new RandomSeedPartitionFn<InputT>())); - for (int i = 0; i < shards.size(); i++) { - PCollection<InputT> shard = shards.get(i); - PTransform<? super PCollection<InputT>, PDone> writeShard = getSingleShardTransform(i); - shard.apply(String.format("%s(Shard:%s)", writeShard.getName(), i), writeShard); - } - return PDone.in(input.getPipeline()); - } - - /** - * Returns the number of shards this {@link PTransform} should write to. - */ - abstract int getNumShards(); - - /** - * Returns a {@link PTransform} that performs a write to the shard with the specified shard - * number. - * - * <p>This method will be called n times, where n is the value of {@link #getNumShards()}, for - * shard numbers {@code [0...n)}. - */ - abstract PTransform<? 
super PCollection<InputT>, PDone> getSingleShardTransform(int shardNum); - - private static class RandomSeedPartitionFn<T> implements Partition.PartitionFn<T> { - int nextPartition = -1; - @Override - public int partitionFor(T elem, int numPartitions) { - if (nextPartition < 0) { - nextPartition = ThreadLocalRandom.current().nextInt(numPartitions); - } - nextPartition++; - nextPartition %= numPartitions; - return nextPartition; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/StepAndKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/StepAndKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/StepAndKey.java deleted file mode 100644 index 9c4d9aa..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/StepAndKey.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.transforms.AppliedPTransform; - -import com.google.common.base.MoreObjects; - -import java.util.Objects; - -/** - * A (Step, Key) pair. This is useful as a map key or cache key for things that are available - * per-step in a keyed manner (e.g. State). - */ -final class StepAndKey { - private final AppliedPTransform<?, ?, ?> step; - private final Object key; - - /** - * Create a new {@link StepAndKey} with the provided step and key. - */ - public static StepAndKey of(AppliedPTransform<?, ?, ?> step, Object key) { - return new StepAndKey(step, key); - } - - private StepAndKey(AppliedPTransform<?, ?, ?> step, Object key) { - this.step = step; - this.key = key; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(StepAndKey.class) - .add("step", step.getFullName()) - .add("key", key) - .toString(); - } - - @Override - public int hashCode() { - return Objects.hash(step, key); - } - - @Override - public boolean equals(Object other) { - if (other == this) { - return true; - } else if (!(other instanceof StepAndKey)) { - return false; - } else { - StepAndKey that = (StepAndKey) other; - return Objects.equals(this.step, that.step) - && Objects.equals(this.key, that.key); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/StepTransformResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/StepTransformResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/StepTransformResult.java deleted file mode 100644 index 8874eda..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/StepTransformResult.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import static com.google.common.base.Preconditions.checkNotNull; - -import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.common.CounterSet; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; - -import com.google.common.base.MoreObjects; -import com.google.common.collect.ImmutableList; - -import org.joda.time.Instant; - -import java.util.Collection; - -import javax.annotation.Nullable; - -/** - * An immutable {@link InProcessTransformResult}. - */ -public class StepTransformResult implements InProcessTransformResult { - private final AppliedPTransform<?, ?, ?> transform; - private final Iterable<? 
extends UncommittedBundle<?>> bundles; - @Nullable private final CopyOnAccessInMemoryStateInternals<?> state; - private final TimerUpdate timerUpdate; - @Nullable private final CounterSet counters; - private final Instant watermarkHold; - - private StepTransformResult( - AppliedPTransform<?, ?, ?> transform, - Iterable<? extends UncommittedBundle<?>> outputBundles, - CopyOnAccessInMemoryStateInternals<?> state, - TimerUpdate timerUpdate, - CounterSet counters, - Instant watermarkHold) { - this.transform = checkNotNull(transform); - this.bundles = checkNotNull(outputBundles); - this.state = state; - this.timerUpdate = checkNotNull(timerUpdate); - this.counters = counters; - this.watermarkHold = checkNotNull(watermarkHold); - } - - @Override - public Iterable<? extends UncommittedBundle<?>> getOutputBundles() { - return bundles; - } - - @Override - public CounterSet getCounters() { - return counters; - } - - @Override - public AppliedPTransform<?, ?, ?> getTransform() { - return transform; - } - - @Override - public Instant getWatermarkHold() { - return watermarkHold; - } - - @Nullable - @Override - public CopyOnAccessInMemoryStateInternals<?> getState() { - return state; - } - - @Override - public TimerUpdate getTimerUpdate() { - return timerUpdate; - } - - public static Builder withHold(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) { - return new Builder(transform, watermarkHold); - } - - public static Builder withoutHold(AppliedPTransform<?, ?, ?> transform) { - return new Builder(transform, BoundedWindow.TIMESTAMP_MAX_VALUE); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(StepTransformResult.class) - .add("transform", transform) - .toString(); - } - - /** - * A builder for creating instances of {@link StepTransformResult}. 
- */ - public static class Builder { - private final AppliedPTransform<?, ?, ?> transform; - private final ImmutableList.Builder<UncommittedBundle<?>> bundlesBuilder; - private CopyOnAccessInMemoryStateInternals<?> state; - private TimerUpdate timerUpdate; - private CounterSet counters; - private final Instant watermarkHold; - - private Builder(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) { - this.transform = transform; - this.watermarkHold = watermarkHold; - this.bundlesBuilder = ImmutableList.builder(); - this.timerUpdate = TimerUpdate.builder(null).build(); - } - - public StepTransformResult build() { - return new StepTransformResult( - transform, - bundlesBuilder.build(), - state, - timerUpdate, - counters, - watermarkHold); - } - - public Builder withCounters(CounterSet counters) { - this.counters = counters; - return this; - } - - public Builder withState(CopyOnAccessInMemoryStateInternals<?> state) { - this.state = state; - return this; - } - - public Builder withTimerUpdate(TimerUpdate timerUpdate) { - this.timerUpdate = timerUpdate; - return this; - } - - public Builder addOutput( - UncommittedBundle<?> outputBundle, UncommittedBundle<?>... outputBundles) { - bundlesBuilder.add(outputBundle); - bundlesBuilder.add(outputBundles); - return this; - } - - public Builder addOutput(Collection<UncommittedBundle<?>> outputBundles) { - bundlesBuilder.addAll(outputBundles); - return this; - } - } -}