Repository: incubator-beam Updated Branches: refs/heads/master fe17ef7f8 -> e2c21599d
Actually Split Root Transforms. Permit the ExecutorServiceParallelExecutor to control its own ExecutorService by passing it only a target parallelism value. Request max(3, target parallelism) initial splits for each root transform instead of a single one. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6b1cec29 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6b1cec29 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6b1cec29 Branch: refs/heads/master Commit: 6b1cec2930a199bba2c65d379116c746532c4148 Parents: fe17ef7 Author: Thomas Groh <[email protected]> Authored: Thu Nov 10 13:47:40 2016 -0800 Committer: Thomas Groh <[email protected]> Committed: Fri Nov 11 16:54:11 2016 -0800 ---------------------------------------------------------------------- .../beam/runners/direct/DirectRunner.java | 27 +-------- .../direct/ExecutorServiceParallelExecutor.java | 15 +++-- .../beam/runners/direct/DirectRunnerTest.java | 62 ++++++++++++++++++++ 3 files changed, 73 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b1cec29/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index f4aeb3e..c9a7864 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -29,8 +29,6 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import javax.annotation.Nullable; 
import org.apache.beam.runners.core.GBKIntoKeyedWorkItems; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; @@ -258,7 +256,6 @@ public class DirectRunner //////////////////////////////////////////////////////////////////////////////////////////////// private final DirectOptions options; private final Set<Enforcement> enabledEnforcements; - private Supplier<ExecutorService> executorServiceSupplier; private Supplier<Clock> clockSupplier = new NanosOffsetClockSupplier(); public static DirectRunner fromOptions(PipelineOptions options) { @@ -268,7 +265,6 @@ public class DirectRunner private DirectRunner(DirectOptions options) { this.options = options; this.enabledEnforcements = Enforcement.enabled(options); - this.executorServiceSupplier = new FixedThreadPoolSupplier(options); } /** @@ -326,14 +322,11 @@ public class DirectRunner consumerTrackingVisitor.getStepNames(), consumerTrackingVisitor.getViews()); - // independent executor service for each run - ExecutorService executorService = executorServiceSupplier.get(); - RootInputProvider rootInputProvider = RootProviderRegistry.defaultRegistry(context); TransformEvaluatorRegistry registry = TransformEvaluatorRegistry.defaultRegistry(context); PipelineExecutor executor = ExecutorServiceParallelExecutor.create( - executorService, + options.getTargetParallelism(), consumerTrackingVisitor.getValueToConsumers(), keyedPValueVisitor.getKeyedPValues(), rootInputProvider, @@ -470,24 +463,6 @@ public class DirectRunner } /** - * A {@link Supplier} that creates a {@link ExecutorService} based on - * {@link Executors#newFixedThreadPool(int)}. 
- */ - private static class FixedThreadPoolSupplier implements Supplier<ExecutorService> { - private final DirectOptions options; - - private FixedThreadPoolSupplier(DirectOptions options) { - this.options = options; - } - - @Override - public ExecutorService get() { - return Executors.newFixedThreadPool(options.getTargetParallelism()); - } - } - - - /** * A {@link Supplier} that creates a {@link NanosOffsetClock}. */ private static class NanosOffsetClockSupplier implements Supplier<Clock> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b1cec29/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 30fc417..0bb3d01 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -65,6 +66,7 @@ import org.slf4j.LoggerFactory; final class ExecutorServiceParallelExecutor implements PipelineExecutor { private static final Logger LOG = LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class); + private final int targetParallelism; private final ExecutorService executorService; private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers; @@ -101,7 +103,7 @@ final 
class ExecutorServiceParallelExecutor implements PipelineExecutor { private final AtomicLong outstandingWork = new AtomicLong(); public static ExecutorServiceParallelExecutor create( - ExecutorService executorService, + int targetParallelism, Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers, Set<PValue> keyedPValues, RootInputProvider rootInputProvider, @@ -111,7 +113,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { transformEnforcements, EvaluationContext context) { return new ExecutorServiceParallelExecutor( - executorService, + targetParallelism, valueToConsumers, keyedPValues, rootInputProvider, @@ -121,7 +123,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { } private ExecutorServiceParallelExecutor( - ExecutorService executorService, + int targetParallelism, Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers, Set<PValue> keyedPValues, RootInputProvider rootInputProvider, @@ -129,7 +131,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { @SuppressWarnings("rawtypes") Map<Class<? 
extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements, EvaluationContext context) { - this.executorService = executorService; + this.targetParallelism = targetParallelism; + this.executorService = Executors.newFixedThreadPool(targetParallelism); this.valueToConsumers = valueToConsumers; this.keyedPValues = keyedPValues; this.rootInputProvider = rootInputProvider; @@ -164,10 +167,12 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { @Override public void start(Collection<AppliedPTransform<?, ?, ?>> roots) { + int numTargetSplits = Math.max(3, targetParallelism); for (AppliedPTransform<?, ?, ?> root : roots) { ConcurrentLinkedQueue<CommittedBundle<?>> pending = new ConcurrentLinkedQueue<>(); try { - Collection<CommittedBundle<?>> initialInputs = rootInputProvider.getInitialInputs(root, 1); + Collection<CommittedBundle<?>> initialInputs = + rootInputProvider.getInitialInputs(root, numTargetSplits); pending.addAll(initialInputs); } catch (Exception e) { throw UserCodeException.wrap(e); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b1cec29/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index 3836f58..3c860b1 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.direct; +import static com.google.common.base.Preconditions.checkState; import static org.apache.beam.sdk.metrics.MetricMatchers.metricResult; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; @@ -38,11 +39,15 @@ import 
org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.DistributionResult; @@ -219,6 +224,15 @@ public class DirectRunnerTest implements Serializable { } @Test + public void splitsInputs() { + Pipeline p = getPipeline(); + PCollection<Long> longs = p.apply(Read.from(MustSplitSource.of(CountingSource.upTo(3)))); + + PAssert.that(longs).containsInAnyOrder(0L, 1L, 2L); + p.run(); + } + + @Test public void transformDisplayDataExceptionShouldFail() { DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() { @ProcessElement @@ -478,4 +492,52 @@ public class DirectRunnerTest implements Serializable { DistributionResult.create(26L, 3L, 5L, 13L), DistributionResult.create(26L, 3L, 5L, 13L)))); } + + private static class MustSplitSource<T> extends BoundedSource<T>{ + public static <T> BoundedSource<T> of(BoundedSource<T> underlying) { + return new MustSplitSource<>(underlying); + } + + private final BoundedSource<T> underlying; + + public MustSplitSource(BoundedSource<T> underlying) { + this.underlying = underlying; + } + + @Override + public List<? 
extends BoundedSource<T>> splitIntoBundles( + long desiredBundleSizeBytes, PipelineOptions options) throws Exception { + // Must be asked to split into more than one bundle, i.e. with a desired bundle size smaller than the whole source. + checkState( + desiredBundleSizeBytes < getEstimatedSizeBytes(options), + "Must split into more than one source"); + return underlying.splitIntoBundles(desiredBundleSizeBytes, options); + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + return underlying.getEstimatedSizeBytes(options); + } + + @Override + public boolean producesSortedKeys(PipelineOptions options) throws Exception { + return underlying.producesSortedKeys(options); + } + + @Override + public BoundedReader<T> createReader(PipelineOptions options) throws IOException { + throw new IllegalStateException( + "The MustSplitSource cannot create a reader without being split first"); + } + + @Override + public void validate() { + underlying.validate(); + } + + @Override + public Coder<T> getDefaultOutputCoder() { + return underlying.getDefaultOutputCoder(); + } + } }
