Repository: incubator-beam Updated Branches: refs/heads/master e9a08e454 -> 665457c9c
Remove PubsubFileInjector and IntraBundleParallelization Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fc964748 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fc964748 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fc964748 Branch: refs/heads/master Commit: fc96474804cdcd950663e7eaea5c140652a07c8e Parents: e9a08e4 Author: peihe <hepeim...@gmail.com> Authored: Tue Sep 13 21:29:13 2016 -0700 Committer: peihe <hepeim...@gmail.com> Committed: Tue Sep 13 21:30:05 2016 -0700 ---------------------------------------------------------------------- .../examples/common/PubsubFileInjector.java | 153 -------- .../transforms/IntraBundleParallelization.java | 361 ------------------- .../IntraBundleParallelizationTest.java | 280 -------------- 3 files changed, 794 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc964748/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java b/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java deleted file mode 100644 index 4634159..0000000 --- a/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.examples.common; - -import com.google.api.services.pubsub.Pubsub; -import com.google.api.services.pubsub.model.PublishRequest; -import com.google.api.services.pubsub.model.PubsubMessage; -import com.google.common.collect.ImmutableMap; -import java.io.IOException; -import java.util.Arrays; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.PubsubOptions; -import org.apache.beam.sdk.options.Validation; -import org.apache.beam.sdk.transforms.IntraBundleParallelization; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.util.Transport; - -/** - * A batch Dataflow pipeline for injecting a set of GCS files into - * a PubSub topic line by line. Empty lines are skipped. - * - * <p>This is useful for testing streaming - * pipelines. Note that since batch pipelines might retry chunks, this - * does _not_ guarantee exactly-once injection of file data. Some lines may - * be published multiple times. - * </p> - */ -public class PubsubFileInjector { - - /** - * An incomplete {@code PubsubFileInjector} transform with unbound output topic. 
- */ - public static class Unbound { - private final String timestampLabelKey; - - Unbound() { - this.timestampLabelKey = null; - } - - Unbound(String timestampLabelKey) { - this.timestampLabelKey = timestampLabelKey; - } - - Unbound withTimestampLabelKey(String timestampLabelKey) { - return new Unbound(timestampLabelKey); - } - - public Bound publish(String outputTopic) { - return new Bound(outputTopic, timestampLabelKey); - } - } - - /** A {@link OldDoFn} that publishes non-empty lines to Google Cloud PubSub. */ - public static class Bound extends OldDoFn<String, Void> { - private final String outputTopic; - private final String timestampLabelKey; - public transient Pubsub pubsub; - - public Bound(String outputTopic, String timestampLabelKey) { - this.outputTopic = outputTopic; - this.timestampLabelKey = timestampLabelKey; - } - - @Override - public void startBundle(Context context) { - this.pubsub = - Transport.newPubsubClient(context.getPipelineOptions().as(PubsubOptions.class)) - .build(); - } - - @Override - public void processElement(ProcessContext c) throws IOException { - if (c.element().isEmpty()) { - return; - } - PubsubMessage pubsubMessage = new PubsubMessage(); - pubsubMessage.encodeData(c.element().getBytes()); - if (timestampLabelKey != null) { - pubsubMessage.setAttributes( - ImmutableMap.of(timestampLabelKey, Long.toString(c.timestamp().getMillis()))); - } - PublishRequest publishRequest = new PublishRequest(); - publishRequest.setMessages(Arrays.asList(pubsubMessage)); - this.pubsub.projects().topics().publish(outputTopic, publishRequest).execute(); - } - } - - /** - * Creates a {@code PubsubFileInjector} transform with the given timestamp label key. - */ - public static Unbound withTimestampLabelKey(String timestampLabelKey) { - return new Unbound(timestampLabelKey); - } - - /** - * Creates a {@code PubsubFileInjector} transform that publishes to the given output topic. 
- */ - public static Bound publish(String outputTopic) { - return new Unbound().publish(outputTopic); - } - - /** - * Command line parameter options. - */ - private interface PubsubFileInjectorOptions extends PipelineOptions { - @Description("GCS location of files.") - @Validation.Required - String getInput(); - void setInput(String value); - - @Description("Topic to publish on.") - @Validation.Required - String getOutputTopic(); - void setOutputTopic(String value); - } - - /** - * Sets up and starts streaming pipeline. - */ - public static void main(String[] args) { - PubsubFileInjectorOptions options = PipelineOptionsFactory.fromArgs(args) - .withValidation() - .as(PubsubFileInjectorOptions.class); - - Pipeline pipeline = Pipeline.create(options); - - pipeline - .apply(TextIO.Read.from(options.getInput())) - .apply(IntraBundleParallelization.of(PubsubFileInjector.publish(options.getOutputTopic())) - .withMaxParallelism(20)); - - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc964748/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java deleted file mode 100644 index 1eef0e1..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java +++ /dev/null @@ -1,361 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.base.Throwables; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.beam.sdk.options.GcsOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowingInternals; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.joda.time.Instant; - -/** - * Provides multi-threading of {@link OldDoFn}s, using threaded execution to - * process multiple elements concurrently within a bundle. - * - * <p>Note, that each Dataflow worker will already process multiple bundles - * concurrently and usage of this class is meant only for cases where processing - * elements from within a bundle is limited by blocking calls. - * - * <p>CPU intensive or IO intensive tasks are in general a poor fit for parallelization. - * This is because a limited resource that is already maximally utilized does not - * benefit from sub-division of work. 
The parallelization will increase the amount of time - * to process each element yet the throughput for processing will remain relatively the same. - * For example, if the local disk (an IO resource) has a maximum write rate of 10 MiB/s, - * and processing each element requires writing 20 MiB to disk, then processing one element - * to disk will take 2 seconds. Yet processing 3 elements concurrently (each getting an equal - * share of the maximum write rate) will take at least 6 seconds to complete (there is additional - * overhead in the extra parallelization). - * - * <p>To parallelize a {@link OldDoFn} to 10 threads: - * <pre>{@code - * PCollection<T> data = ...; - * data.apply( - * IntraBundleParallelization.of(new MyDoFn()) - * .withMaxParallelism(10)); - * }</pre> - * - * <p>An uncaught exception from the wrapped {@link OldDoFn} will result in the exception - * being rethrown in later calls to {@link MultiThreadedIntraBundleProcessingDoFn#processElement} - * or a call to {@link MultiThreadedIntraBundleProcessingDoFn#finishBundle}. - */ -public class IntraBundleParallelization { - /** - * Creates a {@link IntraBundleParallelization} {@link PTransform} for the given - * {@link OldDoFn} that processes elements using multiple threads. - * - * <p>Note that the specified {@code doFn} needs to be thread safe. - */ - public static <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> doFn) { - return new Unbound().of(doFn); - } - - /** - * Creates a {@link IntraBundleParallelization} {@link PTransform} with the specified - * maximum concurrency level. - */ - public static Unbound withMaxParallelism(int maxParallelism) { - return new Unbound().withMaxParallelism(maxParallelism); - } - - /** - * An incomplete {@code IntraBundleParallelization} transform, with unbound input/output types. 
- * - * <p>Before being applied, {@link IntraBundleParallelization.Unbound#of} must be - * invoked to specify the {@link OldDoFn} to invoke, which will also - * bind the input/output types of this {@code PTransform}. - */ - public static class Unbound { - private final int maxParallelism; - - Unbound() { - this(DEFAULT_MAX_PARALLELISM); - } - - Unbound(int maxParallelism) { - checkArgument(maxParallelism > 0, - "Expected parallelism factor greater than zero, received %s.", maxParallelism); - this.maxParallelism = maxParallelism; - } - - /** - * Returns a new {@link IntraBundleParallelization} {@link PTransform} like this one - * with the specified maximum concurrency level. - */ - public Unbound withMaxParallelism(int maxParallelism) { - return new Unbound(maxParallelism); - } - - /** - * Returns a new {@link IntraBundleParallelization} {@link PTransform} like this one - * with the specified {@link OldDoFn}. - * - * <p>Note that the specified {@code doFn} needs to be thread safe. - */ - public <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> doFn) { - return new Bound<>(doFn, maxParallelism); - } - } - - /** - * A {@code PTransform} that, when applied to a {@code PCollection<InputT>}, - * invokes a user-specified {@code OldDoFn<InputT, OutputT>} on all its elements, - * with all its outputs collected into an output - * {@code PCollection<OutputT>}. - * - * <p>Note that the specified {@code doFn} needs to be thread safe. - * - * @param <InputT> the type of the (main) input {@code PCollection} elements - * @param <OutputT> the type of the (main) output {@code PCollection} elements - */ - public static class Bound<InputT, OutputT> - extends PTransform<PCollection<? 
extends InputT>, PCollection<OutputT>> { - private final OldDoFn<InputT, OutputT> doFn; - private final int maxParallelism; - - Bound(OldDoFn<InputT, OutputT> doFn, int maxParallelism) { - checkArgument(maxParallelism > 0, - "Expected parallelism factor greater than zero, received %s.", maxParallelism); - this.doFn = doFn; - this.maxParallelism = maxParallelism; - } - - /** - * Returns a new {@link IntraBundleParallelization} {@link PTransform} like this one - * with the specified maximum concurrency level. - */ - public Bound<InputT, OutputT> withMaxParallelism(int maxParallelism) { - return new Bound<>(doFn, maxParallelism); - } - - /** - * Returns a new {@link IntraBundleParallelization} {@link PTransform} like this one - * with the specified {@link OldDoFn}. - * - * <p>Note that the specified {@code doFn} needs to be thread safe. - */ - public <NewInputT, NewOutputT> Bound<NewInputT, NewOutputT> - of(OldDoFn<NewInputT, NewOutputT> doFn) { - return new Bound<>(doFn, maxParallelism); - } - - @Override - public PCollection<OutputT> apply(PCollection<? extends InputT> input) { - return input.apply( - ParDo.of(new MultiThreadedIntraBundleProcessingDoFn<>(doFn, maxParallelism))); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder - .add(DisplayData.item("maxParallelism", maxParallelism) - .withLabel("Maximum Parallelism")) - .add(DisplayData.item("fn", doFn.getClass()) - .withLabel("Function")) - .include(doFn); - } - } - - /** - * A multi-threaded {@code OldDoFn} wrapper. 
- * - * @see IntraBundleParallelization#of(OldDoFn) - * - * @param <InputT> the type of the (main) input elements - * @param <OutputT> the type of the (main) output elements - */ - public static class MultiThreadedIntraBundleProcessingDoFn<InputT, OutputT> - extends OldDoFn<InputT, OutputT> { - - public MultiThreadedIntraBundleProcessingDoFn( - OldDoFn<InputT, OutputT> doFn, - int maxParallelism) { - checkArgument(maxParallelism > 0, - "Expected parallelism factor greater than zero, received %s.", maxParallelism); - this.doFn = doFn; - this.maxParallelism = maxParallelism; - } - - @Override - public void startBundle(Context c) throws Exception { - doFn.startBundle(c); - - executor = c.getPipelineOptions().as(GcsOptions.class).getExecutorService(); - workTickets = new Semaphore(maxParallelism); - failure = new AtomicReference<>(); - } - - @Override - public void processElement(final ProcessContext c) throws Exception { - try { - workTickets.acquire(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while scheduling work", e); - } - - if (failure.get() != null) { - throw new RuntimeException(failure.get()); - } - - executor.submit(new Runnable() { - @Override - public void run() { - try { - doFn.processElement(new WrappedContext(c)); - } catch (Throwable t) { - failure.compareAndSet(null, t); - Throwables.propagateIfPossible(t); - throw new AssertionError("Unexpected checked exception: " + t); - } finally { - workTickets.release(); - } - } - }); - } - - @Override - public void finishBundle(Context c) throws Exception { - // Acquire all the work tickets to guarantee that all the previous - // processElement calls have finished. 
- workTickets.acquire(maxParallelism); - if (failure.get() != null) { - throw new RuntimeException(failure.get()); - } - doFn.finishBundle(c); - } - - @Override - protected TypeDescriptor<InputT> getInputTypeDescriptor() { - return doFn.getInputTypeDescriptor(); - } - - @Override - protected TypeDescriptor<OutputT> getOutputTypeDescriptor() { - return doFn.getOutputTypeDescriptor(); - } - - ///////////////////////////////////////////////////////////////////////////// - - /** - * Wraps a OldDoFn context, forcing single-thread output so that threads don't - * propagate through to downstream functions. - */ - private class WrappedContext extends ProcessContext { - private final ProcessContext context; - - WrappedContext(ProcessContext context) { - this.context = context; - } - - @Override - public InputT element() { - return context.element(); - } - - @Override - public PipelineOptions getPipelineOptions() { - return context.getPipelineOptions(); - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - return context.sideInput(view); - } - - @Override - public void output(OutputT output) { - synchronized (MultiThreadedIntraBundleProcessingDoFn.this) { - context.output(output); - } - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - synchronized (MultiThreadedIntraBundleProcessingDoFn.this) { - context.outputWithTimestamp(output, timestamp); - } - } - - @Override - public <T> void sideOutput(TupleTag<T> tag, T output) { - synchronized (MultiThreadedIntraBundleProcessingDoFn.this) { - context.sideOutput(tag, output); - } - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - synchronized (MultiThreadedIntraBundleProcessingDoFn.this) { - context.sideOutputWithTimestamp(tag, output, timestamp); - } - } - - @Override - public Instant timestamp() { - return context.timestamp(); - } - - @Override - public BoundedWindow window() { - return context.window(); - } - - 
@Override - public PaneInfo pane() { - return context.pane(); - } - - @Override - public WindowingInternals<InputT, OutputT> windowingInternals() { - return context.windowingInternals(); - } - - @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal( - String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { - return context.createAggregatorInternal(name, combiner); - } - } - - private final OldDoFn<InputT, OutputT> doFn; - private int maxParallelism; - - private transient ExecutorService executor; - private transient Semaphore workTickets; - private transient AtomicReference<Throwable> failure; - } - - /** - * Default maximum for number of concurrent elements to process. - */ - private static final int DEFAULT_MAX_PARALLELISM = 16; -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc964748/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java deleted file mode 100644 index b9afd35..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import static org.apache.beam.sdk.testing.SystemNanoTimeSleeper.sleepMillis; -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom; -import static org.hamcrest.Matchers.both; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.lessThan; -import static org.hamcrest.Matchers.lessThanOrEqualTo; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; - -import java.util.ArrayList; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.testing.NeedsRunner; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link IntraBundleParallelization}. 
- */ -@RunWith(JUnit4.class) -public class IntraBundleParallelizationTest { - private static final int PARALLELISM_FACTOR = 16; - private static final AtomicInteger numSuccesses = new AtomicInteger(); - private static final AtomicInteger numProcessed = new AtomicInteger(); - private static final AtomicInteger numFailures = new AtomicInteger(); - private static int concurrentElements = 0; - private static int maxDownstreamConcurrency = 0; - - private static final AtomicInteger maxFnConcurrency = new AtomicInteger(); - private static final AtomicInteger currentFnConcurrency = new AtomicInteger(); - - @Before - public void setUp() { - numSuccesses.set(0); - numProcessed.set(0); - numFailures.set(0); - concurrentElements = 0; - maxDownstreamConcurrency = 0; - - maxFnConcurrency.set(0); - currentFnConcurrency.set(0); - } - - /** - * Introduces a delay in processing, then passes thru elements. - */ - private static class DelayFn<T> extends OldDoFn<T, T> { - public static final long DELAY_MS = 25; - - @Override - public void processElement(ProcessContext c) { - startConcurrentCall(); - try { - sleepMillis(DELAY_MS); - } catch (InterruptedException e) { - e.printStackTrace(); - throw new RuntimeException("Interrupted"); - } - c.output(c.element()); - finishConcurrentCall(); - } - } - - /** - * Throws an exception after some number of calls. - */ - private static class ExceptionThrowingFn<T> extends OldDoFn<T, T> { - private ExceptionThrowingFn(int numSuccesses) { - IntraBundleParallelizationTest.numSuccesses.set(numSuccesses); - } - - @Override - public void processElement(ProcessContext c) { - startConcurrentCall(); - try { - numProcessed.incrementAndGet(); - if (numSuccesses.decrementAndGet() >= 0) { - c.output(c.element()); - return; - } - - numFailures.incrementAndGet(); - throw new RuntimeException("Expected failure"); - } finally { - finishConcurrentCall(); - } - } - } - - /** - * Measures concurrency of the processElement method. 
- */ - private static class ConcurrencyMeasuringFn<T> extends OldDoFn<T, T> { - @Override - public void processElement(ProcessContext c) { - // Synchronize on the class to provide synchronous access irrespective of - // how this OldDoFn is called. - synchronized (ConcurrencyMeasuringFn.class) { - concurrentElements++; - if (concurrentElements > maxDownstreamConcurrency) { - maxDownstreamConcurrency = concurrentElements; - } - } - - c.output(c.element()); - - synchronized (ConcurrencyMeasuringFn.class) { - concurrentElements--; - } - } - } - - private static void startConcurrentCall() { - int currentlyExecuting = currentFnConcurrency.incrementAndGet(); - int maxConcurrency; - do { - maxConcurrency = maxFnConcurrency.get(); - } while (maxConcurrency < currentlyExecuting - && !maxFnConcurrency.compareAndSet(maxConcurrency, currentlyExecuting)); - } - - private static void finishConcurrentCall() { - currentFnConcurrency.decrementAndGet(); - } - - /** - * Test that the OldDoFn is parallelized up to the Max Parallelism factor within a bundle, but - * not greater than that amount. - */ - @Test - @Category(NeedsRunner.class) - public void testParallelization() { - int maxConcurrency = Integer.MIN_VALUE; - // Take the maximum across multiple runs. - for (int i = 0; i < 5; ++i) { - maxConcurrency = Math.max(maxConcurrency, - run(2 * PARALLELISM_FACTOR, PARALLELISM_FACTOR, new DelayFn<Integer>())); - } - - // We should run at least some elements in parallel on some run - assertThat(maxConcurrency, - greaterThanOrEqualTo(2)); - // No run should execute more elements concurrently than the maximum concurrency allowed. 
- assertThat(maxConcurrency, - lessThanOrEqualTo(PARALLELISM_FACTOR)); - } - - @Test(timeout = 5000L) - @Category(NeedsRunner.class) - public void testExceptionHandling() { - ExceptionThrowingFn<Integer> fn = new ExceptionThrowingFn<>(10); - try { - run(100, PARALLELISM_FACTOR, fn); - fail("Expected exception to propagate"); - } catch (RuntimeException e) { - assertThat(e.getMessage(), containsString("Expected failure")); - } - - // Should have processed at least 10 elements, but stopped before processing all - // of them. - assertThat(numProcessed.get(), - is(both(greaterThanOrEqualTo(10)) - .and(lessThan(100)))); - - // The first failure should prevent the scheduling of any more elements. - assertThat(numFailures.get(), - is(both(greaterThanOrEqualTo(1)) - .and(lessThanOrEqualTo(PARALLELISM_FACTOR)))); - } - - @Test(timeout = 5000L) - @Category(NeedsRunner.class) - public void testExceptionHandlingOnLastElement() { - ExceptionThrowingFn<Integer> fn = new ExceptionThrowingFn<>(9); - try { - run(10, PARALLELISM_FACTOR, fn); - fail("Expected exception to propagate"); - } catch (RuntimeException e) { - assertThat(e.getMessage(), containsString("Expected failure")); - } - - // All 10 elements should have been processed, with exactly one of them - // failing. 
- assertEquals(10, numProcessed.get()); - assertEquals(1, numFailures.get()); - } - - @Test - public void testIntraBundleParallelizationGetName() { - assertEquals( - "IntraBundleParallelization", - IntraBundleParallelization.of(new DelayFn<Integer>()).withMaxParallelism(1).getName()); - } - - @Test - public void testDisplayData() { - OldDoFn<String, String> fn = new OldDoFn<String, String>() { - @Override - public void processElement(ProcessContext c) throws Exception { - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.add(DisplayData.item("foo", "bar")); - } - }; - - PTransform<?, ?> transform = IntraBundleParallelization - .withMaxParallelism(1234) - .of(fn); - - DisplayData displayData = DisplayData.from(transform); - assertThat(displayData, includesDisplayDataFrom(fn)); - assertThat(displayData, hasDisplayItem("fn", fn.getClass())); - assertThat(displayData, hasDisplayItem("maxParallelism", 1234)); - } - - /** - * Runs the provided doFn inside of an {@link IntraBundleParallelization} transform. - * - * <p>This method assumes that the OldDoFn passed to it will call {@link #startConcurrentCall()} - * before processing each element and {@link #finishConcurrentCall()} after each element. 
- * - * @param numElements the size of the input - * @param maxParallelism how many threads to execute in parallel - * @param doFn the OldDoFn to execute - * @return the maximum observed parallelism of the OldDoFn - */ - private int run(int numElements, int maxParallelism, OldDoFn<Integer, Integer> doFn) { - Pipeline pipeline = TestPipeline.create(); - - ArrayList<Integer> data = new ArrayList<>(numElements); - for (int i = 0; i < numElements; ++i) { - data.add(i); - } - - ConcurrencyMeasuringFn<Integer> downstream = new ConcurrencyMeasuringFn<>(); - pipeline - .apply(Create.of(data)) - .apply(IntraBundleParallelization.of(doFn).withMaxParallelism(maxParallelism)) - .apply(ParDo.of(downstream)); - - pipeline.run(); - - // All elements should have completed. - assertEquals(0, currentFnConcurrency.get()); - // Downstream methods should not see parallel threads. - assertEquals(1, maxDownstreamConcurrency); - - return maxFnConcurrency.get(); - } -}