http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java new file mode 100644 index 0000000..1c51738 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepContext; +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.util.DoFnRunner; +import org.apache.beam.sdk.util.DoFnRunners; +import org.apache.beam.sdk.util.DoFnRunners.OutputManager; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.common.CounterSet; +import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +
/**
 * A {@link TransformEvaluator} that pushes the elements of one input bundle through a
 * {@link DoFnRunner} and gathers whatever the runner emits into one {@link UncommittedBundle}
 * per declared output tag.
 */
class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> {
  /**
   * Builds an evaluator for {@code inputBundle} and starts the bundle on the underlying runner.
   *
   * <p>The runner operates on a clone of {@code fn} obtained from
   * {@link SerializableUtils#clone}. An output bundle is created for every entry of
   * {@code outputs} before the runner is constructed.
   *
   * @param evaluationContext source of the execution context, step name, counters, pipeline
   *     options, side input reader and output bundles
   * @param inputBundle the bundle whose elements will be processed; its key selects the
   *     execution context
   * @param application the applied transform being evaluated
   * @param fn the function to run (cloned before use)
   * @param sideInputs the views handed to the side input reader
   * @param mainOutputTag tag of the main output
   * @param sideOutputTags tags of the side outputs
   * @param outputs the output collections, keyed by tag
   * @return an evaluator whose runner has already had {@code startBundle()} invoked
   * @throws RuntimeException the result of {@link UserCodeException#wrap} if starting the
   *     bundle throws
   */
  public static <InputT, OutputT> ParDoInProcessEvaluator<InputT> create(
      InProcessEvaluationContext evaluationContext,
      CommittedBundle<InputT> inputBundle,
      AppliedPTransform<PCollection<InputT>, ?, ?> application,
      DoFn<InputT, OutputT> fn,
      List<PCollectionView<?>> sideInputs,
      TupleTag<OutputT> mainOutputTag,
      List<TupleTag<?>> sideOutputTags,
      Map<TupleTag<?>, PCollection<?>> outputs) {
    InProcessExecutionContext execContext =
        evaluationContext.getExecutionContext(application, inputBundle.getKey());
    String name = evaluationContext.getStepName(application);
    InProcessStepContext stepCtx = execContext.getOrCreateStepContext(name, name);
    CounterSet counterSet = evaluationContext.createCounterSet();

    // One uncommitted bundle per declared output, keyed by that output's tag.
    Map<TupleTag<?>, UncommittedBundle<?>> bundlesByTag = new HashMap<>();
    for (Map.Entry<TupleTag<?>, PCollection<?>> declared : outputs.entrySet()) {
      TupleTag<?> tag = declared.getKey();
      PCollection<?> collection = declared.getValue();
      bundlesByTag.put(tag, evaluationContext.createBundle(inputBundle, collection));
    }

    DoFnRunner<InputT, OutputT> doFnRunner =
        DoFnRunners.createDefault(
            evaluationContext.getPipelineOptions(),
            SerializableUtils.clone(fn),
            evaluationContext.createSideInputReader(sideInputs),
            BundleOutputManager.create(bundlesByTag),
            mainOutputTag,
            sideOutputTags,
            stepCtx,
            counterSet.getAddCounterMutator(),
            application.getInput().getWindowingStrategy());

    // The bundle is started here so the evaluator is ready for elements when it is returned.
    try {
      doFnRunner.startBundle();
    } catch (Exception e) {
      throw UserCodeException.wrap(e);
    }

    return new ParDoInProcessEvaluator<>(
        doFnRunner, application, counterSet, bundlesByTag.values(), stepCtx);
  }

  ////////////////////////////////////////////////////////////////////////////////////////////////

  private final DoFnRunner<T, ?> fnRunner;
  private final AppliedPTransform<PCollection<T>, ?, ?> transform;
  private final CounterSet counters;
  private final Collection<UncommittedBundle<?>> outputBundles;
  private final InProcessStepContext stepContext;

  private ParDoInProcessEvaluator(
      DoFnRunner<T, ?> fnRunner,
      AppliedPTransform<PCollection<T>, ?, ?> transform,
      CounterSet counters,
      Collection<UncommittedBundle<?>> outputBundles,
      InProcessStepContext stepContext) {
    this.fnRunner = fnRunner;
    this.transform = transform;
    this.counters = counters;
    this.outputBundles = outputBundles;
    this.stepContext = stepContext;
  }

  /**
   * Hands {@code element} to the runner; any exception it throws is rethrown after being passed
   * through {@link UserCodeException#wrap}.
   */
  @Override
  public void processElement(WindowedValue<T> element) {
    try {
      fnRunner.processElement(element);
    } catch (Exception e) {
      throw UserCodeException.wrap(e);
    }
  }

  /**
   * Finishes the bundle on the runner, commits the step state and assembles the result.
   *
   * <p>When the step context yields committed state, the result carries that state and uses its
   * earliest watermark hold; otherwise the result is built without a hold.
   */
  @Override
  public InProcessTransformResult finishBundle() {
    try {
      fnRunner.finishBundle();
    } catch (Exception e) {
      throw UserCodeException.wrap(e);
    }

    CopyOnAccessInMemoryStateInternals<?> committed = stepContext.commitState();
    StepTransformResult.Builder builder;
    if (committed == null) {
      builder = StepTransformResult.withoutHold(transform);
    } else {
      builder =
          StepTransformResult.withHold(transform, committed.getEarliestWatermarkHold())
              .withState(committed);
    }

    return builder
        .addOutput(outputBundles)
        .withTimerUpdate(stepContext.getTimerUpdate())
        .withCounters(counters)
        .build();
  }

  /**
   * An {@link OutputManager} that adds each output to the {@link UncommittedBundle} registered
   * for its tag. Outputs for tags without a registered bundle are appended to an in-memory list
   * kept per tag.
   */
  static class BundleOutputManager implements OutputManager {
    private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;
    private final Map<TupleTag<?>, List<?>> undeclaredOutputs;

    /** Creates a manager that writes to the provided bundles, keyed by output tag. */
    public static BundleOutputManager create(Map<TupleTag<?>, UncommittedBundle<?>> outputBundles) {
      return new BundleOutputManager(outputBundles);
    }

    private BundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) {
      this.bundles = bundles;
      undeclaredOutputs = new HashMap<>();
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override
    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
      UncommittedBundle declared = bundles.get(tag);
      if (declared != null) {
        declared.add(output);
        return;
      }

      // No bundle was registered for this tag: keep the value in the per-tag overflow list.
      List pending = undeclaredOutputs.get(tag);
      if (pending == null) {
        pending = new ArrayList<T>();
        undeclaredOutputs.put(tag, pending);
      }
      pending.add(output);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java new file mode 100644 index 0000000..ae8ac6f --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo.BoundMulti; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; + +import java.util.Map; +
/**
 * The {@link TransformEvaluatorFactory} used by the {@link InProcessPipelineRunner} for
 * applications of the {@link BoundMulti} primitive {@link PTransform}.
 */
class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
  /**
   * Creates the evaluator for a {@link BoundMulti} application.
   *
   * <p>The wildcard-typed application is viewed as a raw type so it can be handed to the
   * strongly typed helper; the compiler cannot verify that conversion.
   */
  @Override
  public <T> TransformEvaluator<T> forApplication(
      AppliedPTransform<?, ?, ?> application,
      CommittedBundle<?> inputBundle,
      InProcessEvaluationContext evaluationContext) {
    @SuppressWarnings("rawtypes")
    AppliedPTransform rawApplication = application;
    @SuppressWarnings("unchecked")
    TransformEvaluator<T> result =
        createMultiEvaluator(rawApplication, inputBundle, evaluationContext);
    return result;
  }

  /**
   * Reads the function, side inputs, output tags and output collections from the application
   * and delegates to {@link ParDoInProcessEvaluator#create}.
   */
  private static <InT, OuT> ParDoInProcessEvaluator<InT> createMultiEvaluator(
      AppliedPTransform<PCollection<InT>, PCollectionTuple, BoundMulti<InT, OuT>> application,
      CommittedBundle<InT> inputBundle,
      InProcessEvaluationContext evaluationContext) {
    Map<TupleTag<?>, PCollection<?>> outputsByTag = application.getOutput().getAll();
    BoundMulti<InT, OuT> parDo = application.getTransform();
    DoFn<InT, OuT> doFn = parDo.getFn();

    return ParDoInProcessEvaluator.create(
        evaluationContext,
        inputBundle,
        application,
        doFn,
        parDo.getSideInputs(),
        parDo.getMainOutputTag(),
        parDo.getSideOutputTags().getAll(),
        outputsByTag);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java 
---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java new file mode 100644 index 0000000..989ae51 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo.Bound; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; + +import com.google.common.collect.ImmutableMap; + +import java.util.Collections; + +/** + * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the + * {@link Bound ParDo.Bound} primitive {@link PTransform}. 
+ */
class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
  /**
   * Creates the evaluator for a {@link Bound ParDo.Bound} application.
   *
   * <p>The wildcard-typed application is viewed as a raw type so it can be handed to the
   * strongly typed helper; the compiler cannot verify that conversion.
   */
  @Override
  public <T> TransformEvaluator<T> forApplication(
      final AppliedPTransform<?, ?, ?> application,
      CommittedBundle<?> inputBundle,
      InProcessEvaluationContext evaluationContext) {
    @SuppressWarnings("rawtypes")
    AppliedPTransform rawApplication = application;
    @SuppressWarnings("unchecked")
    TransformEvaluator<T> result =
        createSingleEvaluator(rawApplication, inputBundle, evaluationContext);
    return result;
  }

  /**
   * Delegates to {@link ParDoInProcessEvaluator#create} with a freshly created main output tag,
   * no side output tags, and a single-entry output map from that tag to the application's
   * output.
   */
  private static <InputT, OutputT> ParDoInProcessEvaluator<InputT> createSingleEvaluator(
      @SuppressWarnings("rawtypes") AppliedPTransform<PCollection<InputT>, PCollection<OutputT>,
          Bound<InputT, OutputT>> application,
      CommittedBundle<InputT> inputBundle, InProcessEvaluationContext evaluationContext) {
    TupleTag<OutputT> mainOutputTag = new TupleTag<>("out");
    Bound<InputT, OutputT> parDo = application.getTransform();

    return ParDoInProcessEvaluator.create(
        evaluationContext,
        inputBundle,
        application,
        parDo.getFn(),
        parDo.getSideInputs(),
        mainOutputTag,
        Collections.<TupleTag<?>>emptyList(),
        ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput()));
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java new file mode 100644 index 0000000..aef62b2 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.util.WindowedValue; +
/**
 * A {@link TransformEvaluator} that adds every element it receives, unchanged, to a single
 * output bundle.
 */
class PassthroughTransformEvaluator<InputT> implements TransformEvaluator<InputT> {
  private final AppliedPTransform<?, ?, ?> transform;
  private final UncommittedBundle<InputT> output;

  /** Creates an evaluator for {@code transform} that writes into {@code output}. */
  public static <InputT> PassthroughTransformEvaluator<InputT> create(
      AppliedPTransform<?, ?, ?> transform, UncommittedBundle<InputT> output) {
    return new PassthroughTransformEvaluator<>(transform, output);
  }

  private PassthroughTransformEvaluator(
      AppliedPTransform<?, ?, ?> transform, UncommittedBundle<InputT> output) {
    this.transform = transform;
    this.output = output;
  }

  /** Adds {@code element} to the output bundle as-is. */
  @Override
  public void processElement(WindowedValue<InputT> element) throws Exception {
    output.add(element);
  }

  /** Returns a result without a watermark hold whose only output is the output bundle. */
  @Override
  public InProcessTransformResult finishBundle() throws Exception {
    StepTransformResult.Builder result = StepTransformResult.withoutHold(transform);
    result.addOutput(output);
    return result.build();
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java 
---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java new file mode 100644 index 0000000..4687f85 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.Partition; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PDone; + +import java.util.concurrent.ThreadLocalRandom; + +/** + * A write that explicitly controls its number of output shards. 
+ */
abstract class ShardControlledWrite<InputT>
    extends ForwardingPTransform<PCollection<InputT>, PDone> {
  /**
   * Splits {@code input} into {@link #getNumShards()} partitions and applies the transform
   * returned by {@link #getSingleShardTransform(int)} to each partition.
   *
   * @param input the collection to write
   * @return a {@link PDone} in the pipeline of {@code input}
   * @throws IllegalArgumentException if {@link #getNumShards()} is less than one
   */
  @Override
  public PDone apply(PCollection<InputT> input) {
    // Read the shard count once so the check, the step name and the partition count always agree.
    int numShards = getNumShards();
    checkArgument(
        numShards >= 1,
        "%s should only be applied if the output has a controlled number of shards (>= 1); got %s",
        getClass().getSimpleName(),
        numShards);
    PCollectionList<InputT> shards =
        input.apply(
            "PartitionInto" + numShards + "Shards",
            Partition.of(numShards, new RandomSeedPartitionFn<InputT>()));
    for (int i = 0; i < shards.size(); i++) {
      PCollection<InputT> shard = shards.get(i);
      PTransform<? super PCollection<InputT>, PDone> writeShard = getSingleShardTransform(i);
      shard.apply(String.format("%s(Shard:%s)", writeShard.getName(), i), writeShard);
    }
    return PDone.in(input.getPipeline());
  }

  /**
   * Returns the number of shards this {@link PTransform} should write to.
   */
  abstract int getNumShards();

  /**
   * Returns a {@link PTransform} that performs a write to the shard with the specified shard
   * number.
   *
   * <p>This method will be called n times, where n is the value of {@link #getNumShards()}, for
   * shard numbers {@code [0...n)}.
   */
  abstract PTransform<? super PCollection<InputT>, PDone> getSingleShardTransform(int shardNum);

  /**
   * A {@link Partition.PartitionFn} that picks a random starting point on its first call and
   * then advances by one partition, wrapping around, on every call.
   */
  private static class RandomSeedPartitionFn<T> implements Partition.PartitionFn<T> {
    // Negative until the first element is seen; afterwards the most recently returned partition.
    private int nextPartition = -1;

    @Override
    public int partitionFor(T elem, int numPartitions) {
      if (nextPartition < 0) {
        nextPartition = ThreadLocalRandom.current().nextInt(numPartitions);
      }
      nextPartition = (nextPartition + 1) % numPartitions;
      return nextPartition;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java new file mode 100644 index 0000000..1c7cf6c --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.transforms.AppliedPTransform; + +import com.google.common.base.MoreObjects; + +import java.util.Objects; +
/**
 * A pairing of a step with a key, with value-based {@link #equals} and {@link #hashCode} so it
 * can serve as a map key or cache key for things that are available per-step in a keyed manner
 * (e.g. State).
 */
final class StepAndKey {
  private final AppliedPTransform<?, ?, ?> step;
  private final Object key;

  /**
   * Returns a new {@link StepAndKey} for the given step and key.
   */
  public static StepAndKey of(AppliedPTransform<?, ?, ?> step, Object key) {
    return new StepAndKey(step, key);
  }

  private StepAndKey(AppliedPTransform<?, ?, ?> step, Object key) {
    this.step = step;
    this.key = key;
  }

  /** Renders the full name of the step together with the key. */
  @Override
  public String toString() {
    MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(StepAndKey.class);
    helper.add("step", step.getFullName());
    helper.add("key", key);
    return helper.toString();
  }

  @Override
  public int hashCode() {
    return Objects.hash(step, key);
  }

  /** Two instances are equal when both their steps and their keys are equal. */
  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof StepAndKey)) {
      return false;
    }
    StepAndKey that = (StepAndKey) other;
    return Objects.equals(this.step, that.step) && Objects.equals(this.key, that.key);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java new file mode 100644 index 0000000..46e7d04 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate; +import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.common.CounterSet; +import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableList; + +import org.joda.time.Instant; + +import java.util.Collection; + +import javax.annotation.Nullable; +
/**
 * An {@link InProcessTransformResult} whose fields are all fixed at construction time.
 * Instances are obtained through {@link #withHold} or {@link #withoutHold} and the returned
 * {@link Builder}.
 */
public class StepTransformResult implements InProcessTransformResult {
  private final AppliedPTransform<?, ?, ?> transform;
  private final Iterable<? extends UncommittedBundle<?>> bundles;
  @Nullable private final CopyOnAccessInMemoryStateInternals<?> state;
  private final TimerUpdate timerUpdate;
  @Nullable private final CounterSet counters;
  private final Instant watermarkHold;

  /** Starts a result for {@code transform} that holds the watermark at {@code watermarkHold}. */
  public static Builder withHold(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
    return new Builder(transform, watermarkHold);
  }

  /**
   * Starts a result for {@code transform} whose hold is
   * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
   */
  public static Builder withoutHold(AppliedPTransform<?, ?, ?> transform) {
    return new Builder(transform, BoundedWindow.TIMESTAMP_MAX_VALUE);
  }

  // The transform, bundles, timer update and hold are required; state and counters may be null.
  private StepTransformResult(
      AppliedPTransform<?, ?, ?> transform,
      Iterable<? extends UncommittedBundle<?>> outputBundles,
      CopyOnAccessInMemoryStateInternals<?> state,
      TimerUpdate timerUpdate,
      CounterSet counters,
      Instant watermarkHold) {
    this.transform = checkNotNull(transform);
    this.bundles = checkNotNull(outputBundles);
    this.state = state;
    this.timerUpdate = checkNotNull(timerUpdate);
    this.counters = counters;
    this.watermarkHold = checkNotNull(watermarkHold);
  }

  @Override
  public AppliedPTransform<?, ?, ?> getTransform() {
    return transform;
  }

  @Override
  public Iterable<? extends UncommittedBundle<?>> getOutputBundles() {
    return bundles;
  }

  @Override
  public CounterSet getCounters() {
    return counters;
  }

  @Override
  public Instant getWatermarkHold() {
    return watermarkHold;
  }

  @Nullable
  @Override
  public CopyOnAccessInMemoryStateInternals<?> getState() {
    return state;
  }

  @Override
  public TimerUpdate getTimerUpdate() {
    return timerUpdate;
  }

  /** Renders only the transform this result belongs to. */
  @Override
  public String toString() {
    MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(StepTransformResult.class);
    helper.add("transform", transform);
    return helper.toString();
  }

  /**
   * Accumulates the parts of a {@link StepTransformResult}. The transform and watermark hold are
   * fixed when the builder is created; the timer update starts out as an update built from
   * {@code TimerUpdate.builder(null)}, and state and counters start out unset.
   */
  public static class Builder {
    private final AppliedPTransform<?, ?, ?> transform;
    private final ImmutableList.Builder<UncommittedBundle<?>> bundlesBuilder;
    private final Instant watermarkHold;
    private CopyOnAccessInMemoryStateInternals<?> state;
    private TimerUpdate timerUpdate;
    private CounterSet counters;

    private Builder(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
      this.transform = transform;
      this.watermarkHold = watermarkHold;
      this.bundlesBuilder = ImmutableList.builder();
      this.timerUpdate = TimerUpdate.builder(null).build();
    }

    /** Creates the result from everything supplied so far. */
    public StepTransformResult build() {
      ImmutableList<UncommittedBundle<?>> outputs = bundlesBuilder.build();
      return new StepTransformResult(
          transform, outputs, state, timerUpdate, counters, watermarkHold);
    }

    /** Sets the counters reported by the result. */
    public Builder withCounters(CounterSet counters) {
      this.counters = counters;
      return this;
    }

    /** Sets the state reported by the result. */
    public Builder withState(CopyOnAccessInMemoryStateInternals<?> state) {
      this.state = state;
      return this;
    }

    /** Replaces the timer update reported by the result. */
    public Builder withTimerUpdate(TimerUpdate timerUpdate) {
      this.timerUpdate = timerUpdate;
      return this;
    }

    /** Appends {@code outputBundle} followed by each of {@code outputBundles}. */
    public Builder addOutput(
        UncommittedBundle<?> outputBundle, UncommittedBundle<?>... outputBundles) {
      bundlesBuilder.add(outputBundle).add(outputBundles);
      return this;
    }

    /** Appends every bundle in {@code outputBundles}. */
    public Builder addOutput(Collection<UncommittedBundle<?>> outputBundles) {
      bundlesBuilder.addAll(outputBundles);
      return this;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java new file mode 100644 index 0000000..be1bf18 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.TextIO.Write.Bound; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +
/**
 * A {@link PTransformOverrideFactory} that replaces a {@link TextIO.Write.Bound} with a
 * {@link ShardControlledWrite} when the write asks for more than one shard, or for exactly one
 * shard with a non-empty shard name template. Every other transform is returned unchanged.
 */
class TextIOShardedWriteFactory implements PTransformOverrideFactory {

  @Override
  public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
      PTransform<InputT, OutputT> transform) {
    if (!(transform instanceof TextIO.Write.Bound)) {
      return transform;
    }

    @SuppressWarnings("unchecked")
    TextIO.Write.Bound<InputT> write = (TextIO.Write.Bound<InputT>) transform;
    if (!controlsShards(write)) {
      return transform;
    }

    @SuppressWarnings("unchecked")
    PTransform<InputT, OutputT> replacement =
        (PTransform<InputT, OutputT>) new TextIOShardedWrite<InputT>(write);
    return replacement;
  }

  /**
   * Returns whether {@code write} requests more than one shard, or exactly one shard together
   * with a shard name template other than the empty string.
   */
  private static boolean controlsShards(TextIO.Write.Bound<?> write) {
    int shardCount = write.getNumShards();
    if (shardCount > 1) {
      return true;
    }
    return shardCount == 1 && !"".equals(write.getShardNameTemplate());
  }

  /**
   * A {@link ShardControlledWrite} that delegates to the original write and writes each shard
   * with an unsharded {@link TextIO.Write} to a name constructed from the original prefix,
   * shard template and suffix.
   */
  private static class TextIOShardedWrite<InputT> extends ShardControlledWrite<InputT> {
    private final TextIO.Write.Bound<InputT> initial;

    private TextIOShardedWrite(Bound<InputT> initial) {
      this.initial = initial;
    }

    @Override
    int getNumShards() {
      return initial.getNumShards();
    }

    @Override
    PTransform<PCollection<InputT>, PDone> getSingleShardTransform(int shardNum) {
      String prefix = initial.getFilenamePrefix();
      String template = initial.getShardTemplate();
      String suffix = initial.getFilenameSuffix();
      String shardName =
          IOChannelUtils.constructName(prefix, template, suffix, shardNum, getNumShards());
      return TextIO.Write.withCoder(initial.getCoder()).to(shardName).withoutSharding();
    }

    @Override
    protected PTransform<PCollection<InputT>, PDone> delegate() {
      return initial;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java new file mode 100644 index 0000000..ba9815b --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.util.WindowedValue; +
/**
 * Evaluates one specific application of a transform. An evaluator will be used for at least one
 * {@link CommittedBundle}.
 *
 * @param <InputT> the type of the elements handed to {@link #processElement}
 */
public interface TransformEvaluator<InputT> {
  /**
   * Processes a single element of the input {@link CommittedBundle}.
   *
   * @param element the element to process
   * @throws Exception if processing the element fails
   */
  void processElement(WindowedValue<InputT> element) throws Exception;

  /**
   * Completes processing of the bundle handled by this {@link TransformEvaluator}.
   *
   * <p>Once {@link #finishBundle()} has been called, the {@link TransformEvaluator} will not be
   * reused, and no more elements will be processed.
   *
   * @return an {@link InProcessTransformResult} containing the results of this bundle evaluation.
   * @throws Exception if finishing the bundle fails
   */
  InProcessTransformResult finishBundle() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java new file mode 100644 index 0000000..8f8d84c --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+
+import javax.annotation.Nullable;
+
+/**
+ * A factory for creating instances of {@link TransformEvaluator} for the application of a
+ * {@link PTransform}.
+ */
+public interface TransformEvaluatorFactory {
+  /**
+   * Create a new {@link TransformEvaluator} for the application of the {@link PTransform}.
+   *
+   * <p>Any work that must be done before input elements are processed (such as calling
+   * {@link DoFn#startBundle(DoFn.Context)}) must be done before the {@link TransformEvaluator} is
+   * made available to the caller.
+   *
+   * @param application the applied transform to create an evaluator for
+   * @param inputBundle the bundle of input the evaluator will be used on; may be {@code null}
+   * @param evaluationContext the context of the evaluation the evaluator takes part in
+   * @return a {@link TransformEvaluator} for {@code application}
+   * @throws Exception whenever constructing the underlying evaluator throws an exception
+   */
+  <InputT> TransformEvaluator<InputT> forApplication(
+      AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> inputBundle,
+      InProcessEvaluationContext evaluationContext) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
new file mode 100644
index 0000000..f449731
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.Window; + +import com.google.common.collect.ImmutableMap; + +import java.util.Map; + +import javax.annotation.Nullable; + +/** + * A {@link TransformEvaluatorFactory} that delegates to primitive {@link TransformEvaluatorFactory} + * implementations based on the type of {@link PTransform} of the application. + */ +class TransformEvaluatorRegistry implements TransformEvaluatorFactory { + public static TransformEvaluatorRegistry defaultRegistry() { + @SuppressWarnings("rawtypes") + ImmutableMap<Class<? extends PTransform>, TransformEvaluatorFactory> primitives = + ImmutableMap.<Class<? 
extends PTransform>, TransformEvaluatorFactory>builder() + .put(Read.Bounded.class, new BoundedReadEvaluatorFactory()) + .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory()) + .put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory()) + .put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory()) + .put( + GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly.class, + new GroupByKeyEvaluatorFactory()) + .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory()) + .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory()) + .put(Window.Bound.class, new WindowEvaluatorFactory()) + .build(); + return new TransformEvaluatorRegistry(primitives); + } + + // the TransformEvaluatorFactories can construct instances of all generic types of transform, + // so all instances of a primitive can be handled with the same evaluator factory. + @SuppressWarnings("rawtypes") + private final Map<Class<? extends PTransform>, TransformEvaluatorFactory> factories; + + private TransformEvaluatorRegistry( + @SuppressWarnings("rawtypes") + Map<Class<? 
extends PTransform>, TransformEvaluatorFactory> factories) { + this.factories = factories; + } + + @Override + public <InputT> TransformEvaluator<InputT> forApplication( + AppliedPTransform<?, ?, ?> application, + @Nullable CommittedBundle<?> inputBundle, + InProcessEvaluationContext evaluationContext) + throws Exception { + TransformEvaluatorFactory factory = factories.get(application.getTransform().getClass()); + return factory.forApplication(application, inputBundle, evaluationContext); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java new file mode 100644 index 0000000..8346e89 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+
+import com.google.common.base.Throwables;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link Callable} responsible for constructing a {@link TransformEvaluator} from a
+ * {@link TransformEvaluatorFactory} and evaluating it on some bundle of input, and registering
+ * the result using a registered {@link CompletionCallback}.
+ *
+ * <p>A {@link TransformExecutor} that is currently executing also provides access to the thread
+ * that it is being executed on.
+ */
+class TransformExecutor<T> implements Callable<InProcessTransformResult> {
+  /**
+   * Creates a {@link TransformExecutor} from the provided collaborators. Nothing is evaluated
+   * until {@link #call()} is invoked.
+   */
+  public static <T> TransformExecutor<T> create(
+      TransformEvaluatorFactory factory,
+      Iterable<? extends ModelEnforcementFactory> modelEnforcements,
+      InProcessEvaluationContext evaluationContext,
+      CommittedBundle<T> inputBundle,
+      AppliedPTransform<?, ?, ?> transform,
+      CompletionCallback completionCallback,
+      TransformExecutorService transformEvaluationState) {
+    return new TransformExecutor<>(
+        factory,
+        modelEnforcements,
+        evaluationContext,
+        inputBundle,
+        transform,
+        completionCallback,
+        transformEvaluationState);
+  }
+
+  private final TransformEvaluatorFactory evaluatorFactory;
+  private final Iterable<? extends ModelEnforcementFactory> modelEnforcements;
+
+  private final InProcessEvaluationContext evaluationContext;
+
+  /** The transform that will be evaluated. */
+  private final AppliedPTransform<?, ?, ?> transform;
+  /** The inputs this {@link TransformExecutor} will deliver to the transform. */
+  private final CommittedBundle<T> inputBundle;
+
+  private final CompletionCallback onComplete;
+  private final TransformExecutorService transformEvaluationState;
+
+  // Set once by call() to the thread running it. Nothing in this class resets it afterwards.
+  private final AtomicReference<Thread> thread;
+
+  private TransformExecutor(
+      TransformEvaluatorFactory factory,
+      Iterable<? extends ModelEnforcementFactory> modelEnforcements,
+      InProcessEvaluationContext evaluationContext,
+      CommittedBundle<T> inputBundle,
+      AppliedPTransform<?, ?, ?> transform,
+      CompletionCallback completionCallback,
+      TransformExecutorService transformEvaluationState) {
+    this.evaluatorFactory = factory;
+    this.modelEnforcements = modelEnforcements;
+    this.evaluationContext = evaluationContext;
+
+    this.inputBundle = inputBundle;
+    this.transform = transform;
+
+    this.onComplete = completionCallback;
+
+    this.transformEvaluationState = transformEvaluationState;
+    this.thread = new AtomicReference<>();
+  }
+
+  /**
+   * Evaluates the transform on the input bundle.
+   *
+   * <p>The calling thread is recorded first; the {@code checkState} fails if a thread was already
+   * recorded, so an instance can be called at most once. Then one {@link ModelEnforcement} per
+   * factory and the {@link TransformEvaluator} are created, every element is processed, and the
+   * bundle is finished.
+   *
+   * <p>Any {@link Throwable} raised along the way is reported to the {@link CompletionCallback}
+   * and rethrown via {@link Throwables#propagate}. In every case the
+   * {@link TransformExecutorService} is told that this executor has completed.
+   */
+  @Override
+  public InProcessTransformResult call() {
+    checkState(
+        thread.compareAndSet(null, Thread.currentThread()),
+        "Tried to execute %s for %s on thread %s, but is already executing on thread %s",
+        TransformExecutor.class.getSimpleName(),
+        transform.getFullName(),
+        Thread.currentThread(),
+        thread.get());
+    try {
+      Collection<ModelEnforcement<T>> enforcements = new ArrayList<>();
+      for (ModelEnforcementFactory enforcementFactory : modelEnforcements) {
+        ModelEnforcement<T> enforcement = enforcementFactory.forBundle(inputBundle, transform);
+        enforcements.add(enforcement);
+      }
+      TransformEvaluator<T> evaluator =
+          evaluatorFactory.forApplication(transform, inputBundle, evaluationContext);
+
+      processElements(evaluator, enforcements);
+
+      InProcessTransformResult result = finishBundle(evaluator, enforcements);
+      return result;
+    } catch (Throwable t) {
+      onComplete.handleThrowable(inputBundle, t);
+      throw Throwables.propagate(t);
+    } finally {
+      transformEvaluationState.complete(this);
+    }
+  }
+
+  /**
+   * Processes all the elements in the input bundle using the transform evaluator, applying any
+   * necessary {@link ModelEnforcement ModelEnforcements}.
+   *
+   * <p>Does nothing when the input bundle is {@code null}.
+   */
+  private void processElements(
+      TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>> enforcements)
+      throws Exception {
+    if (inputBundle != null) {
+      for (WindowedValue<T> value : inputBundle.getElements()) {
+        for (ModelEnforcement<T> enforcement : enforcements) {
+          enforcement.beforeElement(value);
+        }
+
+        evaluator.processElement(value);
+
+        for (ModelEnforcement<T> enforcement : enforcements) {
+          enforcement.afterElement(value);
+        }
+      }
+    }
+  }
+
+  /**
+   * Finishes processing the input bundle and commit the result using the
+   * {@link CompletionCallback}, applying any {@link ModelEnforcement} if necessary.
+   *
+   * <p>The result is handed to the {@link CompletionCallback} before the enforcements'
+   * {@code afterFinish} hooks run.
+   *
+   * @return the {@link InProcessTransformResult} produced by
+   *     {@link TransformEvaluator#finishBundle()}
+   */
+  private InProcessTransformResult finishBundle(
+      TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>> enforcements)
+      throws Exception {
+    InProcessTransformResult result = evaluator.finishBundle();
+    CommittedResult outputs = onComplete.handleResult(inputBundle, result);
+    for (ModelEnforcement<T> enforcement : enforcements) {
+      enforcement.afterFinish(inputBundle, result, outputs.getOutputs());
+    }
+    return result;
+  }
+
+  /**
+   * If this {@link TransformExecutor} is currently executing, return the thread it is executing in.
+   * Otherwise, return null.
+   *
+   * <p>NOTE(review): the reference is set in {@link #call()} and never cleared in this class, so
+   * after {@link #call()} returns this still yields the thread that ran it, not {@code null}.
+   */
+  @Nullable
+  public Thread getThread() {
+    return thread.get();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java
new file mode 100644
index 0000000..837b858
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+/**
+ * Schedules and completes {@link TransformExecutor TransformExecutors}, controlling concurrency as
+ * appropriate for the {@link StepAndKey} the executor exists for.
+ */
+interface TransformExecutorService {
+  /**
+   * Schedule the provided work to be eventually executed.
+   *
+   * @param work the executor to run
+   */
+  void schedule(TransformExecutor<?> work);
+
+  /**
+   * Finish executing the provided work. This may cause additional
+   * {@link TransformExecutor TransformExecutors} to be evaluated.
+   *
+   * @param completed the executor that has finished running
+   */
+  void complete(TransformExecutor<?> completed);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
new file mode 100644
index 0000000..087b7c2
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import com.google.common.base.MoreObjects;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Static factory methods for constructing instances of {@link TransformExecutorService}.
+ */ +final class TransformExecutorServices { + private TransformExecutorServices() { + // Do not instantiate + } + + /** + * Returns an EvaluationState that evaluates {@link TransformExecutor TransformExecutors} in + * parallel. + */ + public static TransformExecutorService parallel( + ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) { + return new ParallelEvaluationState(executor, scheduled); + } + + /** + * Returns an EvaluationState that evaluates {@link TransformExecutor TransformExecutors} in + * serial. + */ + public static TransformExecutorService serial( + ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) { + return new SerialEvaluationState(executor, scheduled); + } + + /** + * A {@link TransformExecutorService} with unlimited parallelism. Any {@link TransformExecutor} + * scheduled will be immediately submitted to the {@link ExecutorService}. + * + * <p>A principal use of this is for the evaluation of an unkeyed Step. Unkeyed computations are + * processed in parallel. + */ + private static class ParallelEvaluationState implements TransformExecutorService { + private final ExecutorService executor; + private final Map<TransformExecutor<?>, Boolean> scheduled; + + private ParallelEvaluationState( + ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) { + this.executor = executor; + this.scheduled = scheduled; + } + + @Override + public void schedule(TransformExecutor<?> work) { + executor.submit(work); + scheduled.put(work, true); + } + + @Override + public void complete(TransformExecutor<?> completed) { + scheduled.remove(completed); + } + } + + /** + * A {@link TransformExecutorService} with a single work queue. Any {@link TransformExecutor} + * scheduled will be placed on the work queue. Only one item of work will be submitted to the + * {@link ExecutorService} at any time. + * + * <p>A principal use of this is for the serial evaluation of a (Step, Key) pair. 
+ * Keyed computations are processed serially per step. + */ + private static class SerialEvaluationState implements TransformExecutorService { + private final ExecutorService executor; + private final Map<TransformExecutor<?>, Boolean> scheduled; + + private AtomicReference<TransformExecutor<?>> currentlyEvaluating; + private final Queue<TransformExecutor<?>> workQueue; + + private SerialEvaluationState( + ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) { + this.scheduled = scheduled; + this.executor = executor; + this.currentlyEvaluating = new AtomicReference<>(); + this.workQueue = new ConcurrentLinkedQueue<>(); + } + + /** + * Schedules the work, adding it to the work queue if there is a bundle currently being + * evaluated and scheduling it immediately otherwise. + */ + @Override + public void schedule(TransformExecutor<?> work) { + workQueue.offer(work); + updateCurrentlyEvaluating(); + } + + @Override + public void complete(TransformExecutor<?> completed) { + if (!currentlyEvaluating.compareAndSet(completed, null)) { + throw new IllegalStateException( + "Finished work " + + completed + + " but could not complete due to unexpected currently executing " + + currentlyEvaluating.get()); + } + scheduled.remove(completed); + updateCurrentlyEvaluating(); + } + + private void updateCurrentlyEvaluating() { + if (currentlyEvaluating.get() == null) { + // Only synchronize if we need to update what's currently evaluating + synchronized (this) { + TransformExecutor<?> newWork = workQueue.poll(); + if (newWork != null) { + if (currentlyEvaluating.compareAndSet(null, newWork)) { + scheduled.put(newWork, true); + executor.submit(newWork); + } else { + workQueue.offer(newWork); + } + } + } + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(SerialEvaluationState.class) + .add("currentlyEvaluating", currentlyEvaluating) + .add("workQueue", workQueue) + .toString(); + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java new file mode 100644 index 0000000..7a95c9f --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.io.Read.Unbounded; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; +import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; + +import javax.annotation.Nullable; + +/** + * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators} + * for the {@link Unbounded Read.Unbounded} primitive {@link PTransform}. + */ +class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { + /* + * An evaluator for a Source is stateful, to ensure the CheckpointMark is properly persisted. + * Evaluators are cached here to ensure that the checkpoint mark is appropriately reused + * and any splits are honored. + */ + private final ConcurrentMap<EvaluatorKey, Queue<? 
extends UnboundedReadEvaluator<?>>> + sourceEvaluators = new ConcurrentHashMap<>(); + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override + public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application, + @Nullable CommittedBundle<?> inputBundle, InProcessEvaluationContext evaluationContext) { + return getTransformEvaluator((AppliedPTransform) application, evaluationContext); + } + + private <OutputT> TransformEvaluator<?> getTransformEvaluator( + final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform, + final InProcessEvaluationContext evaluationContext) { + UnboundedReadEvaluator<?> currentEvaluator = + getTransformEvaluatorQueue(transform, evaluationContext).poll(); + if (currentEvaluator == null) { + return EmptyTransformEvaluator.create(transform); + } + return currentEvaluator; + } + + /** + * Get the queue of {@link TransformEvaluator TransformEvaluators} that produce elements for the + * provided application of {@link Unbounded Read.Unbounded}, initializing it if required. + * + * <p>This method is thread-safe, and will only produce new evaluators if no other invocation has + * already done so. + */ + @SuppressWarnings("unchecked") + private <OutputT> Queue<UnboundedReadEvaluator<OutputT>> getTransformEvaluatorQueue( + final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform, + final InProcessEvaluationContext evaluationContext) { + // Key by the application and the context the evaluation is occurring in (which call to + // Pipeline#run). 
+ EvaluatorKey key = new EvaluatorKey(transform, evaluationContext); + @SuppressWarnings("unchecked") + Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue = + (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key); + if (evaluatorQueue == null) { + evaluatorQueue = new ConcurrentLinkedQueue<>(); + if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) { + // If no queue existed in the evaluators, add an evaluator to initialize the evaluator + // factory for this transform + UnboundedSource<OutputT, ?> source = transform.getTransform().getSource(); + UnboundedReadEvaluator<OutputT> evaluator = + new UnboundedReadEvaluator<OutputT>( + transform, evaluationContext, source, evaluatorQueue); + evaluatorQueue.offer(evaluator); + } else { + // otherwise return the existing Queue that arrived before us + evaluatorQueue = (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key); + } + } + return evaluatorQueue; + } + + /** + * A {@link UnboundedReadEvaluator} produces elements from an underlying {@link UnboundedSource}, + * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator + * creates the {@link UnboundedReader} and consumes some currently available input. + * + * <p>Calls to {@link UnboundedReadEvaluator} are not internally thread-safe, and should only be + * used by a single thread at a time. Each {@link UnboundedReadEvaluator} maintains its own + * checkpoint, and constructs its reader from the current checkpoint in each call to + * {@link #finishBundle()}. 
+ */ + private static class UnboundedReadEvaluator<OutputT> implements TransformEvaluator<Object> { + private static final int ARBITRARY_MAX_ELEMENTS = 10; + private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform; + private final InProcessEvaluationContext evaluationContext; + private final Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue; + /** + * The source being read from by this {@link UnboundedReadEvaluator}. This may not be the same + * source as derived from {@link #transform} due to splitting. + */ + private final UnboundedSource<OutputT, ?> source; + private CheckpointMark checkpointMark; + + public UnboundedReadEvaluator( + AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform, + InProcessEvaluationContext evaluationContext, + UnboundedSource<OutputT, ?> source, + Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue) { + this.transform = transform; + this.evaluationContext = evaluationContext; + this.evaluatorQueue = evaluatorQueue; + this.source = source; + this.checkpointMark = null; + } + + @Override + public void processElement(WindowedValue<Object> element) {} + + @Override + public InProcessTransformResult finishBundle() throws IOException { + UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput()); + try (UnboundedReader<OutputT> reader = + createReader(source, evaluationContext.getPipelineOptions());) { + int numElements = 0; + if (reader.start()) { + do { + output.add( + WindowedValue.timestampedValueInGlobalWindow( + reader.getCurrent(), reader.getCurrentTimestamp())); + numElements++; + } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance()); + } + checkpointMark = reader.getCheckpointMark(); + checkpointMark.finalizeCheckpoint(); + // TODO: When exercising create initial splits, make this the minimum watermark across all + // existing readers + StepTransformResult result = + StepTransformResult.withHold(transform, reader.getWatermark()) + 
.addOutput(output) + .build(); + evaluatorQueue.offer(this); + return result; + } + } + + private <CheckpointMarkT extends CheckpointMark> UnboundedReader<OutputT> createReader( + UnboundedSource<OutputT, CheckpointMarkT> source, PipelineOptions options) { + @SuppressWarnings("unchecked") + CheckpointMarkT mark = (CheckpointMarkT) checkpointMark; + return source.createReader(options, mark); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java new file mode 100644 index 0000000..ffaf3fa --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.View.CreatePCollectionView; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + +import java.util.ArrayList; +import java.util.List; + +/** + * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the + * {@link CreatePCollectionView} primitive {@link PTransform}. + * + * <p>The {@link ViewEvaluatorFactory} produces {@link TransformEvaluator TransformEvaluators} for + * the {@link WriteView} {@link PTransform}, which is part of the + * {@link InProcessCreatePCollectionView} composite transform. This transform is an override for the + * {@link CreatePCollectionView} transform that applies windowing and triggers before the view is + * written. 
+ */ +class ViewEvaluatorFactory implements TransformEvaluatorFactory { + @Override + public <T> TransformEvaluator<T> forApplication( + AppliedPTransform<?, ?, ?> application, + InProcessPipelineRunner.CommittedBundle<?> inputBundle, + InProcessEvaluationContext evaluationContext) { + @SuppressWarnings({"cast", "unchecked", "rawtypes"}) + TransformEvaluator<T> evaluator = createEvaluator( + (AppliedPTransform) application, evaluationContext); + return evaluator; + } + + private <InT, OuT> TransformEvaluator<Iterable<InT>> createEvaluator( + final AppliedPTransform<PCollection<Iterable<InT>>, PCollectionView<OuT>, WriteView<InT, OuT>> + application, + InProcessEvaluationContext context) { + PCollection<Iterable<InT>> input = application.getInput(); + final PCollectionViewWriter<InT, OuT> writer = + context.createPCollectionViewWriter(input, application.getOutput()); + return new TransformEvaluator<Iterable<InT>>() { + private final List<WindowedValue<InT>> elements = new ArrayList<>(); + + @Override + public void processElement(WindowedValue<Iterable<InT>> element) { + for (InT input : element.getValue()) { + elements.add(element.withValue(input)); + } + } + + @Override + public InProcessTransformResult finishBundle() { + writer.add(elements); + return StepTransformResult.withoutHold(application).build(); + } + }; + } + + public static class InProcessViewOverrideFactory implements PTransformOverrideFactory { + @Override + public <InputT extends PInput, OutputT extends POutput> + PTransform<InputT, OutputT> override(PTransform<InputT, OutputT> transform) { + if (transform instanceof CreatePCollectionView) { + + } + @SuppressWarnings({"rawtypes", "unchecked"}) + PTransform<InputT, OutputT> createView = + (PTransform<InputT, OutputT>) + new InProcessCreatePCollectionView<>((CreatePCollectionView) transform); + return createView; + } + } + + /** + * An in-process override for {@link CreatePCollectionView}. 
+ */ + private static class InProcessCreatePCollectionView<ElemT, ViewT> + extends ForwardingPTransform<PCollection<ElemT>, PCollectionView<ViewT>> { + private final CreatePCollectionView<ElemT, ViewT> og; + + private InProcessCreatePCollectionView(CreatePCollectionView<ElemT, ViewT> og) { + this.og = og; + } + + @Override + public PCollectionView<ViewT> apply(PCollection<ElemT> input) { + return input.apply(WithKeys.<Void, ElemT>of((Void) null)) + .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder())) + .apply(GroupByKey.<Void, ElemT>create()) + .apply(Values.<Iterable<ElemT>>create()) + .apply(new WriteView<ElemT, ViewT>(og)); + } + + @Override + protected PTransform<PCollection<ElemT>, PCollectionView<ViewT>> delegate() { + return og; + } + } + + /** + * An in-process implementation of the {@link CreatePCollectionView} primitive. + * + * This implementation requires the input {@link PCollection} to be an iterable, which is provided + * to {@link PCollectionView#fromIterableInternal(Iterable)}. 
+ */ + public static final class WriteView<ElemT, ViewT> + extends PTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> { + private final CreatePCollectionView<ElemT, ViewT> og; + + WriteView(CreatePCollectionView<ElemT, ViewT> og) { + this.og = og; + } + + @Override + public PCollectionView<ViewT> apply(PCollection<Iterable<ElemT>> input) { + return og.getView(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java new file mode 100644 index 0000000..4a3a517 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowingStrategy; + +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.Ordering; + +import org.joda.time.Instant; + +import java.util.PriorityQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Executes callbacks that occur based on the progression of the watermark per-step. + * + * <p>Callbacks are registered by calls to + * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)}, + * and are executed after a call to {@link #fireForWatermark(AppliedPTransform, Instant)} with the + * same {@link AppliedPTransform} and a watermark sufficient to ensure that the trigger for the + * windowing strategy would have been produced. + * + * <p>NOTE: {@link WatermarkCallbackExecutor} does not track the latest observed watermark for any + * {@link AppliedPTransform} - any call to + * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)} + * that could have potentially already fired should be followed by a call to + * {@link #fireForWatermark(AppliedPTransform, Instant)} for the same transform with the current + * value of the watermark. + */ +class WatermarkCallbackExecutor { + /** + * Create a new {@link WatermarkCallbackExecutor}. 
+ */ + public static WatermarkCallbackExecutor create() { + return new WatermarkCallbackExecutor(); + } + + private final ConcurrentMap<AppliedPTransform<?, ?, ?>, PriorityQueue<WatermarkCallback>> + callbacks; + private final ExecutorService executor; + + private WatermarkCallbackExecutor() { + this.callbacks = new ConcurrentHashMap<>(); + this.executor = Executors.newSingleThreadExecutor(); + } + + /** + * Execute the provided {@link Runnable} after the next call to + * {@link #fireForWatermark(AppliedPTransform, Instant)} where the window is guaranteed to have + * produced output. + */ + public void callOnGuaranteedFiring( + AppliedPTransform<?, ?, ?> step, + BoundedWindow window, + WindowingStrategy<?, ?> windowingStrategy, + Runnable runnable) { + WatermarkCallback callback = + WatermarkCallback.onGuaranteedFiring(window, windowingStrategy, runnable); + + PriorityQueue<WatermarkCallback> callbackQueue = callbacks.get(step); + if (callbackQueue == null) { + callbackQueue = new PriorityQueue<>(11, new CallbackOrdering()); + if (callbacks.putIfAbsent(step, callbackQueue) != null) { + callbackQueue = callbacks.get(step); + } + } + + synchronized (callbackQueue) { + callbackQueue.offer(callback); + } + } + + /** + * Schedule all pending callbacks that must have produced output by the time of the provided + * watermark. 
+ */ + public void fireForWatermark(AppliedPTransform<?, ?, ?> step, Instant watermark) { + PriorityQueue<WatermarkCallback> callbackQueue = callbacks.get(step); + if (callbackQueue == null) { + return; + } + synchronized (callbackQueue) { + while (!callbackQueue.isEmpty() && callbackQueue.peek().shouldFire(watermark)) { + executor.submit(callbackQueue.poll().getCallback()); + } + } + } + + private static class WatermarkCallback { + public static <W extends BoundedWindow> WatermarkCallback onGuaranteedFiring( + BoundedWindow window, WindowingStrategy<?, W> strategy, Runnable callback) { + @SuppressWarnings("unchecked") + Instant firingAfter = + strategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring((W) window); + return new WatermarkCallback(firingAfter, callback); + } + + private final Instant fireAfter; + private final Runnable callback; + + private WatermarkCallback(Instant fireAfter, Runnable callback) { + this.fireAfter = fireAfter; + this.callback = callback; + } + + public boolean shouldFire(Instant currentWatermark) { + return currentWatermark.isAfter(fireAfter) + || currentWatermark.equals(BoundedWindow.TIMESTAMP_MAX_VALUE); + } + + public Runnable getCallback() { + return callback; + } + } + + private static class CallbackOrdering extends Ordering<WatermarkCallback> { + @Override + public int compare(WatermarkCallback left, WatermarkCallback right) { + return ComparisonChain.start() + .compare(left.fireAfter, right.fireAfter) + .compare(left.callback, right.callback, Ordering.arbitrary()) + .result(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java new file 
mode 100644 index 0000000..628f94d --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.Window.Bound; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; + +import org.joda.time.Instant; + +import java.util.Collection; + +import javax.annotation.Nullable; + +/** + * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the + * {@link Bound Window.Bound} primitive {@link PTransform}. 
+ */ +class WindowEvaluatorFactory implements TransformEvaluatorFactory { + + @Override + public <InputT> TransformEvaluator<InputT> forApplication( + AppliedPTransform<?, ?, ?> application, + @Nullable CommittedBundle<?> inputBundle, + InProcessEvaluationContext evaluationContext) + throws Exception { + return createTransformEvaluator( + (AppliedPTransform) application, inputBundle, evaluationContext); + } + + private <InputT> TransformEvaluator<InputT> createTransformEvaluator( + AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform, + CommittedBundle<?> inputBundle, + InProcessEvaluationContext evaluationContext) { + WindowFn<? super InputT, ?> fn = transform.getTransform().getWindowFn(); + UncommittedBundle<InputT> outputBundle = + evaluationContext.createBundle(inputBundle, transform.getOutput()); + if (fn == null) { + return PassthroughTransformEvaluator.create(transform, outputBundle); + } + return new WindowIntoEvaluator<>(transform, fn, outputBundle); + } + + private static class WindowIntoEvaluator<InputT> implements TransformEvaluator<InputT> { + private final AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> + transform; + private final WindowFn<InputT, ?> windowFn; + private final UncommittedBundle<InputT> outputBundle; + + @SuppressWarnings("unchecked") + public WindowIntoEvaluator( + AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform, + WindowFn<? super InputT, ?> windowFn, + UncommittedBundle<InputT> outputBundle) { + this.outputBundle = outputBundle; + this.transform = transform; + // Safe contravariant cast + this.windowFn = (WindowFn<InputT, ?>) windowFn; + } + + @Override + public void processElement(WindowedValue<InputT> element) throws Exception { + Collection<? 
extends BoundedWindow> windows = assignWindows(windowFn, element); + outputBundle.add( + WindowedValue.<InputT>of( + element.getValue(), element.getTimestamp(), windows, PaneInfo.NO_FIRING)); + } + + private <W extends BoundedWindow> Collection<? extends BoundedWindow> assignWindows( + WindowFn<InputT, W> windowFn, WindowedValue<InputT> element) throws Exception { + WindowFn<InputT, W>.AssignContext assignContext = + new InProcessAssignContext<>(windowFn, element); + Collection<? extends BoundedWindow> windows = windowFn.assignWindows(assignContext); + return windows; + } + + @Override + public InProcessTransformResult finishBundle() throws Exception { + return StepTransformResult.withoutHold(transform).addOutput(outputBundle).build(); + } + } + + private static class InProcessAssignContext<InputT, W extends BoundedWindow> + extends WindowFn<InputT, W>.AssignContext { + private final WindowedValue<InputT> value; + + public InProcessAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) { + fn.super(); + this.value = value; + } + + @Override + public InputT element() { + return value.getValue(); + } + + @Override + public Instant timestamp() { + return value.getTimestamp(); + } + + @Override + public Collection<? 
extends BoundedWindow> windows() { + return value.getWindows(); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java new file mode 100644 index 0000000..d290a4b --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.theInstance; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.io.AvroIO; +import org.apache.beam.sdk.io.AvroIOTest; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; + +import org.hamcrest.Matchers; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.File; + +/** + * Tests for {@link AvroIOShardedWriteFactory}. + */ +@RunWith(JUnit4.class) +public class AvroIOShardedWriteFactoryTest { + + @Rule public TemporaryFolder tmp = new TemporaryFolder(); + private AvroIOShardedWriteFactory factory; + + @Before + public void setup() { + factory = new AvroIOShardedWriteFactory(); + } + + @Test + public void originalWithoutShardingReturnsOriginal() throws Exception { + File file = tmp.newFile("foo"); + PTransform<PCollection<String>, PDone> original = + AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withoutSharding(); + PTransform<PCollection<String>, PDone> overridden = factory.override(original); + + assertThat(overridden, theInstance(original)); + } + + @Test + public void originalShardingNotSpecifiedReturnsOriginal() throws Exception { + File file = tmp.newFile("foo"); + PTransform<PCollection<String>, PDone> original = + AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()); + PTransform<PCollection<String>, PDone> overridden = factory.override(original); + + assertThat(overridden, theInstance(original)); + } + + @Test + public void originalShardedToOneReturnsExplicitlySharded() throws Exception { + File file = tmp.newFile("foo"); + AvroIO.Write.Bound<String> 
original = + AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withNumShards(1); + PTransform<PCollection<String>, PDone> overridden = factory.override(original); + + assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original))); + + TestPipeline p = TestPipeline.create(); + String[] elems = new String[] {"foo", "bar", "baz"}; + p.apply(Create.<String>of(elems)).apply(overridden); + + file.delete(); + + p.run(); + AvroIOTest.assertTestOutputs(elems, 1, file.getAbsolutePath(), original.getShardNameTemplate()); + } + + @Test + public void originalShardedToManyReturnsExplicitlySharded() throws Exception { + File file = tmp.newFile("foo"); + AvroIO.Write.Bound<String> original = + AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withNumShards(3); + PTransform<PCollection<String>, PDone> overridden = factory.override(original); + + assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original))); + + TestPipeline p = TestPipeline.create(); + String[] elems = new String[] {"foo", "bar", "baz", "spam", "ham", "eggs"}; + p.apply(Create.<String>of(elems)).apply(overridden); + + file.delete(); + p.run(); + AvroIOTest.assertTestOutputs(elems, 3, file.getAbsolutePath(), original.getShardNameTemplate()); + } +}
