[ 
https://issues.apache.org/jira/browse/BEAM-4432?focusedWorklogId=123414&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-123414
 ]

ASF GitHub Bot logged work on BEAM-4432:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Jul/18 14:56
            Start Date: 15/Jul/18 14:56
    Worklog Time Spent: 10m 
      Work Description: pabloem closed pull request #5519: [BEAM-4432] Adding 
Sources to produce Synthetic output for Batch pipelines
URL: https://github.com/apache/beam/pull/5519
 
 
   

This PR was merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork) pull request
once it has been merged, the diff is reproduced below for the sake of
provenance:

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 49147654ba9..3b7ce252161 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -341,6 +341,7 @@ class BeamModulePlugin implements Plugin<Project> {
         commons_io_1x                               : 
"commons-io:commons-io:1.3.2",
         commons_io_2x                               : 
"commons-io:commons-io:2.5",
         commons_lang3                               : 
"org.apache.commons:commons-lang3:3.6",
+        commons_math3                               : 
"org.apache.commons:commons-math3:3.6.1",
         datastore_v1_proto_client                   : 
"com.google.cloud.datastore:datastore-v1-proto-client:1.4.0",
         datastore_v1_protos                         : 
"com.google.cloud.datastore:datastore-v1-protos:1.3.0",
         error_prone_annotations                     : 
"com.google.errorprone:error_prone_annotations:2.0.15",
diff --git a/sdks/java/io/synthetic/build.gradle 
b/sdks/java/io/synthetic/build.gradle
new file mode 100644
index 00000000000..3d6be8eb13c
--- /dev/null
+++ b/sdks/java/io/synthetic/build.gradle
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: org.apache.beam.gradle.BeamModulePlugin
+applyJavaNature()
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Synthetic"
+ext.summary = "Generators of Synthetic IO for Testing."
+
+dependencies {
+  compile library.java.joda_time
+  compile library.java.commons_math3
+  shadow library.java.jackson_core
+  shadow library.java.jackson_annotations
+  shadow library.java.jackson_databind
+  testCompile library.java.guava
+  testCompile library.java.junit
+  testCompile library.java.hamcrest_core
+  testCompile library.java.hamcrest_library
+  shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
+}
diff --git 
a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIO.java
 
b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIO.java
new file mode 100644
index 00000000000..d9f652ace9d
--- /dev/null
+++ 
b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIO.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.synthetic;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.commons.math3.stat.StatUtils.sum;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.base.MoreObjects;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.math3.distribution.ConstantRealDistribution;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This {@link SyntheticBoundedIO} class provides a parameterizable batch custom source that is
+ * deterministic.
+ *
+ * <p>The {@link SyntheticBoundedSource} generates a {@link PCollection} of {@code KV<byte[],
+ * byte[]>}. A fraction of the generated records {@code KV<byte[], byte[]>} are associated with
+ * "hot" keys, which are uniformly distributed over a fixed number of hot keys. The remaining
+ * generated records are associated with "random" keys. Each record will be slowed down by a certain
+ * sleep time generated based on the specified sleep time distribution when the {@link
+ * SyntheticSourceReader} reads each record. The record {@code KV<byte[], byte[]>} is generated
+ * deterministically based on the record's position in the source, which enables repeatable
+ * execution for debugging. The {@code SyntheticBoundedIO} configurable parameters are defined in
+ * {@link SyntheticBoundedIO.SyntheticSourceOptions}.
+ *
+ * <p>To read a {@link PCollection} of {@code KV<byte[], byte[]>} from {@link SyntheticBoundedIO},
+ * use {@link SyntheticBoundedIO#readFrom} to construct the synthetic source with synthetic source
+ * options. See {@link SyntheticBoundedIO.SyntheticSourceOptions} for how to construct an instance.
+ * An example is below:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ * SyntheticBoundedIO.SyntheticSourceOptions sso = ...;
+ *
+ * // Construct the synthetic input with synthetic source options.
+ * PCollection<KV<byte[], byte[]>> input = p.apply(SyntheticBoundedIO.readFrom(sso));
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class SyntheticBoundedIO {
+  /** Read from the synthetic source options. */
+  public static Read.Bounded<KV<byte[], byte[]>> 
readFrom(SyntheticSourceOptions options) {
+    checkNotNull(options, "Input synthetic source options should not be 
null.");
+    return Read.from(new SyntheticBoundedSource(options));
+  }
+
+  /** A {@link SyntheticBoundedSource} that reads {@code KV<byte[], byte[]>}. 
*/
+  public static class SyntheticBoundedSource extends 
OffsetBasedSource<KV<byte[], byte[]>> {
+    private static final long serialVersionUID = 0;
+    private static final Logger LOG = 
LoggerFactory.getLogger(SyntheticBoundedSource.class);
+
+    private final SyntheticSourceOptions sourceOptions;
+
+    public SyntheticBoundedSource(SyntheticSourceOptions sourceOptions) {
+      this(0, sourceOptions.numRecords, sourceOptions);
+    }
+
+    SyntheticBoundedSource(long startOffset, long endOffset, 
SyntheticSourceOptions sourceOptions) {
+      super(startOffset, endOffset, 1);
+      this.sourceOptions = sourceOptions;
+      LOG.debug("Constructing {}", toString());
+    }
+
+    @Override
+    public Coder<KV<byte[], byte[]>> getDefaultOutputCoder() {
+      return KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of());
+    }
+
+    @Override
+    // TODO: test cases where the source size could not be estimated (i.e., 
return 0).
+    // TODO: test cases where the key size and value size might differ from 
record to record.
+    // The key size and value size might have their own distributions.
+    public long getBytesPerOffset() {
+      return sourceOptions.bytesPerRecord >= 0
+          ? sourceOptions.bytesPerRecord
+          : sourceOptions.keySizeBytes + sourceOptions.valueSizeBytes;
+    }
+
+    @Override
+    public void validate() {
+      super.validate();
+      sourceOptions.validate();
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("options", sourceOptions)
+          .add("offsetRange", "[" + getStartOffset() + ", " + getEndOffset() + 
")")
+          .toString();
+    }
+
+    /**
+     * Creates a source over the sub-range {@code [start, end)} that shares this source's options.
+     *
+     * <p>The failure messages use {@code %s} templates so that no string is assembled unless a
+     * check actually fails; the resulting message text is unchanged.
+     *
+     * @param start first offset of the sub-range; must not be smaller than this source's start
+     * @param end end offset of the sub-range; must not be larger than this source's end
+     * @return a new {@link SyntheticBoundedSource} over the requested sub-range
+     * @throws IllegalArgumentException if the sub-range is not contained in this source's range
+     */
+    @Override
+    public final SyntheticBoundedSource createSourceForSubrange(long start, long end) {
+      checkArgument(
+          start >= getStartOffset(),
+          "Start offset value %s of the subrange cannot be smaller than the start offset value %s"
+              + " of the parent source",
+          start,
+          getStartOffset());
+      checkArgument(
+          end <= getEndOffset(),
+          "End offset value %s of the subrange cannot be larger than the end offset value %s"
+              + " of the parent source",
+          end,
+          getEndOffset());
+
+      return new SyntheticBoundedSource(start, end, sourceOptions);
+    }
+
+    @Override
+    public long getMaxEndOffset(PipelineOptions options) {
+      return getEndOffset();
+    }
+
+    @Override
+    public SyntheticSourceReader createReader(PipelineOptions pipelineOptions) 
{
+      return new SyntheticSourceReader(this);
+    }
+
+    /**
+     * Splits this source into sub-sources whose offset ranges are produced by {@code
+     * generateBundleSizes}.
+     *
+     * <p>The number of bundles is {@code sourceOptions.forceNumInitialBundles} when that is set;
+     * otherwise it is the estimated source size divided by {@code desiredBundleSizeBytes}, rounded
+     * up. At least one bundle is always requested so that a zero size estimate or an unusable size
+     * hint cannot produce an empty split that drops the records of a non-empty range.
+     *
+     * @param desiredBundleSizeBytes size hint for each bundle, in bytes
+     * @param options the pipeline options, used for the size estimate
+     * @return the sub-sources, in offset order; empty ranges are omitted
+     */
+    @Override
+    public List<SyntheticBoundedSource> split(long desiredBundleSizeBytes, PipelineOptions options)
+        throws Exception {
+      // Choose number of bundles either based on explicit parameter,
+      // or based on size and hints.
+      int desiredNumBundles;
+      if (sourceOptions.forceNumInitialBundles != null) {
+        desiredNumBundles = sourceOptions.forceNumInitialBundles;
+      } else if (desiredBundleSizeBytes <= 0) {
+        // Dividing by a non-positive hint would give an infinite or negative bundle count.
+        desiredNumBundles = 1;
+      } else {
+        // A zero size estimate (e.g. bytesPerRecord == 0) must still yield one bundle.
+        desiredNumBundles =
+            Math.max(
+                1, (int) Math.ceil(1.0 * getEstimatedSizeBytes(options) / desiredBundleSizeBytes));
+      }
+
+      List<SyntheticBoundedSource> res =
+          generateBundleSizes(desiredNumBundles)
+              .stream()
+              .map(
+                  offsetRange ->
+                      createSourceForSubrange(offsetRange.getFrom(), offsetRange.getTo()))
+              .collect(Collectors.toList());
+      LOG.info("Split into {} bundles of sizes: {}", res.size(), res);
+      return res;
+    }
+
+    private List<OffsetRange> generateBundleSizes(int desiredNumBundles) {
+      List<OffsetRange> result = new ArrayList<>();
+
+      // Generate relative bundle sizes using the given distribution.
+      double[] relativeSizes = new double[desiredNumBundles];
+      for (int i = 0; i < relativeSizes.length; ++i) {
+        relativeSizes[i] =
+            sourceOptions.bundleSizeDistribution.sample(
+                sourceOptions.hashFunction.hashInt(i).asLong());
+      }
+
+      // Generate offset ranges proportional to the relative sizes.
+      double s = sum(relativeSizes);
+      long startOffset = getStartOffset();
+      double sizeSoFar = 0;
+      for (int i = 0; i < relativeSizes.length; ++i) {
+        sizeSoFar += relativeSizes[i];
+        long endOffset =
+            (i == relativeSizes.length - 1)
+                ? getEndOffset()
+                : (long) (getStartOffset() + sizeSoFar * (getEndOffset() - 
getStartOffset()) / s);
+        if (startOffset != endOffset) {
+          result.add(new OffsetRange(startOffset, endOffset));
+        }
+        startOffset = endOffset;
+      }
+      return result;
+    }
+  }
+
+  /**
+   * Shape of the progress reporting curve as a function of the current offset 
in the {@link
+   * SyntheticBoundedSource}.
+   */
+  public enum ProgressShape {
+    /** Reported progress grows linearly from 0 to 1. */
+    LINEAR,
+    /** Reported progress decreases linearly from 0.9 to 0.1. */
+    LINEAR_REGRESSING,
+  }
+
+  /**
+   * Synthetic bounded source options. These options are all specified as JSON; see the
+   * documentation of the individual fields for details. {@code SyntheticSourceOptions} uses
+   * Jackson annotations which PipelineOptionsFactory can use to parse and construct an instance.
+   */
+  public static class SyntheticSourceOptions extends SyntheticOptions {
+    private static final long serialVersionUID = 0;
+
+    /** Total number of generated records. */
+    @JsonProperty public long numRecords;
+
+    /**
+     * Only records whose index is a multiple of this will be split points. 0 
means the source is
+     * not dynamically splittable (but is perfectly statically splittable). In 
that case it also
+     * doesn't report progress at all.
+     */
+    @JsonProperty public long splitPointFrequencyRecords = 1;
+
+    /**
+     * Distribution for generating initial split bundles.
+     *
+     * <p>When splitting into "desiredBundleSizeBytes", we'll compute the 
desired number of bundles
+     * N, then sample this many numbers from this distribution, normalize 
their sum to 1, and use
+     * that as the boundaries of generated bundles.
+     *
+     * <p>The Zipf distribution is expected to be particularly useful here.
+     *
+     * <p>E.g., empirically, with 100 bundles, the Zipf distribution with a 
parameter of 3.5 will
+     * generate bundles where the largest is about 3x-10x larger than the 
median; with a parameter
+     * of 3.0 this ratio will be about 5x-50x; with 2.5, 5x-100x (i.e. 1 
bundle can be as large as
+     * all others combined).
+     */
+    @JsonDeserialize(using = SamplerDeserializer.class)
+    public Sampler bundleSizeDistribution = fromRealDistribution(new 
ConstantRealDistribution(1));
+
+    /**
+     * If specified, this source will split into exactly this many bundles 
regardless of the hints
+     * provided by the service.
+     */
+    @JsonProperty public Integer forceNumInitialBundles;
+
+    /** See {@link ProgressShape}. */
+    @JsonProperty public ProgressShape progressShape = ProgressShape.LINEAR;
+
+    /**
+     * The distribution for the delay when reading from synthetic source 
starts. This delay is
+     * independent of the per-record delay and uses the same types of 
distributions as {@link
+     * #delayDistribution}.
+     */
+    @JsonDeserialize(using = SamplerDeserializer.class)
+    final Sampler initializeDelayDistribution =
+        fromRealDistribution(new ConstantRealDistribution(0));
+
+    /**
+     * Generates a random delay value for the synthetic source initialization 
using the distribution
+     * defined by {@link #initializeDelayDistribution}.
+     */
+    Duration nextInitializeDelay(long seed) {
+      return Duration.millis((long) initializeDelayDistribution.sample(seed));
+    }
+
+    @Override
+    public void validate() {
+      super.validate();
+      checkArgument(
+          numRecords >= 0, "numRecords should be a non-negative number, but 
found %s.", numRecords);
+      checkNotNull(bundleSizeDistribution, "bundleSizeDistribution");
+      checkArgument(
+          forceNumInitialBundles == null || forceNumInitialBundles > 0,
+          "forceNumInitialBundles, if specified, must be positive, but found 
%s",
+          forceNumInitialBundles);
+      checkArgument(
+          splitPointFrequencyRecords >= 0,
+          "splitPointFrequencyRecords must be non-negative, but found %s",
+          splitPointFrequencyRecords);
+    }
+
+    public Record genRecord(long position) {
+      // This method is supposed to generate random records deterministically,
+      // so that results can be reproduced by running the same scenario a 
second time.
+      // We need to initiate a Random object for each position to make the 
record deterministic
+      // because liquid sharding could split the Source at any position.
+      // And we also need a seed to initiate a Random object. The mapping from 
the position to
+      // the seed should be fixed. Using the position as seed to feed Random 
objects will cause the
+      // generated values to not be random enough because the position values 
are
+      // close to each other. To make seeds fed into the Random objects 
unrelated,
+      // we use a hashing function to map the position to its corresponding 
hashcode,
+      // and use the hashcode as a seed to feed into the Random object.
+      long hashCodeOfPosition = hashFunction.hashLong(position).asLong();
+      return new Record(genKvPair(hashCodeOfPosition), 
nextDelay(hashCodeOfPosition));
+    }
+
+    /** Record generated by {@link #genRecord}. */
+    public static class Record {
+      public final KV<byte[], byte[]> kv;
+      public final Duration sleepMsec;
+
+      Record(KV<byte[], byte[]> kv, long sleepMsec) {
+        this.kv = kv;
+        this.sleepMsec = new Duration(sleepMsec);
+      }
+    }
+  }
+
+  /**
+   * A reader over the {@link PCollection} of {@code KV<byte[], byte[]>} from 
the synthetic source.
+   *
+   * <p>The random but deterministic record at position "i" in the range [A, 
B) is generated by
+   * using {@link SyntheticSourceOptions#genRecord}. Reading each record 
sleeps according to the
+   * sleep time distribution in {@code SyntheticOptions}.
+   */
+  private static class SyntheticSourceReader
+      extends OffsetBasedSource.OffsetBasedReader<KV<byte[], byte[]>> {
+    private final long splitPointFrequencyRecords;
+
+    private KV<byte[], byte[]> currentKvPair;
+    private long currentOffset;
+    private boolean isAtSplitPoint;
+
+    SyntheticSourceReader(SyntheticBoundedSource source) {
+      super(source);
+      this.currentKvPair = null;
+      this.splitPointFrequencyRecords = 
source.sourceOptions.splitPointFrequencyRecords;
+    }
+
+    @Override
+    public synchronized SyntheticBoundedSource getCurrentSource() {
+      return (SyntheticBoundedSource) super.getCurrentSource();
+    }
+
+    @Override
+    protected long getCurrentOffset() throws IllegalStateException {
+      return currentOffset;
+    }
+
+    @Override
+    public KV<byte[], byte[]> getCurrent() throws NoSuchElementException {
+      if (currentKvPair == null) {
+        throw new NoSuchElementException(
+            "The current element is unavailable because either the reader is "
+                + "at the beginning of the input and start() or advance() 
wasn't called, "
+                + "or the last start() or advance() returned false.");
+      }
+      return currentKvPair;
+    }
+
+    @Override
+    public boolean allowsDynamicSplitting() {
+      return splitPointFrequencyRecords > 0;
+    }
+
+    @Override
+    protected final boolean startImpl() throws IOException {
+      this.currentOffset = getCurrentSource().getStartOffset();
+      if (splitPointFrequencyRecords > 0) {
+        while (currentOffset % splitPointFrequencyRecords != 0) {
+          ++currentOffset;
+        }
+      }
+
+      SyntheticSourceOptions options = getCurrentSource().sourceOptions;
+      SyntheticUtils.delay(
+          options.nextInitializeDelay(this.currentOffset),
+          options.cpuUtilizationInMixedDelay,
+          options.delayType,
+          new Random(this.currentOffset));
+
+      isAtSplitPoint = true;
+      --currentOffset;
+      return advanceImpl();
+    }
+
+    @Override
+    protected boolean advanceImpl() {
+      currentOffset++;
+      isAtSplitPoint =
+          (splitPointFrequencyRecords == 0) || (currentOffset % 
splitPointFrequencyRecords == 0);
+
+      SyntheticSourceOptions options = getCurrentSource().sourceOptions;
+      SyntheticSourceOptions.Record record = options.genRecord(currentOffset);
+      currentKvPair = record.kv;
+      // TODO: add a separate distribution for the sleep time of reading the 
first record
+      // (e.g.,"open" the files).
+      long hashCodeOfVal = 
options.hashFunction.hashBytes(currentKvPair.getValue()).asLong();
+      Random random = new Random(hashCodeOfVal);
+      SyntheticUtils.delay(
+          record.sleepMsec, options.cpuUtilizationInMixedDelay, 
options.delayType, random);
+
+      return true;
+    }
+
+    @Override
+    public void close() {
+      // Nothing
+    }
+
+    @Override
+    public Double getFractionConsumed() {
+      double realFractionConsumed = super.getFractionConsumed();
+      ProgressShape shape = getCurrentSource().sourceOptions.progressShape;
+      switch (shape) {
+        case LINEAR:
+          return realFractionConsumed;
+        case LINEAR_REGRESSING:
+          return 0.9 - 0.8 * realFractionConsumed;
+        default:
+          throw new AssertionError("Unexpected progress shape: " + shape);
+      }
+    }
+
+    @Override
+    protected boolean isAtSplitPoint() throws NoSuchElementException {
+      return isAtSplitPoint;
+    }
+  }
+}
diff --git 
a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticOptions.java
 
b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticOptions.java
new file mode 100644
index 00000000000..b74fc068e40
--- /dev/null
+++ 
b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticOptions.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.synthetic;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import org.apache.beam.sdk.values.KV;
+import org.apache.commons.math3.distribution.ConstantRealDistribution;
+import org.apache.commons.math3.distribution.ExponentialDistribution;
+import org.apache.commons.math3.distribution.IntegerDistribution;
+import org.apache.commons.math3.distribution.NormalDistribution;
+import org.apache.commons.math3.distribution.RealDistribution;
+import org.apache.commons.math3.distribution.UniformRealDistribution;
+import org.apache.commons.math3.distribution.ZipfDistribution;
+
+/**
+ * This {@link SyntheticOptions} class provides common parameterizable 
synthetic options that are
+ * used by {@link SyntheticBoundedIO}.
+ */
+public class SyntheticOptions implements Serializable {
+  private static final long serialVersionUID = 0;
+
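+  /**
+   * The type of Delay that will be produced.
+   *
+   * <p>CPU delay produces a CPU-busy delay. SLEEP delay makes the process sleep. MIXED delay
+   * spends a fraction of the time spinning (see {@code cpuUtilizationInMixedDelay}) and the
+   * remaining time sleeping.
+   */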
+  public enum DelayType {
+    SLEEP,
+    CPU,
+    MIXED,
+  }
+
+  /** Mapper for (de)serializing JSON. */
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  /**
+   * Wrapper over a distribution. Unfortunately commons-math does not provide 
a common interface
+   * over both RealDistribution and IntegerDistribution, and we sometimes need 
one and sometimes the
+   * other.
+   */
+  public interface Sampler extends Serializable {
+    double sample(long seed);
+
+    // Make this class a bean, so Jackson can serialize it during 
SyntheticOptions.toString().
+    Object getDistribution();
+  }
+
+  public static Sampler fromRealDistribution(final RealDistribution dist) {
+    return new Sampler() {
+      private static final long serialVersionUID = 0L;
+
+      @Override
+      public double sample(long seed) {
+        dist.reseedRandomGenerator(seed);
+        return dist.sample();
+      }
+
+      @Override
+      public Object getDistribution() {
+        return dist;
+      }
+    };
+  }
+
+  public static Sampler fromIntegerDistribution(final IntegerDistribution 
dist) {
+    return new Sampler() {
+      private static final long serialVersionUID = 0L;
+
+      @Override
+      public double sample(long seed) {
+        dist.reseedRandomGenerator(seed);
+        return dist.sample();
+      }
+
+      @Override
+      public Object getDistribution() {
+        return dist;
+      }
+    };
+  }
+
+  private static Sampler scaledSampler(final Sampler sampler, final double 
multiplier) {
+    return new Sampler() {
+      private static final long serialVersionUID = 0L;
+
+      @Override
+      public double sample(long seed) {
+        return multiplier * sampler.sample(seed);
+      }
+
+      @Override
+      public Object getDistribution() {
+        return sampler.getDistribution();
+      }
+    };
+  }
+
+  /** The key size in bytes. */
+  @JsonProperty public long keySizeBytes = 1;
+
+  /** The value size in bytes. */
+  @JsonProperty public long valueSizeBytes = 1;
+
+  /**
+   * The size of a single record used for size estimation in bytes. If less 
than zero, keySizeBytes
+   * + valueSizeBytes is used.
+   */
+  @JsonProperty public final long bytesPerRecord;
+
+  /** The number of distinct "hot" keys. */
+  @JsonProperty public long numHotKeys;
+
+  /**
+   * The fraction of records associated with "hot" keys, which are uniformly 
distributed over a
+   * fixed number of hot keys.
+   */
+  @JsonProperty public double hotKeyFraction;
+
+  /** The fraction of keys that should be larger than others. */
+  @JsonProperty public double largeKeyFraction = 0.0;
+
+  /** The size of large keys. */
+  @JsonProperty public double largeKeySizeBytes = 1000;
+
+  /** The seed is used for generating a hash function implementing the 128-bit 
murmur3 algorithm. */
+  @JsonIgnore public int seed;
+
+  /**
+   * The hash function is used to generate seeds that are fed into the random 
number generators and
+   * the sleep time distributions.
+   */
+  @JsonIgnore public transient HashFunction hashFunction;
+
+  /**
+   * SyntheticOptions supports several delay distributions including uniform, 
normal, exponential,
+   * and constant delay per record. The delay is either sleep or CPU spinning 
for the duration.
+   *
+   * <ul>
+   *   <li>The uniform delay distribution is specified through
+   *       
"delayDistribution":{"type":"uniform","lower":lower_bound,"upper":upper_bound}, 
where
+   *       lower_bound and upper_bound are non-negative numbers representing 
the delay range in
+   *       milliseconds.
+   *   <li>The normal delay distribution is specified through
+   *       "delayDistribution":{"type":"normal","mean":mean,"stddev":stddev}, 
where mean is a
+   *       non-negative number representing the mean of this normal 
distributed delay in
+   *       milliseconds and stddev is a positive number representing its 
standard deviation.
+   *   <li>The exponential delay distribution is specified through
+   *       "delayDistribution":{"type":"exp","mean":mean}, where mean is a 
positive number
+   *       representing the mean of this exponentially distributed delay in 
milliseconds.
+   *   <li>The zipf distribution is specified through
+   *       
"delayDistribution":{"type":"zipf","param":param,"multiplier":multiplier}, 
where param is
+   *       a number > 1 and multiplier just scales the output of the 
distribution. By default, the
+   *       multiplier is 1. Parameters closer to 1 produce dramatically more 
skewed results. E.g.
+   *       given 100 samples, the min will almost always be 1, while max with 
param 3 will usually
+   *       be below 10; with param 2 max will usually be between several dozen 
and several hundred;
+   *       with param 1.5, thousands to millions.
+   *   <li>The constant sleep time per record is specified through
+   *       "delayDistribution":{"type":"const","const":const} where const is a 
non-negative number
+   *       representing the constant sleep time in milliseconds.
+   * </ul>
+   *
+   * <p>The field delayDistribution is not used in the synthetic unbounded 
source. The synthetic
+   * unbounded source uses RateLimiter to control QPS.
+   */
+  @JsonDeserialize(using = SamplerDeserializer.class)
+  private final Sampler delayDistribution = fromRealDistribution(new 
ConstantRealDistribution(0));
+
+  /**
+   * When 'delayDistribution' is configured, this indicates how the delay is enforced ("SLEEP",
+   * "CPU", or "MIXED").
+   */
+  @JsonProperty public final DelayType delayType = DelayType.SLEEP;
+
+  /**
+   * CPU utilization when delayType is 'MIXED'. This determines the fraction 
of processing time
+   * spent spinning. The remaining time is spent sleeping. For each 
millisecond of processing time
+   * we choose to spin with probability equal to this fraction.
+   */
+  @JsonProperty public final double cpuUtilizationInMixedDelay;
+
+  SyntheticOptions() {
+    cpuUtilizationInMixedDelay = 0.1;
+    bytesPerRecord = -1;
+  }
+
+  @JsonDeserialize
+  public void setSeed(int seed) {
+    this.seed = seed;
+    this.hashFunction = Hashing.murmur3_128(seed);
+  }
+
+  /**
+   * Jackson deserializer that turns a JSON distribution specification into a {@link Sampler}.
+   *
+   * <p>The specification is a JSON object with a {@code "type"} field ({@code "uniform"}, {@code
+   * "exp"}, {@code "normal"}, {@code "const"} or {@code "zipf"}) plus the parameters of that
+   * distribution. A missing {@code "type"} or parameter field is reported as an {@link
+   * IllegalArgumentException} naming the field, rather than as a {@link NullPointerException}.
+   */
+  static class SamplerDeserializer extends JsonDeserializer<Sampler> {
+    @Override
+    public Sampler deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException {
+      JsonNode node = jp.getCodec().readTree(jp);
+      checkArgument(
+          node != null && node.has("type"),
+          "A distribution specification requires a \"type\" field, but found %s.",
+          node);
+      String type = node.get("type").asText();
+      switch (type) {
+        case "uniform":
+          {
+            double lowerBound = requiredDouble(node, type, "lower");
+            double upperBound = requiredDouble(node, type, "upper");
+            checkArgument(
+                lowerBound >= 0,
+                "The lower bound of uniform distribution should be a non-negative number, "
+                    + "but found %s.",
+                lowerBound);
+            return fromRealDistribution(new UniformRealDistribution(lowerBound, upperBound));
+          }
+        case "exp":
+          {
+            double mean = requiredDouble(node, type, "mean");
+            return fromRealDistribution(new ExponentialDistribution(mean));
+          }
+        case "normal":
+          {
+            double mean = requiredDouble(node, type, "mean");
+            double stddev = requiredDouble(node, type, "stddev");
+            checkArgument(
+                mean >= 0,
+                "The mean of normal distribution should be a non-negative number, but found %s.",
+                mean);
+            return fromRealDistribution(new NormalDistribution(mean, stddev));
+          }
+        case "const":
+          {
+            double constant = requiredDouble(node, type, "const");
+            checkArgument(
+                constant >= 0,
+                "The value of constant distribution should be a non-negative number, but found %s.",
+                constant);
+            return fromRealDistribution(new ConstantRealDistribution(constant));
+          }
+        case "zipf":
+          {
+            double param = requiredDouble(node, type, "param");
+            // The multiplier is optional and defaults to 1.
+            final double multiplier =
+                node.has("multiplier") ? node.get("multiplier").asDouble() : 1.0;
+            checkArgument(
+                param > 1,
+                "The parameter of the Zipf distribution should be > 1, but found %s.",
+                param);
+            checkArgument(
+                multiplier >= 0,
+                "The multiplier of the Zipf distribution should be >= 0, but found %s.",
+                multiplier);
+            final ZipfDistribution dist = new ZipfDistribution(100, param);
+            return scaledSampler(fromIntegerDistribution(dist), multiplier);
+          }
+        default:
+          {
+            throw new IllegalArgumentException("Unknown distribution type: " + type);
+          }
+      }
+    }
+
+    /** Reads a mandatory numeric parameter, failing with a descriptive message when absent. */
+    private static double requiredDouble(JsonNode node, String type, String field) {
+      checkArgument(
+          node.has(field), "The \"%s\" distribution requires a \"%s\" field.", type, field);
+      return node.get(field).asDouble();
+    }
+  }
+
+  /**
+   * Checks that the options are consistent enough for {@code genKvPair} to generate records.
+   *
+   * <p>Besides the sign checks, this verifies that values which {@code genKvPair} narrows to
+   * {@code int} fit into an {@code int}, and that at least one hot key exists whenever hot keys
+   * are enabled (otherwise {@code Random.nextInt(0)} would throw while generating a record).
+   *
+   * @throws IllegalArgumentException if any option is out of range or the hash function has not
+   *     been initialized
+   */
+  public void validate() {
+    checkArgument(
+        keySizeBytes > 0, "keySizeBytes should be a positive number, but found %s", keySizeBytes);
+    checkArgument(
+        keySizeBytes <= Integer.MAX_VALUE,
+        "keySizeBytes should be at most %s, but found %s",
+        Integer.MAX_VALUE,
+        keySizeBytes);
+    checkArgument(
+        valueSizeBytes >= 0,
+        "valueSizeBytes should be a non-negative number, but found %s",
+        valueSizeBytes);
+    checkArgument(
+        valueSizeBytes <= Integer.MAX_VALUE,
+        "valueSizeBytes should be at most %s, but found %s",
+        Integer.MAX_VALUE,
+        valueSizeBytes);
+    checkArgument(
+        numHotKeys >= 0, "numHotKeys should be a non-negative number, but found %s", numHotKeys);
+    checkArgument(
+        hotKeyFraction >= 0,
+        "hotKeyFraction should be a non-negative number, but found %s",
+        hotKeyFraction);
+    checkArgument(hashFunction != null, "hashFunction hasn't been initialized.");
+    if (hotKeyFraction > 0) {
+      // A hot key is written into the key buffer as a 4-byte int, so the key must have room.
+      int intBytes = Integer.SIZE / 8;
+      checkArgument(
+          keySizeBytes >= intBytes,
+          "Allowing hot keys (hotKeyFraction=%s) requires keySizeBytes "
+              + "to be at least %s, but found %s",
+          hotKeyFraction,
+          intBytes,
+          keySizeBytes);
+      // The hot key index is drawn with Random.nextInt((int) numHotKeys).
+      checkArgument(
+          numHotKeys > 0 && numHotKeys <= Integer.MAX_VALUE,
+          "Allowing hot keys (hotKeyFraction=%s) requires numHotKeys "
+              + "to be between 1 and %s, but found %s",
+          hotKeyFraction,
+          Integer.MAX_VALUE,
+          numHotKeys);
+    }
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return MAPPER.writeValueAsString(this);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public long nextDelay(long seed) {
+    return (long) delayDistribution.sample(seed);
+  }
+
+  /**
+   * Generates a key/value pair of {@code keySizeBytes} and {@code valueSizeBytes} bytes from a
+   * {@link Random} seeded with {@code seed}.
+   *
+   * <p>With probability {@code hotKeyFraction} the key is a hot key: the hash of an integer drawn
+   * uniformly from {@code [0, numHotKeys - 1]} written into the first four bytes, with the
+   * remaining bytes left at the filler value. Otherwise the whole key is random bytes.
+   */
+  public KV<byte[], byte[]> genKvPair(long seed) {
+    Random rng = new Random(seed);
+
+    // Pre-fill the user-key with a byte that is not an ordered-code escape character
+    // (specifically '\0' or '\xff'). The user-key is encoded into the shuffle-key using
+    // ordered-code, and the shuffle-key is then checked for size limit violations. A user-key
+    // made of keySizeBytes '\0' bytes would produce a shuffle-key encoding double in size,
+    // which would go over the shuffle-key limit (see b/28770924).
+    byte[] keyBytes = new byte[(int) keySizeBytes];
+    int pos = 0;
+    while (pos < keySizeBytes) {
+      keyBytes[pos] = 42;
+      pos++;
+    }
+
+    // Decide whether this record gets a hot key.
+    boolean useHotKey = rng.nextDouble() < hotKeyFraction;
+    if (!useHotKey) {
+      // A randomly generated key may coincide with a hot key, but the probability is very small.
+      rng.nextBytes(keyBytes);
+    } else {
+      // Pick one of the hot keys with equal probability and stamp its hash into the key prefix.
+      int hotKeyIndex = rng.nextInt((int) numHotKeys);
+      int hashed = hashFunction.hashInt(hotKeyIndex).asInt();
+      ByteBuffer.wrap(keyBytes).putInt(hashed);
+    }
+
+    byte[] valueBytes = new byte[(int) valueSizeBytes];
+    rng.nextBytes(valueBytes);
+    return KV.of(keyBytes, valueBytes);
+  }
+}
diff --git 
a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticUtils.java
 
b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticUtils.java
new file mode 100644
index 00000000000..507b962c329
--- /dev/null
+++ 
b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticUtils.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.synthetic;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.hash.Hashing;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import org.joda.time.Duration;
+
+/** Utility functions used in {@link org.apache.beam.sdk.io.synthetic}. */
+class SyntheticUtils {
+  // Static helpers only; this class is not meant to be instantiated.
+  private SyntheticUtils() {}
+
+  // cpu delay implementation:
+
+  private static final long MASK = (1L << 16) - 1L;
+  private static final long HASH = 0x243F6A8885A308D3L;
+  private static final long INIT_PLAINTEXT = 50000L;
+
+  /** Keep cpu busy for {@code delayMillis} by calculating lots of hashes. */
+  private static void cpuDelay(long delayMillis) {
+    // Note that the delay is enforced in terms of walltime. That implies this thread may not
+    // keep CPU busy if it gets preempted by other threads. There is more of a chance of this
+    // occurring in a streaming pipeline as there could be lots of threads running this. The loop
+    // measures the time spent in each iteration, so that these effects are somewhat minimized.
+
+    long cpuMicros = delayMillis * 1000;
+    Stopwatch timer = Stopwatch.createUnstarted();
+
+    while (timer.elapsed(TimeUnit.MICROSECONDS) < cpuMicros) {
+      // Find a long which hashes to HASH in lowest MASK bits.
+      // Values chosen to roughly take 1ms on typical workstation.
+      timer.start();
+      long p = INIT_PLAINTEXT;
+      while (true) {
+        long t = Hashing.murmur3_128().hashLong(p).asLong();
+        if ((t & MASK) == (HASH & MASK)) {
+          break;
+        }
+        p++;
+      }
+      timer.stop();
+    }
+  }
+
+  /**
+   * Implements a mechanism to delay a thread in various fashions.
+   *
+   * <ul>
+   *   <li>{@code CPU}: Burn CPU while waiting.
+   *   <li>{@code SLEEP}: Sleep uninterruptibly while waiting.
+   *   <li>{@code MIXED}: Switch between burning CPU and sleeping every millisecond to emulate a
+   *       desired CPU utilization specified by {@code cpuUtilizationInMixedDelay}.
+   * </ul>
+   *
+   * @param delay total time to delay; a negative duration results in no delay.
+   * @param cpuUtilizationInMixedDelay probability, per millisecond, of spinning instead of
+   *     sleeping; only used for {@code MIXED}.
+   * @param delayType how the delay is realized.
+   * @param rnd source of randomness for the per-millisecond choice in {@code MIXED} mode.
+   * @return Millis spent sleeping, does not include time spent spinning.
+   * @throws IllegalArgumentException if {@code delayType} is not one of the handled types.
+   */
+  static long delay(
+      Duration delay,
+      double cpuUtilizationInMixedDelay,
+      SyntheticOptions.DelayType delayType,
+      Random rnd) {
+    switch (delayType) {
+      case CPU:
+        cpuDelay(delay.getMillis());
+        return 0;
+      case SLEEP:
+        // Report the time actually slept: a negative delay sleeps for zero millis, so the
+        // returned value must be clamped the same way instead of going negative.
+        long clampedMillis = Math.max(0L, delay.getMillis());
+        Uninterruptibles.sleepUninterruptibly(clampedMillis, TimeUnit.MILLISECONDS);
+        return clampedMillis;
+      case MIXED:
+        // Mixed mode: for each millisecond of delay randomly choose to spin or sleep.
+        // This is enforced at millisecond granularity since that is the minimum duration that
+        // Thread.sleep() can sleep. Millisecond is also the unit of processing delay.
+        long totalSleepMillis = 0;
+        long totalMillis = delay.getMillis();
+        for (long i = 0; i < totalMillis; i++) {
+          if (rnd.nextDouble() < cpuUtilizationInMixedDelay) {
+            delay(new Duration(1), 0.0, SyntheticOptions.DelayType.CPU, rnd);
+          } else {
+            totalSleepMillis +=
+                delay(new Duration(1), 0.0, SyntheticOptions.DelayType.SLEEP, rnd);
+          }
+        }
+        return totalSleepMillis;
+      default:
+        throw new IllegalArgumentException("Unknown delay type " + delayType);
+    }
+  }
+}
diff --git 
a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/package-info.java
 
b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/package-info.java
new file mode 100644
index 00000000000..d2f9d71e308
--- /dev/null
+++ 
b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Transforms for performing Synthetic Operations in Apache Beam pipelines. */
+package org.apache.beam.sdk.io.synthetic;
diff --git 
a/sdks/java/io/synthetic/src/test/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIOTest.java
 
b/sdks/java/io/synthetic/src/test/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIOTest.java
new file mode 100644
index 00000000000..ca648fa1a55
--- /dev/null
+++ 
b/sdks/java/io/synthetic/src/test/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIOTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.synthetic;
+
+import static 
org.apache.beam.sdk.io.synthetic.SyntheticOptions.fromIntegerDistribution;
+import static 
org.apache.beam.sdk.io.synthetic.SyntheticOptions.fromRealDistribution;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.io.BoundedSource;
+import 
org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO.SyntheticBoundedSource;
+import 
org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO.SyntheticSourceOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.commons.math3.distribution.ConstantRealDistribution;
+import org.apache.commons.math3.distribution.ZipfDistribution;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link SyntheticBoundedIO}. */
+@RunWith(JUnit4.class)
+public class SyntheticBoundedIOTest {
+  @Rule public final ExpectedException thrown = ExpectedException.none();
+
+  private SyntheticSourceOptions testSourceOptions = new 
SyntheticSourceOptions();
+
+  @Before
+  public void setUp() {
+    testSourceOptions.splitPointFrequencyRecords = 1;
+    testSourceOptions.numRecords = 10;
+    testSourceOptions.keySizeBytes = 10;
+    testSourceOptions.valueSizeBytes = 20;
+    testSourceOptions.numHotKeys = 3;
+    testSourceOptions.hotKeyFraction = 0.3;
+    testSourceOptions.setSeed(123456);
+    testSourceOptions.bundleSizeDistribution =
+        fromIntegerDistribution(new ZipfDistribution(100, 2.5));
+    testSourceOptions.forceNumInitialBundles = null;
+  }
+
+  private SyntheticSourceOptions fromString(String jsonString) throws 
IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    SyntheticSourceOptions result = mapper.readValue(jsonString, 
SyntheticSourceOptions.class);
+    result.validate();
+    return result;
+  }
+
+  @Test
+  public void testInvalidSourceOptionsJsonFormat() throws Exception {
+    thrown.expect(JsonParseException.class);
+    String syntheticSourceOptions = "input:unknown URI";
+    fromString(syntheticSourceOptions);
+  }
+
+  @Test
+  public void testFromString() throws Exception {
+    String syntheticSourceOptions =
+        
"{\"numRecords\":100,\"splitPointFrequencyRecords\":10,\"keySizeBytes\":10,"
+            + "\"valueSizeBytes\":20,\"numHotKeys\":3,"
+            + "\"hotKeyFraction\":0.3,\"seed\":123456,"
+            + "\"bundleSizeDistribution\":{\"type\":\"const\",\"const\":42},"
+            + 
"\"forceNumInitialBundles\":10,\"progressShape\":\"LINEAR_REGRESSING\""
+            + "}";
+    SyntheticSourceOptions sourceOptions = fromString(syntheticSourceOptions);
+    assertEquals(100, sourceOptions.numRecords);
+    assertEquals(10, sourceOptions.splitPointFrequencyRecords);
+    assertEquals(10, sourceOptions.keySizeBytes);
+    assertEquals(20, sourceOptions.valueSizeBytes);
+    assertEquals(3, sourceOptions.numHotKeys);
+    assertEquals(0.3, sourceOptions.hotKeyFraction, 0);
+    assertEquals(0, sourceOptions.nextDelay(sourceOptions.seed));
+    assertEquals(123456, sourceOptions.seed);
+    assertEquals(42, sourceOptions.bundleSizeDistribution.sample(123), 0.0);
+    assertEquals(10, sourceOptions.forceNumInitialBundles.intValue());
+    assertEquals(SyntheticBoundedIO.ProgressShape.LINEAR_REGRESSING, 
sourceOptions.progressShape);
+  }
+
+  @Test
+  public void testSourceOptionsWithNegativeNumRecords() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("numRecords should be a non-negative number, but 
found -100");
+    testSourceOptions.numRecords = -100;
+    testSourceOptions.validate();
+  }
+
+  /** Test that the reader and the source produce the same records. */
+  @Test
+  public void testSourceAndReadersWork() throws Exception {
+    testSourceAndReadersWorkP(1);
+    testSourceAndReadersWorkP(-1);
+    testSourceAndReadersWorkP(3);
+  }
+
+  private void testSourceAndReadersWorkP(long splitPointFrequency) throws 
Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    testSourceOptions.splitPointFrequencyRecords = splitPointFrequency;
+    SyntheticBoundedSource source = new 
SyntheticBoundedSource(testSourceOptions);
+    assertEquals(10 * (10 + 20), source.getEstimatedSizeBytes(options));
+    SourceTestUtils.assertUnstartedReaderReadsSameAsItsSource(
+        source.createReader(options), options);
+  }
+
+  @Test
+  public void testSplitAtFraction() throws Exception {
+    testSplitAtFractionP(1);
+    testSplitAtFractionP(3);
+    // Do not test "-1" because then splits would be vacuous
+  }
+
+  private void testSplitAtFractionP(long splitPointFrequency) throws Exception 
{
+    PipelineOptions options = PipelineOptionsFactory.create();
+    testSourceOptions.splitPointFrequencyRecords = splitPointFrequency;
+    SyntheticBoundedSource source = new 
SyntheticBoundedSource(testSourceOptions);
+    SourceTestUtils.assertSplitAtFractionExhaustive(source, options);
+    // Can't split if already consumed.
+    SourceTestUtils.assertSplitAtFractionFails(source, 5, 0.3, options);
+    SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent(source, 1, 0.3, 
options);
+  }
+
+  @Test
+  public void testSplitIntoBundles() throws Exception {
+    testSplitIntoBundlesP(1);
+    testSplitIntoBundlesP(-1);
+    testSplitIntoBundlesP(5);
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    testSourceOptions.forceNumInitialBundles = 37;
+    assertEquals(
+        37,
+        new 
SyntheticBoundedIO.SyntheticBoundedSource(testSourceOptions).split(42, 
options).size());
+  }
+
+  private void testSplitIntoBundlesP(long splitPointFrequency) throws 
Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    testSourceOptions.splitPointFrequencyRecords = splitPointFrequency;
+    testSourceOptions.numRecords = 100;
+    SyntheticBoundedSource source = new 
SyntheticBoundedSource(testSourceOptions);
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, source.split(10, 
options), options);
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, source.split(40, 
options), options);
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, 
source.split(100, options), options);
+  }
+
+  @Test
+  public void testIncreasingProgress() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    testSourceOptions.progressShape = SyntheticBoundedIO.ProgressShape.LINEAR;
+    SyntheticBoundedSource source = new 
SyntheticBoundedSource(testSourceOptions);
+    BoundedSource.BoundedReader<KV<byte[], byte[]>> reader = 
source.createReader(options);
+    // Reader starts at 0.0 progress.
+    assertEquals(0, reader.getFractionConsumed(), 1e-5);
+    // Set the lastFractionConsumed < 0.0 so that we can use strict inequality 
in the below loop.
+    double lastFractionConsumed = -1.0;
+    for (boolean more = reader.start(); more; more = reader.advance()) {
+      assertTrue(reader.getFractionConsumed() > lastFractionConsumed);
+      lastFractionConsumed = reader.getFractionConsumed();
+    }
+    assertEquals(1, reader.getFractionConsumed(), 1e-5);
+  }
+
+  @Test
+  public void testRegressingProgress() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    testSourceOptions.progressShape = 
SyntheticBoundedIO.ProgressShape.LINEAR_REGRESSING;
+    SyntheticBoundedSource source = new 
SyntheticBoundedSource(testSourceOptions);
+    BoundedSource.BoundedReader<KV<byte[], byte[]>> reader = 
source.createReader(options);
+    double lastFractionConsumed = reader.getFractionConsumed();
+    for (boolean more = reader.start(); more; more = reader.advance()) {
+      assertTrue(reader.getFractionConsumed() <= lastFractionConsumed);
+      lastFractionConsumed = reader.getFractionConsumed();
+    }
+  }
+
+  @Test
+  public void testSplitIntoSingleRecordBundles() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    SyntheticSourceOptions sourceOptions = new SyntheticSourceOptions();
+    sourceOptions.numRecords = 10;
+    sourceOptions.setSeed(123456);
+    sourceOptions.bundleSizeDistribution = fromRealDistribution(new 
ConstantRealDistribution(1.0));
+    sourceOptions.forceNumInitialBundles = 10;
+    SyntheticBoundedSource source = new SyntheticBoundedSource(sourceOptions);
+    List<SyntheticBoundedSource> sources = source.split(42L, options);
+    for (SyntheticBoundedSource recordSource : sources) {
+      recordSource.validate();
+      assertEquals(1, recordSource.getEndOffset() - 
recordSource.getStartOffset());
+    }
+    SourceTestUtils.assertSourcesEqualReferenceSource(source, sources, 
options);
+  }
+}
diff --git a/sdks/python/apache_beam/testing/synthetic_pipeline.py 
b/sdks/python/apache_beam/testing/synthetic_pipeline.py
new file mode 100644
index 00000000000..c76b9cd6904
--- /dev/null
+++ b/sdks/python/apache_beam/testing/synthetic_pipeline.py
@@ -0,0 +1,502 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A set of utilities to write pipelines for performance tests.
+
+This module offers a way to create pipelines using synthetic sources and steps.
+Exact shape of the pipeline and the behaviour of sources and steps can be
+controlled through arguments. Please see function 'parse_args()' for more
+details about the arguments.
+
+Shape of the pipeline is primarily controlled through two arguments. Argument
+'steps' can be used to define a list of steps as a JSON string. Argument
+'barrier' describes how these steps are separated from each other. Argument
+'barrier' can be used to build a pipeline as a series of steps or a tree of
+steps with a fanin or a fanout of size 2.
+
+Other arguments describe what gets generated by synthetic sources that produce
+data for the pipeline.
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import argparse
+import json
+import logging
+import math
+import time
+
+import apache_beam as beam
+from apache_beam.io import WriteToText
+from apache_beam.io import iobase
+from apache_beam.io import range_trackers
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.testing.test_pipeline import TestPipeline
+
+try:
+  import numpy as np
+except ImportError:
+  np = None
+
+
+def parse_byte_size(s):
+  suffixes = 'BKMGTP'
+  if s[-1] in suffixes:
+    return int(float(s[:-1]) * 1024**suffixes.index(s[-1]))
+
+  return int(s)
+
+
+def div_round_up(a, b):
+  """Return ceil(a/b)."""
+  return int(math.ceil(float(a) / b))
+
+
+def rotate_key(element):
+  """Returns a new key-value pair of the same size but with a different key."""
+  (key, value) = element
+  return key[-1] + key[:-1], value
+
+
+class SyntheticStep(beam.DoFn):
+  """A DoFn of which behavior can be controlled through prespecified 
parameters.
+  """
+
+  def __init__(self, per_element_delay_sec=0, per_bundle_delay_sec=0,
+               output_records_per_input_record=1, output_filter_ratio=0):
+    if per_element_delay_sec and per_element_delay_sec < 1e-3:
+      raise ValueError('Per element sleep time must be at least 1e-3. '
+                       'Received: %r', per_element_delay_sec)
+    self._per_element_delay_sec = per_element_delay_sec
+    self._per_bundle_delay_sec = per_bundle_delay_sec
+    self._output_records_per_input_record = output_records_per_input_record
+    self._output_filter_ratio = output_filter_ratio
+
+  def start_bundle(self):
+    self._start_time = time.time()
+
+  def finish_bundle(self):
+    # The target is for the enclosing stage to take as close to as possible
+    # the given number of seconds, so we only sleep enough to make up for
+    # overheads not incurred elsewhere.
+    to_sleep = self._per_bundle_delay_sec - (time.time() - self._start_time)
+
+    # Ignoring sub-millisecond sleep times.
+    if to_sleep >= 1e-3:
+      time.sleep(to_sleep)
+
+  def process(self, element):
+    if self._per_element_delay_sec >= 1e-3:
+      time.sleep(self._per_element_delay_sec)
+    filter_element = False
+    if self._output_filter_ratio > 0:
+      if np.random.random() < self._output_filter_ratio:
+        filter_element = True
+
+    if not filter_element:
+      for _ in range(self._output_records_per_input_record):
+        yield element
+
+
+class SyntheticSource(iobase.BoundedSource):
+  """A custom source of a specified size.
+  """
+
+  def __init__(self, input_spec):
+    """Initiates a synthetic source.
+
+    Args:
+      input_spec: Input specification of the source. See corresponding option 
in
+                  function 'parse_args()' below for more details.
+    Raises:
+      ValueError: if input parameters are invalid.
+    """
+
+    def maybe_parse_byte_size(s):
+      return parse_byte_size(s) if isinstance(s, str) else int(s)
+
+    self._num_records = input_spec['numRecords']
+    self._key_size = maybe_parse_byte_size(input_spec.get('keySizeBytes', 1))
+    self._value_size = maybe_parse_byte_size(
+        input_spec.get('valueSizeBytes', 1))
+    self._total_size = self.element_size * self._num_records
+    self._initial_splitting = (
+        input_spec['bundleSizeDistribution']['type']
+        if 'bundleSizeDistribution' in input_spec else 'const')
+    if self._initial_splitting != 'const' and self._initial_splitting != 
'zipf':
+      raise ValueError(
+          'Only const and zipf distributions are supported for determining '
+          'sizes of bundles produced by initial splitting. Received: %s',
+          self._initial_splitting)
+    self._initial_splitting_num_bundles = (
+        input_spec['forceNumInitialBundles']
+        if 'forceNumInitialBundles' in input_spec else 0)
+    if self._initial_splitting == 'zipf':
+      self._initial_splitting_distribution_parameter = (
+          input_spec['bundleSizeDistribution']['param'])
+      if self._initial_splitting_distribution_parameter < 1:
+        raise ValueError(
+            'Parameter for a Zipf distribution must be larger than 1. '
+            'Received %r.', self._initial_splitting_distribution_parameter)
+    else:
+      self._initial_splitting_distribution_parameter = 0
+    self._dynamic_splitting = (
+        'none' if (
+            'splitPointFrequencyRecords' in input_spec
+            and input_spec['splitPointFrequencyRecords'] == 0)
+        else 'perfect')
+    if 'delayDistribution' in input_spec:
+      if input_spec['delayDistribution']['type'] != 'const':
+        raise ValueError('SyntheticSource currently only supports delay '
+                         'distributions of type \'const\'. Received %s.',
+                         input_spec['delayDistribution']['type'])
+      self._sleep_per_input_record_sec = (
+          float(input_spec['delayDistribution']['const']) / 1000)
+      if (self._sleep_per_input_record_sec and
+          self._sleep_per_input_record_sec < 1e-3):
+        raise ValueError('Sleep time per input record must be at least 1e-3.'
+                         ' Received: %r', self._sleep_per_input_record_sec)
+    else:
+      self._sleep_per_input_record_sec = 0
+
+  @property
+  def element_size(self):
+    return self._key_size + self._value_size
+
+  def estimate_size(self):
+    return self._total_size
+
+  def split(self, desired_bundle_size, start_position=0, stop_position=None):
+    # Performs initial splitting of SyntheticSource.
+    #
+    # Exact sizes and distribution of initial splits generated here depends on
+    # the input specification of the SyntheticSource.
+
+    if stop_position is None:
+      stop_position = self._num_records
+    if self._initial_splitting == 'zipf':
+      desired_num_bundles = self._initial_splitting_num_bundles or math.ceil(
+          float(self.estimate_size()) / desired_bundle_size)
+      samples = np.random.zipf(self._initial_splitting_distribution_parameter,
+                               desired_num_bundles)
+      total = sum(samples)
+      relative_bundle_sizes = [(float(sample) / total) for sample in samples]
+      bundle_ranges = []
+      start = start_position
+      index = 0
+      while start < stop_position:
+        if index == desired_num_bundles - 1:
+          bundle_ranges.append((start, stop_position))
+          break
+        stop = start + int(self._num_records * relative_bundle_sizes[index])
+        bundle_ranges.append((start, stop))
+        start = stop
+        index += 1
+    else:
+      if self._initial_splitting_num_bundles:
+        bundle_size_in_elements = max(1, self._num_records /
+                                      self._initial_splitting_num_bundles)
+      else:
+        bundle_size_in_elements = (max(
+            div_round_up(desired_bundle_size, self.element_size),
+            math.floor(math.sqrt(self._num_records))))
+      bundle_ranges = []
+      for start in range(start_position, stop_position,
+                         bundle_size_in_elements):
+        stop = min(start + bundle_size_in_elements, stop_position)
+        bundle_ranges.append((start, stop))
+
+    for start, stop in bundle_ranges:
+      yield iobase.SourceBundle(stop - start, self, start, stop)
+
+  def get_range_tracker(self, start_position, stop_position):
+    if start_position is None:
+      start_position = 0
+    if stop_position is None:
+      stop_position = self._num_records
+    tracker = range_trackers.OffsetRangeTracker(start_position, stop_position)
+    if self._dynamic_splitting == 'none':
+      tracker = range_trackers.UnsplittableRangeTracker(tracker)
+    return tracker
+
+  def read(self, range_tracker):
+    index = range_tracker.start_position()
+    while range_tracker.try_claim(index):
+      r = np.random.RandomState(index)
+
+      time.sleep(self._sleep_per_input_record_sec)
+      yield r.bytes(self._key_size), r.bytes(self._value_size)
+      index += 1
+
+  def default_output_coder(self):
+    return beam.coders.TupleCoder(
+        [beam.coders.BytesCoder(), beam.coders.BytesCoder()])
+
+
+class ShuffleBarrier(beam.PTransform):
+  """Separates two steps with a GroupByKey.
+
+  Keys are rotated with rotate_key, the pairs are grouped by the rotated key,
+  and every group is then expanded back into individual (key, value) pairs.
+  """
+
+  def expand(self, pc):
+    return (pc
+            | beam.Map(rotate_key)
+            | beam.GroupByKey()
+            | 'Ungroup' >> beam.FlatMap(
+                lambda elm: [(elm[0], v) for v in elm[1]]))
+
+
+class SideInputBarrier(beam.PTransform):
+  """Separates two steps with a side input.
+
+  Keys are rotated with rotate_key and every element is then passed through a
+  Map that receives, and ignores, an AsIter side input built from a FlatMap
+  over the same input whose function returns None.
+  """
+
+  def expand(self, pc):
+    # NOTE(review): presumably the FlatMap emits nothing, so the side input is
+    # empty and only forces the Map to wait for all of pc - confirm.
+    return (pc
+            | beam.Map(rotate_key)
+            | beam.Map(
+                lambda elem, ignored: elem,
+                beam.pvalue.AsIter(pc | beam.FlatMap(lambda elem: None))))
+
+
+def merge_using_gbk(name, pc1, pc2):
+  """Merges two given PCollections using a CoGroupByKey."""
+
+  pc1_with_key = pc1 | (name + 'AttachKey1') >> beam.Map(lambda x: (x, x))
+  pc2_with_key = pc2 | (name + 'AttachKey2') >> beam.Map(lambda x: (x, x))
+
+  grouped = (
+      {'pc1': pc1_with_key, 'pc2': pc2_with_key} |
+      (name + 'Group') >> beam.CoGroupByKey())
+  return (grouped |
+          (name + 'DeDup') >> beam.Map(lambda elm: elm[0]))  # Ignoring values
+
+
+def merge_using_side_input(name, pc1, pc2):
+  """Merges two given PCollections using side inputs."""
+
+  def join_fn(val, _):  # Ignoring side input
+    return val
+
+  return pc1 | name >> beam.core.Map(join_fn, beam.pvalue.AsIter(pc2))
+
+
+def expand_using_gbk(name, pc):
+  """Expands a given PCollection into two copies using GroupByKey."""
+
+  ret = []
+  ret.append((pc | ('%s.a' % name) >> ShuffleBarrier()))
+  ret.append((pc | ('%s.b' % name) >> ShuffleBarrier()))
+  return ret
+
+
+def expand_using_second_output(name, pc):
+  """Expands a given PCollection into two copies using side outputs."""
+
+  class ExpandFn(beam.DoFn):
+
+    def process(self, element):
+      yield beam.pvalue.TaggedOutput('second_out', element)
+      yield element
+
+  pc1, pc2 = (pc | name >> beam.ParDo(
+      ExpandFn()).with_outputs('second_out', main='main_out'))
+  return [pc1, pc2]
+
+
+def _parse_steps(json_str):
+  """Converts the JSON step description into Python objects.
+
+  See property 'steps' for more details about the JSON step description.
+
+  Args:
+    json_str: a JSON string that describes the steps.
+
+  Returns:
+    Information about steps as a list of dictionaries. Each dictionary may have
+    following properties.
+    (1) per_element_delay - amount of delay for each element in seconds.
+    (2) per_bundle_delay - minimum amount of delay for a given step in seconds.
+    (3) output_records_per_input_record - number of output elements generated
+        for each input element to a step.
+    (4) output_filter_ratio - the probability at which a step may filter out a
+        given element by not producing any output for that element.
+  """
+  all_steps = []
+  json_data = json.loads(json_str)
+  for val in json_data:
+    steps = {}
+    steps['per_element_delay'] = (
+        (float(val['per_element_delay_msec']) / 1000)
+        if 'per_element_delay_msec' in val else 0)
+    steps['per_bundle_delay'] = (
+        float(val['per_bundle_delay_sec'])
+        if 'per_bundle_delay_sec' in val else 0)
+    steps['output_records_per_input_record'] = (
+        int(val['output_records_per_input_record'])
+        if 'output_records_per_input_record' in val else 1)
+    steps['output_filter_ratio'] = (
+        float(val['output_filter_ratio'])
+        if 'output_filter_ratio' in val else 0)
+    all_steps.append(steps)
+
+  return all_steps
+
+
+def parse_args(args):
+  """Parses a given set of arguments.
+
+  Args:
+    args: set of arguments to be passed.
+
+  Returns:
+    a tuple where first item gives the set of arguments defined and parsed
+    within this method and second item gives the set of unknown arguments.
+  """
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--steps',
+      dest='steps',
+      type=_parse_steps,
+      help='A JSON string that gives a list where each entry of the list is '
+           'configuration information for a step. Configuration for each step '
+           'consists of '
+           '(1) A float "per_bundle_delay_sec" (in seconds). Defaults to 0.'
+           '(2) A float "per_element_delay_msec" (in milli seconds). '
+           '    Defaults to 0.'
+           '(3) An integer "output_records_per_input_record". Defaults to 1.'
+           '(4) A float "output_filter_ratio" in the range [0, 1] . '
+           '    Defaults to 0.')
+
+  parser.add_argument(
+      '--input',
+      dest='input',
+      type=json.loads,
+      help='A JSON string that describes the properties of the SyntheticSource 
'
+           'used by the pipeline. Configuration is similar to Java '
+           'SyntheticBoundedInput.'
+           'Currently supports following properties. '
+           '(1) An integer "numRecords". '
+           '(2) An integer "keySize". '
+           '(3) An integer "valueSize". '
+           '(4) A tuple "bundleSizeDistribution" with following values. '
+           '    A string "type". Allowed values are "const" and "zipf". '
+           '    An float "param". Only used if "type"=="zipf". Must be '
+           '    larger than 1. '
+           '(5) An integer "forceNumInitialBundles". '
+           '(6) An integer "splitPointFrequencyRecords". '
+           '(7) A tuple "delayDistribution" with following values. '
+           '    A string "type". Only allowed value is "const". '
+           '    An integer "const". ')
+
+  parser.add_argument('--barrier',
+                      dest='barrier',
+                      default='shuffle',
+                      choices=['shuffle', 'side-input', 'expand-gbk',
+                               'expand-second-output', 'merge-gbk',
+                               'merge-side-input'],
+                      help='Whether to use shuffle as the barrier '
+                           '(as opposed to side inputs).')
+  parser.add_argument('--output',
+                      dest='output',
+                      default='',
+                      help='Destination to write output.')
+
+  return parser.parse_known_args(args)
+
+
+def run(argv=None):
+  """Runs the workflow."""
+  known_args, pipeline_args = parse_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+
+  input_info = known_args.input
+
+  with TestPipeline(options=pipeline_options) as p:
+    source = SyntheticSource(input_info)
+
+    # pylint: disable=expression-not-assigned
+    barrier = known_args.barrier
+
+    pc_list = []
+    num_roots = 2 ** (len(known_args.steps) - 1) if (
+        barrier == 'merge-gbk' or barrier == 'merge-side-input') else 1
+    for read_no in range(num_roots):
+      pc_list.append((p | ('Read %d' % read_no) >> beam.io.Read(source)))
+
+    for step_no, steps in enumerate(known_args.steps):
+      if step_no != 0:
+        new_pc_list = []
+        for pc_no, pc in enumerate(pc_list):
+          if barrier == 'shuffle':
+            new_pc_list.append(
+                (pc |
+                 ('shuffle %d.%d' % (step_no, pc_no)) >> ShuffleBarrier()))
+          elif barrier == 'side-input':
+            new_pc_list.append(
+                (pc |
+                 ('side-input %d.%d' % (step_no, pc_no)) >> SideInputBarrier()))
+          elif barrier == 'expand-gbk':
+            new_pc_list.extend(
+                expand_using_gbk(('expand-gbk %d.%d' % (step_no, pc_no)), pc))
+          elif barrier == 'expand-second-output':
+            new_pc_list.extend(
+                expand_using_second_output(
+                    ('expand-second-output %d.%d' % (step_no, pc_no)), pc))
+          elif barrier == 'merge-gbk':
+            if pc_no % 2 == 0:
+              new_pc_list.append(
+                  merge_using_gbk(('merge-gbk %d.%d' % (step_no, pc_no)),
+                                  pc, pc_list[pc_no + 1]))
+            else:
+              continue
+          elif barrier == 'merge-side-input':
+            if pc_no % 2 == 0:
+              new_pc_list.append(
+                  merge_using_side_input(
+                      ('merge-side-input %d.%d' % (step_no, pc_no)),
+                      pc, pc_list[pc_no + 1]))
+            else:
+              continue
+
+        pc_list = new_pc_list
+
+      new_pc_list = []
+      for pc_no, pc in enumerate(pc_list):
+        new_pc = pc | 'SyntheticStep %d.%d' % (step_no, pc_no) >> beam.ParDo(
+            SyntheticStep(
+                per_element_delay_sec=steps['per_element_delay'],
+                per_bundle_delay_sec=steps['per_bundle_delay'],
+                output_records_per_input_record=
+                steps['output_records_per_input_record'],
+                output_filter_ratio=
+                steps['output_filter_ratio']))
+        new_pc_list.append(new_pc)
+      pc_list = new_pc_list
+
+    if known_args.output:
+      # If an output location is provided we format and write output.
+      if len(pc_list) == 1:
+        (pc_list[0] |
+         'FormatOutput' >> beam.Map(lambda elm: (elm[0] + elm[1])) |
+         'WriteOutput' >> WriteToText(known_args.output))
+
+  logging.info('Pipeline run completed.')
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()
diff --git a/sdks/python/apache_beam/testing/synthetic_pipeline_test.py b/sdks/python/apache_beam/testing/synthetic_pipeline_test.py
new file mode 100644
index 00000000000..fe5e94a78cd
--- /dev/null
+++ b/sdks/python/apache_beam/testing/synthetic_pipeline_test.py
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for apache_beam.testing.synthetic_pipeline."""
+
+from __future__ import absolute_import
+
+import glob
+import json
+import logging
+import tempfile
+import time
+import unittest
+
+import apache_beam as beam
+from apache_beam.io import source_test_utils
+from apache_beam.testing import synthetic_pipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+try:
+  import numpy as np
+except ImportError:
+  np = None
+
+
+def input_spec(num_records, key_size, value_size,
+               bundle_size_distribution_type='const',
+               bundle_size_distribution_param=0,
+               force_initial_num_bundles=0):
+  return {
+      'numRecords': num_records,
+      'keySizeBytes': key_size,
+      'valueSizeBytes': value_size,
+      'bundleSizeDistribution': {'type': bundle_size_distribution_type,
+                                 'param': bundle_size_distribution_param},
+      'forceNumInitialBundles': force_initial_num_bundles,
+  }
+
+
+@unittest.skipIf(np is None, 'Synthetic source dependencies are not installed')
+class SyntheticPipelineTest(unittest.TestCase):
+
+  # pylint: disable=expression-not-assigned
+
+  def testSyntheticStep(self):
+    start = time.time()
+    with beam.Pipeline() as p:
+      pcoll = p | beam.Create(list(range(10))) | beam.ParDo(
+          synthetic_pipeline.SyntheticStep(0, 0.5, 10))
+      assert_that(
+          pcoll | beam.combiners.Count.Globally(), equal_to([100]))
+
+    elapsed = time.time() - start
+    # TODO(chamikaramj): Fix the flaky time based bounds.
+    self.assertTrue(0.5 <= elapsed <= 3, elapsed)
+
+  def testSyntheticSource(self):
+    def assert_size(element, expected_size):
+      assert len(element) == expected_size
+    with beam.Pipeline() as p:
+      pcoll = (
+          p | beam.io.Read(
+              synthetic_pipeline.SyntheticSource(input_spec(300, 5, 15))))
+      (pcoll
+       | beam.Map(lambda elm: elm[0]) | 'key' >> beam.Map(assert_size, 5))
+      (pcoll
+       | beam.Map(lambda elm: elm[1]) | 'value' >> beam.Map(assert_size, 15))
+      assert_that(pcoll | beam.combiners.Count.Globally(),
+                  equal_to([300]))
+
+  def testSyntheticSourceSplitEven(self):
+    source = synthetic_pipeline.SyntheticSource(
+        input_spec(1000, 1, 1, 'const', 0))
+    splits = source.split(100)
+    sources_info = [(split.source, split.start_position, split.stop_position)
+                    for split in splits]
+    self.assertEquals(20, len(sources_info))
+    source_test_utils.assert_sources_equal_reference_source(
+        (source, None, None), sources_info)
+
+  def testSyntheticSourceSplitUneven(self):
+    source = synthetic_pipeline.SyntheticSource(
+        input_spec(1000, 1, 1, 'zipf', 3, 10))
+    splits = source.split(100)
+    sources_info = [(split.source, split.start_position, split.stop_position)
+                    for split in splits]
+    self.assertEquals(10, len(sources_info))
+    source_test_utils.assert_sources_equal_reference_source(
+        (source, None, None), sources_info)
+
+  def testSplitAtFraction(self):
+    source = synthetic_pipeline.SyntheticSource(input_spec(10, 1, 1))
+    source_test_utils.assert_split_at_fraction_exhaustive(source)
+    source_test_utils.assert_split_at_fraction_fails(source, 5, 0.3)
+    source_test_utils.assert_split_at_fraction_succeeds_and_consistent(
+        source, 1, 0.3)
+
+  def run_pipeline(self, barrier, writes_output=True):
+    steps = [{'per_element_delay': 1}, {'per_element_delay': 1}]
+    args = ['--barrier=%s' % barrier, '--runner=DirectRunner',
+            '--steps=%s' % json.dumps(steps),
+            '--input=%s' % json.dumps(input_spec(10, 1, 1))]
+    if writes_output:
+      output_location = tempfile.NamedTemporaryFile().name
+      args.append('--output=%s' % output_location)
+
+    synthetic_pipeline.run(args)
+
+    # Verify output
+    if writes_output:
+      read_output = []
+      for file_name in glob.glob(output_location + '*'):
+        with open(file_name, 'r') as f:
+          read_output.extend(f.read().splitlines())
+
+      self.assertEqual(10, len(read_output))
+
+  def testPipelineShuffle(self):
+    self.run_pipeline('shuffle')
+
+  def testPipelineSideInput(self):
+    self.run_pipeline('side-input')
+
+  def testPipelineExpandGBK(self):
+    self.run_pipeline('expand-gbk', False)
+
+  def testPipelineExpandSideOutput(self):
+    self.run_pipeline('expand-second-output', False)
+
+  def testPipelineMergeGBK(self):
+    self.run_pipeline('merge-gbk')
+
+  def testPipelineMergeSideInput(self):
+    self.run_pipeline('merge-side-input')
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/scripts/generate_pydoc.sh b/sdks/python/scripts/generate_pydoc.sh
index 0e5bfbbbb58..2f6f0f14cfb 100755
--- a/sdks/python/scripts/generate_pydoc.sh
+++ b/sdks/python/scripts/generate_pydoc.sh
@@ -178,6 +178,7 @@ nitpicky = True
 nitpick_ignore = []
 nitpick_ignore += [('py:class', iden) for iden in ignore_identifiers]
 nitpick_ignore += [('py:obj', iden) for iden in ignore_identifiers]
+nitpick_ignore += [('py:exc', 'ValueError')]
 EOF
 
 #=== index.rst ===#
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index a174ced0d7b..55253d839f1 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -120,6 +120,10 @@ def get_version():
     'pyhamcrest>=1.9,<2.0',
     ]
 
+REQUIRED_PERF_TEST_PACKAGES = [
+    'numpy>=1.14.3',
+]
+
 GCP_REQUIREMENTS = [
     # oauth2client >=4 only works with google-apitools>=0.5.18.
     'google-apitools>=0.5.18,<=0.5.20',
@@ -184,7 +188,8 @@ def run(self):
     extras_require={
         'docs': ['Sphinx>=1.5.2,<2.0'],
         'test': REQUIRED_TEST_PACKAGES,
-        'gcp': GCP_REQUIREMENTS
+        'gcp': GCP_REQUIREMENTS,
+        'perftest': REQUIRED_PERF_TEST_PACKAGES,
     },
     zip_safe=False,
     # PyPI package information.
diff --git a/settings.gradle b/settings.gradle
index 07ae1adead7..b3ac28e1cca 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -152,6 +152,8 @@ include "beam-sdks-java-io-tika"
 project(":beam-sdks-java-io-tika").dir = file("sdks/java/io/tika")
 include "beam-sdks-java-io-xml"
 project(":beam-sdks-java-io-xml").dir = file("sdks/java/io/xml")
+include "beam-sdks-java-io-synthetic"
+project(":beam-sdks-java-io-synthetic").dir = file("sdks/java/io/synthetic")
 include "beam-sdks-java-javadoc"
 project(":beam-sdks-java-javadoc").dir = file("sdks/java/javadoc")
 include "beam-sdks-java-maven-archetypes-examples"


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 123414)
    Time Spent: 8h 40m  (was: 8.5h)

> Performance tests need a way to generate Synthetic data
> -------------------------------------------------------
>
>                 Key: BEAM-4432
>                 URL: https://issues.apache.org/jira/browse/BEAM-4432
>             Project: Beam
>          Issue Type: Improvement
>          Components: testing
>            Reporter: Pablo Estrada
>            Assignee: Pablo Estrada
>            Priority: Minor
>          Time Spent: 8h 40m
>  Remaining Estimate: 0h
>
> GenerateSequence falls short in this regard, as we may want to generate 
> data in custom distributions, or with specific repeatability requirements / 
> and hardcoded delays for autoscaling.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to