Repository: incubator-beam
Updated Branches:
  refs/heads/master e9a08e454 -> 665457c9c


Remove PubsubFileInjector and IntraBundleParallelization


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fc964748
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fc964748
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fc964748

Branch: refs/heads/master
Commit: fc96474804cdcd950663e7eaea5c140652a07c8e
Parents: e9a08e4
Author: peihe <hepeim...@gmail.com>
Authored: Tue Sep 13 21:29:13 2016 -0700
Committer: peihe <hepeim...@gmail.com>
Committed: Tue Sep 13 21:30:05 2016 -0700

----------------------------------------------------------------------
 .../examples/common/PubsubFileInjector.java     | 153 --------
 .../transforms/IntraBundleParallelization.java  | 361 -------------------
 .../IntraBundleParallelizationTest.java         | 280 --------------
 3 files changed, 794 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc964748/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java
----------------------------------------------------------------------
diff --git 
a/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java
 
b/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java
deleted file mode 100644
index 4634159..0000000
--- 
a/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.examples.common;
-
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.PublishRequest;
-import com.google.api.services.pubsub.model.PubsubMessage;
-import com.google.common.collect.ImmutableMap;
-import java.io.IOException;
-import java.util.Arrays;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.PubsubOptions;
-import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.IntraBundleParallelization;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.util.Transport;
-
-/**
- * A batch Dataflow pipeline for injecting a set of GCS files into
- * a PubSub topic line by line. Empty lines are skipped.
- *
- * <p>This is useful for testing streaming
- * pipelines. Note that since batch pipelines might retry chunks, this
- * does _not_ guarantee exactly-once injection of file data. Some lines may
- * be published multiple times.
- * </p>
- */
-public class PubsubFileInjector {
-
-  /**
-   * An incomplete {@code PubsubFileInjector} transform with unbound output 
topic.
-   */
-  public static class Unbound {
-    private final String timestampLabelKey;
-
-    Unbound() {
-      this.timestampLabelKey = null;
-    }
-
-    Unbound(String timestampLabelKey) {
-      this.timestampLabelKey = timestampLabelKey;
-    }
-
-    Unbound withTimestampLabelKey(String timestampLabelKey) {
-      return new Unbound(timestampLabelKey);
-    }
-
-    public Bound publish(String outputTopic) {
-      return new Bound(outputTopic, timestampLabelKey);
-    }
-  }
-
-  /** A {@link OldDoFn} that publishes non-empty lines to Google Cloud PubSub. 
*/
-  public static class Bound extends OldDoFn<String, Void> {
-    private final String outputTopic;
-    private final String timestampLabelKey;
-    public transient Pubsub pubsub;
-
-    public Bound(String outputTopic, String timestampLabelKey) {
-      this.outputTopic = outputTopic;
-      this.timestampLabelKey = timestampLabelKey;
-    }
-
-    @Override
-    public void startBundle(Context context) {
-      this.pubsub =
-          
Transport.newPubsubClient(context.getPipelineOptions().as(PubsubOptions.class))
-              .build();
-    }
-
-    @Override
-    public void processElement(ProcessContext c) throws IOException {
-      if (c.element().isEmpty()) {
-        return;
-      }
-      PubsubMessage pubsubMessage = new PubsubMessage();
-      pubsubMessage.encodeData(c.element().getBytes());
-      if (timestampLabelKey != null) {
-        pubsubMessage.setAttributes(
-            ImmutableMap.of(timestampLabelKey, 
Long.toString(c.timestamp().getMillis())));
-      }
-      PublishRequest publishRequest = new PublishRequest();
-      publishRequest.setMessages(Arrays.asList(pubsubMessage));
-      this.pubsub.projects().topics().publish(outputTopic, 
publishRequest).execute();
-    }
-  }
-
-  /**
-   * Creates a {@code PubsubFileInjector} transform with the given timestamp 
label key.
-   */
-  public static Unbound withTimestampLabelKey(String timestampLabelKey) {
-    return new Unbound(timestampLabelKey);
-  }
-
-  /**
-   * Creates a {@code PubsubFileInjector} transform that publishes to the 
given output topic.
-   */
-  public static Bound publish(String outputTopic) {
-    return new Unbound().publish(outputTopic);
-  }
-
-  /**
-   * Command line parameter options.
-   */
-  private interface PubsubFileInjectorOptions extends PipelineOptions {
-    @Description("GCS location of files.")
-    @Validation.Required
-    String getInput();
-    void setInput(String value);
-
-    @Description("Topic to publish on.")
-    @Validation.Required
-    String getOutputTopic();
-    void setOutputTopic(String value);
-  }
-
-  /**
-   * Sets up and starts streaming pipeline.
-   */
-  public static void main(String[] args) {
-    PubsubFileInjectorOptions options = PipelineOptionsFactory.fromArgs(args)
-        .withValidation()
-        .as(PubsubFileInjectorOptions.class);
-
-    Pipeline pipeline = Pipeline.create(options);
-
-    pipeline
-        .apply(TextIO.Read.from(options.getInput()))
-        
.apply(IntraBundleParallelization.of(PubsubFileInjector.publish(options.getOutputTopic()))
-            .withMaxParallelism(20));
-
-    pipeline.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc964748/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java
deleted file mode 100644
index 1eef0e1..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java
+++ /dev/null
@@ -1,361 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.base.Throwables;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.joda.time.Instant;
-
-/**
- * Provides multi-threading of {@link OldDoFn}s, using threaded execution to
- * process multiple elements concurrently within a bundle.
- *
- * <p>Note, that each Dataflow worker will already process multiple bundles
- * concurrently and usage of this class is meant only for cases where 
processing
- * elements from within a bundle is limited by blocking calls.
- *
- * <p>CPU intensive or IO intensive tasks are in general a poor fit for 
parallelization.
- * This is because a limited resource that is already maximally utilized does 
not
- * benefit from sub-division of work. The parallelization will increase the 
amount of time
- * to process each element yet the throughput for processing will remain 
relatively the same.
- * For example, if the local disk (an IO resource) has a maximum write rate of 
10 MiB/s,
- * and processing each element requires to write 20 MiBs to disk, then 
processing one element
- * to disk will take 2 seconds. Yet processing 3 elements concurrently (each 
getting an equal
- * share of the maximum write rate) will take at least 6 seconds to complete 
(there is additional
- * overhead in the extra parallelization).
- *
- * <p>To parallelize a {@link OldDoFn} to 10 threads:
- * <pre>{@code
- * PCollection<T> data = ...;
- * data.apply(
- *   IntraBundleParallelization.of(new MyDoFn())
- *                             .withMaxParallelism(10)));
- * }</pre>
- *
- * <p>An uncaught exception from the wrapped {@link OldDoFn} will result in 
the exception
- * being rethrown in later calls to {@link 
MultiThreadedIntraBundleProcessingDoFn#processElement}
- * or a call to {@link MultiThreadedIntraBundleProcessingDoFn#finishBundle}.
- */
-public class IntraBundleParallelization {
-  /**
-   * Creates a {@link IntraBundleParallelization} {@link PTransform} for the 
given
-   * {@link OldDoFn} that processes elements using multiple threads.
-   *
-   * <p>Note that the specified {@code doFn} needs to be thread safe.
-   */
-  public static <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, 
OutputT> doFn) {
-    return new Unbound().of(doFn);
-  }
-
-  /**
-   * Creates a {@link IntraBundleParallelization} {@link PTransform} with the 
specified
-   * maximum concurrency level.
-   */
-  public static Unbound withMaxParallelism(int maxParallelism) {
-    return new Unbound().withMaxParallelism(maxParallelism);
-  }
-
-  /**
-   * An incomplete {@code IntraBundleParallelization} transform, with unbound 
input/output types.
-   *
-   * <p>Before being applied, {@link IntraBundleParallelization.Unbound#of} 
must be
-   * invoked to specify the {@link OldDoFn} to invoke, which will also
-   * bind the input/output types of this {@code PTransform}.
-   */
-  public static class Unbound {
-    private final int maxParallelism;
-
-    Unbound() {
-      this(DEFAULT_MAX_PARALLELISM);
-    }
-
-    Unbound(int maxParallelism) {
-      checkArgument(maxParallelism > 0,
-          "Expected parallelism factor greater than zero, received %s.", 
maxParallelism);
-      this.maxParallelism = maxParallelism;
-    }
-
-    /**
-     * Returns a new {@link IntraBundleParallelization} {@link PTransform} 
like this one
-     * with the specified maximum concurrency level.
-     */
-    public Unbound withMaxParallelism(int maxParallelism) {
-      return new Unbound(maxParallelism);
-    }
-
-    /**
-     * Returns a new {@link IntraBundleParallelization} {@link PTransform} 
like this one
-     * with the specified {@link OldDoFn}.
-     *
-     * <p>Note that the specified {@code doFn} needs to be thread safe.
-     */
-    public <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, 
OutputT> doFn) {
-      return new Bound<>(doFn, maxParallelism);
-    }
-  }
-
-  /**
-   * A {@code PTransform} that, when applied to a {@code PCollection<InputT>},
-   * invokes a user-specified {@code OldDoFn<InputT, OutputT>} on all its 
elements,
-   * with all its outputs collected into an output
-   * {@code PCollection<OutputT>}.
-   *
-   * <p>Note that the specified {@code doFn} needs to be thread safe.
-   *
-   * @param <InputT> the type of the (main) input {@code PCollection} elements
-   * @param <OutputT> the type of the (main) output {@code PCollection} 
elements
-   */
-  public static class Bound<InputT, OutputT>
-      extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
-    private final OldDoFn<InputT, OutputT> doFn;
-    private final int maxParallelism;
-
-    Bound(OldDoFn<InputT, OutputT> doFn, int maxParallelism) {
-      checkArgument(maxParallelism > 0,
-          "Expected parallelism factor greater than zero, received %s.", 
maxParallelism);
-      this.doFn = doFn;
-      this.maxParallelism = maxParallelism;
-    }
-
-    /**
-     * Returns a new {@link IntraBundleParallelization} {@link PTransform} 
like this one
-     * with the specified maximum concurrency level.
-     */
-    public Bound<InputT, OutputT> withMaxParallelism(int maxParallelism) {
-      return new Bound<>(doFn, maxParallelism);
-    }
-
-    /**
-     * Returns a new {@link IntraBundleParallelization} {@link PTransform} 
like this one
-     * with the specified {@link OldDoFn}.
-     *
-     * <p>Note that the specified {@code doFn} needs to be thread safe.
-     */
-    public <NewInputT, NewOutputT> Bound<NewInputT, NewOutputT>
-        of(OldDoFn<NewInputT, NewOutputT> doFn) {
-      return new Bound<>(doFn, maxParallelism);
-    }
-
-    @Override
-    public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
-      return input.apply(
-          ParDo.of(new MultiThreadedIntraBundleProcessingDoFn<>(doFn, 
maxParallelism)));
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      builder
-          .add(DisplayData.item("maxParallelism", maxParallelism)
-            .withLabel("Maximum Parallelism"))
-          .add(DisplayData.item("fn", doFn.getClass())
-            .withLabel("Function"))
-          .include(doFn);
-    }
-  }
-
-  /**
-   * A multi-threaded {@code OldDoFn} wrapper.
-   *
-   * @see IntraBundleParallelization#of(OldDoFn)
-   *
-   * @param <InputT> the type of the (main) input elements
-   * @param <OutputT> the type of the (main) output elements
-   */
-  public static class MultiThreadedIntraBundleProcessingDoFn<InputT, OutputT>
-      extends OldDoFn<InputT, OutputT> {
-
-    public MultiThreadedIntraBundleProcessingDoFn(
-        OldDoFn<InputT, OutputT> doFn,
-        int maxParallelism) {
-      checkArgument(maxParallelism > 0,
-          "Expected parallelism factor greater than zero, received %s.", 
maxParallelism);
-      this.doFn = doFn;
-      this.maxParallelism = maxParallelism;
-    }
-
-    @Override
-    public void startBundle(Context c) throws Exception {
-      doFn.startBundle(c);
-
-      executor = 
c.getPipelineOptions().as(GcsOptions.class).getExecutorService();
-      workTickets = new Semaphore(maxParallelism);
-      failure = new AtomicReference<>();
-    }
-
-    @Override
-    public void processElement(final ProcessContext c) throws Exception {
-      try {
-        workTickets.acquire();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException("Interrupted while scheduling work", e);
-      }
-
-      if (failure.get() != null) {
-        throw new RuntimeException(failure.get());
-      }
-
-      executor.submit(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            doFn.processElement(new WrappedContext(c));
-          } catch (Throwable t) {
-            failure.compareAndSet(null, t);
-            Throwables.propagateIfPossible(t);
-            throw new AssertionError("Unexpected checked exception: " + t);
-          } finally {
-            workTickets.release();
-          }
-        }
-      });
-    }
-
-    @Override
-    public void finishBundle(Context c) throws Exception {
-      // Acquire all the work tickets to guarantee that all the previous
-      // processElement calls have finished.
-      workTickets.acquire(maxParallelism);
-      if (failure.get() != null) {
-        throw new RuntimeException(failure.get());
-      }
-      doFn.finishBundle(c);
-    }
-
-    @Override
-    protected TypeDescriptor<InputT> getInputTypeDescriptor() {
-      return doFn.getInputTypeDescriptor();
-    }
-
-    @Override
-    protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
-      return doFn.getOutputTypeDescriptor();
-    }
-
-    
/////////////////////////////////////////////////////////////////////////////
-
-    /**
-     * Wraps a OldDoFn context, forcing single-thread output so that threads 
don't
-     * propagate through to downstream functions.
-     */
-    private class WrappedContext extends ProcessContext {
-      private final ProcessContext context;
-
-      WrappedContext(ProcessContext context) {
-        this.context = context;
-      }
-
-      @Override
-      public InputT element() {
-        return context.element();
-      }
-
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        return context.getPipelineOptions();
-      }
-
-      @Override
-      public <T> T sideInput(PCollectionView<T> view) {
-        return context.sideInput(view);
-      }
-
-      @Override
-      public void output(OutputT output) {
-        synchronized (MultiThreadedIntraBundleProcessingDoFn.this) {
-          context.output(output);
-        }
-      }
-
-      @Override
-      public void outputWithTimestamp(OutputT output, Instant timestamp) {
-        synchronized (MultiThreadedIntraBundleProcessingDoFn.this) {
-          context.outputWithTimestamp(output, timestamp);
-        }
-      }
-
-      @Override
-      public <T> void sideOutput(TupleTag<T> tag, T output) {
-        synchronized (MultiThreadedIntraBundleProcessingDoFn.this) {
-          context.sideOutput(tag, output);
-        }
-      }
-
-      @Override
-      public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, 
Instant timestamp) {
-        synchronized (MultiThreadedIntraBundleProcessingDoFn.this) {
-          context.sideOutputWithTimestamp(tag, output, timestamp);
-        }
-      }
-
-      @Override
-      public Instant timestamp() {
-        return context.timestamp();
-      }
-
-      @Override
-      public BoundedWindow window() {
-        return context.window();
-      }
-
-      @Override
-      public PaneInfo pane() {
-        return context.pane();
-      }
-
-      @Override
-      public WindowingInternals<InputT, OutputT> windowingInternals() {
-        return context.windowingInternals();
-      }
-
-      @Override
-      protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(
-          String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
-        return context.createAggregatorInternal(name, combiner);
-      }
-    }
-
-    private final OldDoFn<InputT, OutputT> doFn;
-    private int maxParallelism;
-
-    private transient ExecutorService executor;
-    private transient Semaphore workTickets;
-    private transient AtomicReference<Throwable> failure;
-  }
-
-  /**
-   * Default maximum for number of concurrent elements to process.
-   */
-  private static final int DEFAULT_MAX_PARALLELISM = 16;
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc964748/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java
deleted file mode 100644
index b9afd35..0000000
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static org.apache.beam.sdk.testing.SystemNanoTimeSleeper.sleepMillis;
-import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
-import static org.hamcrest.Matchers.both;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.lessThan;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for RateLimiter.
- */
-@RunWith(JUnit4.class)
-public class IntraBundleParallelizationTest {
-  private static final int PARALLELISM_FACTOR = 16;
-  private static final AtomicInteger numSuccesses = new AtomicInteger();
-  private static final AtomicInteger numProcessed = new AtomicInteger();
-  private static final AtomicInteger numFailures = new AtomicInteger();
-  private static int concurrentElements = 0;
-  private static int maxDownstreamConcurrency = 0;
-
-  private static final AtomicInteger maxFnConcurrency = new AtomicInteger();
-  private static final AtomicInteger currentFnConcurrency = new 
AtomicInteger();
-
-  @Before
-  public void setUp() {
-    numSuccesses.set(0);
-    numProcessed.set(0);
-    numFailures.set(0);
-    concurrentElements = 0;
-    maxDownstreamConcurrency = 0;
-
-    maxFnConcurrency.set(0);
-    currentFnConcurrency.set(0);
-  }
-
-  /**
-   * Introduces a delay in processing, then passes thru elements.
-   */
-  private static class DelayFn<T> extends OldDoFn<T, T> {
-    public static final long DELAY_MS = 25;
-
-    @Override
-    public void processElement(ProcessContext c) {
-      startConcurrentCall();
-      try {
-        sleepMillis(DELAY_MS);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-        throw new RuntimeException("Interrupted");
-      }
-      c.output(c.element());
-      finishConcurrentCall();
-    }
-  }
-
-  /**
-   * Throws an exception after some number of calls.
-   */
-  private static class ExceptionThrowingFn<T> extends OldDoFn<T, T> {
-    private ExceptionThrowingFn(int numSuccesses) {
-      IntraBundleParallelizationTest.numSuccesses.set(numSuccesses);
-    }
-
-    @Override
-    public void processElement(ProcessContext c) {
-      startConcurrentCall();
-      try {
-        numProcessed.incrementAndGet();
-        if (numSuccesses.decrementAndGet() >= 0) {
-          c.output(c.element());
-          return;
-        }
-
-        numFailures.incrementAndGet();
-        throw new RuntimeException("Expected failure");
-      } finally {
-        finishConcurrentCall();
-      }
-    }
-  }
-
-  /**
-   * Measures concurrency of the processElement method.
-   */
-  private static class ConcurrencyMeasuringFn<T> extends OldDoFn<T, T> {
-    @Override
-    public void processElement(ProcessContext c) {
-      // Synchronize on the class to provide synchronous access irrespective of
-      // how this OldDoFn is called.
-      synchronized (ConcurrencyMeasuringFn.class) {
-        concurrentElements++;
-        if (concurrentElements > maxDownstreamConcurrency) {
-          maxDownstreamConcurrency = concurrentElements;
-        }
-      }
-
-      c.output(c.element());
-
-      synchronized (ConcurrencyMeasuringFn.class) {
-        concurrentElements--;
-      }
-    }
-  }
-
-  private static void startConcurrentCall() {
-    int currentlyExecuting = currentFnConcurrency.incrementAndGet();
-    int maxConcurrency;
-    do {
-      maxConcurrency = maxFnConcurrency.get();
-    } while (maxConcurrency < currentlyExecuting
-        && !maxFnConcurrency.compareAndSet(maxConcurrency, 
currentlyExecuting));
-  }
-
-  private static void finishConcurrentCall() {
-    currentFnConcurrency.decrementAndGet();
-  }
-
-  /**
-   * Test that the OldDoFn is parallelized up the the Max Parallelism factor 
within a bundle, but
-   * not greater than that amount.
-   */
-  @Test
-  @Category(NeedsRunner.class)
-  public void testParallelization() {
-    int maxConcurrency = Integer.MIN_VALUE;
-    // Take the minimum from multiple runs.
-    for (int i = 0; i < 5; ++i) {
-      maxConcurrency = Math.max(maxConcurrency,
-          run(2 * PARALLELISM_FACTOR, PARALLELISM_FACTOR, new 
DelayFn<Integer>()));
-    }
-
-    // We should run at least some elements in parallel on some run
-    assertThat(maxConcurrency,
-        greaterThanOrEqualTo(2));
-    // No run should execute more elements concurrency than the maximum 
concurrency allowed.
-    assertThat(maxConcurrency,
-        lessThanOrEqualTo(PARALLELISM_FACTOR));
-  }
-
-  @Test(timeout = 5000L)
-  @Category(NeedsRunner.class)
-  public void testExceptionHandling() {
-    ExceptionThrowingFn<Integer> fn = new ExceptionThrowingFn<>(10);
-    try {
-      run(100, PARALLELISM_FACTOR, fn);
-      fail("Expected exception to propagate");
-    } catch (RuntimeException e) {
-      assertThat(e.getMessage(), containsString("Expected failure"));
-    }
-
-    // Should have processed 10 elements, but stopped before processing all
-    // of them.
-    assertThat(numProcessed.get(),
-        is(both(greaterThanOrEqualTo(10))
-            .and(lessThan(100))));
-
-    // The first failure should prevent the scheduling of any more elements.
-    assertThat(numFailures.get(),
-        is(both(greaterThanOrEqualTo(1))
-            .and(lessThanOrEqualTo(PARALLELISM_FACTOR))));
-  }
-
-  @Test(timeout = 5000L)
-  @Category(NeedsRunner.class)
-  public void testExceptionHandlingOnLastElement() {
-    ExceptionThrowingFn<Integer> fn = new ExceptionThrowingFn<>(9);
-    try {
-      run(10, PARALLELISM_FACTOR, fn);
-      fail("Expected exception to propagate");
-    } catch (RuntimeException e) {
-      assertThat(e.getMessage(), containsString("Expected failure"));
-    }
-
-    // Should have processed 10 elements, but stopped before processing all
-    // of them.
-    assertEquals(10, numProcessed.get());
-    assertEquals(1, numFailures.get());
-  }
-
-  @Test
-  public void testIntraBundleParallelizationGetName() {
-    assertEquals(
-        "IntraBundleParallelization",
-        IntraBundleParallelization.of(new 
DelayFn<Integer>()).withMaxParallelism(1).getName());
-  }
-
-  @Test
-  public void testDisplayData() {
-    OldDoFn<String, String> fn = new OldDoFn<String, String>() {
-      @Override
-      public void processElement(ProcessContext c) throws Exception {
-      }
-
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        builder.add(DisplayData.item("foo", "bar"));
-      }
-    };
-
-    PTransform<?, ?> transform = IntraBundleParallelization
-        .withMaxParallelism(1234)
-        .of(fn);
-
-    DisplayData displayData = DisplayData.from(transform);
-    assertThat(displayData, includesDisplayDataFrom(fn));
-    assertThat(displayData, hasDisplayItem("fn", fn.getClass()));
-    assertThat(displayData, hasDisplayItem("maxParallelism", 1234));
-  }
-
-  /**
-   * Runs the provided doFn inside of an {@link IntraBundleParallelization} 
transform.
-   *
-   * <p>This method assumes that the OldDoFn passed to it will call {@link 
#startConcurrentCall()}
-   * before processing each elements and {@link #finishConcurrentCall()} after 
each element.
-   *
-   * @param numElements the size of the input
-   * @param maxParallelism how many threads to execute in parallel
-   * @param doFn the OldDoFn to execute
-   * @return the maximum observed parallelism of the OldDoFn
-   */
-  private int run(int numElements, int maxParallelism, OldDoFn<Integer, 
Integer> doFn) {
-    Pipeline pipeline = TestPipeline.create();
-
-    ArrayList<Integer> data = new ArrayList<>(numElements);
-    for (int i = 0; i < numElements; ++i) {
-      data.add(i);
-    }
-
-    ConcurrencyMeasuringFn<Integer> downstream = new 
ConcurrencyMeasuringFn<>();
-    pipeline
-        .apply(Create.of(data))
-        
.apply(IntraBundleParallelization.of(doFn).withMaxParallelism(maxParallelism))
-        .apply(ParDo.of(downstream));
-
-    pipeline.run();
-
-    // All elements should have completed.
-    assertEquals(0, currentFnConcurrency.get());
-    // Downstream methods should not see parallel threads.
-    assertEquals(1, maxDownstreamConcurrency);
-
-    return maxFnConcurrency.get();
-  }
-}

Reply via email to