This is an automated email from the ASF dual-hosted git repository.

lgajowy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fa1142c  Merge pull request #6985: [BEAM-5983] Create Combine load test
fa1142c is described below

commit fa1142c1e2edb1535f45f7cab808743bd47b9f74
Author: Łukasz Gajowy <lukasz.gaj...@gmail.com>
AuthorDate: Fri Nov 16 10:40:47 2018 +0100

    Merge pull request #6985: [BEAM-5983] Create Combine load test
---
 .../apache/beam/sdk/loadtests/CombineLoadTest.java | 156 +++++++++++++++++++++
 1 file changed, 156 insertions(+)

diff --git a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CombineLoadTest.java b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CombineLoadTest.java
new file mode 100644
index 0000000..a85f23b
--- /dev/null
+++ b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CombineLoadTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.loadtests;
+
+import static java.lang.String.format;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.Optional;
+import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO;
+import org.apache.beam.sdk.io.synthetic.SyntheticStep;
+import org.apache.beam.sdk.loadtests.metrics.MetricsMonitor;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.Mean;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Top;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Load test for {@link Combine} operation.
+ *
+ * <p>The purpose of this test is to measure {@link Combine}'s behaviour in stressful conditions. It
+ * uses {@link SyntheticBoundedIO} and {@link SyntheticStep} which both can be parametrized to
+ * generate keys and values of various size, impose delay (sleep or cpu burnout) in various moments
+ * during the pipeline execution and provide some other performance challenges.
+ *
+ * <p>You can choose between multiple combine modes to test per key combine operations ({@link
+ * CombinerType}).
+ *
+ * <p>To run it manually, use the following command:
+ *
+ * <pre>
+ *    ./gradlew :beam-sdks-java-load-tests:run -PloadTest.args='
+ *      --fanout=1
+ *      --perKeyCombinerType=TOP_LARGEST
+ *      --topCount=10
+ *      --sourceOptions={"numRecords":1000,...}
+ *      --stepOptions={"outputRecordsPerInputRecord":2...}'
+ *      -PloadTest.mainClass="org.apache.beam.sdk.loadtests.CombineLoadTest"
+ * </pre>
+ *
+ * @see SyntheticStep
+ * @see SyntheticBoundedIO
+ */
+public class CombineLoadTest extends LoadTest<CombineLoadTest.Options> {
+
+  /** Namespace passed to the base class and to the {@link MetricsMonitor} step. */
+  private static final String METRICS_NAMESPACE = "combine";
+
+  /** Per key combiners selectable through {@code --perKeyCombinerType}. */
+  private enum CombinerType {
+    TOP_LARGEST,
+    MEAN,
+    SUM,
+    COUNT
+  }
+
+  /** Pipeline options specific for this test. */
+  interface Options extends LoadTestOptions {
+
+    // NOTE(review): this option is not read by any code in this class - confirm it is still needed.
+    @Description("Number of consecutive ParDo operations (SyntheticSteps) to be performed.")
+    @Default.Integer(1)
+    Integer getNumberOfCounterOperations();
+
+    void setNumberOfCounterOperations(Integer count);
+
+    @Description("The number of Combine operations to perform in parallel.")
+    @Default.Integer(1)
+    Integer getFanout();
+
+    void setFanout(Integer fanout);
+
+    @Description("Per key combiner type.")
+    @Default.Enum("MEAN")
+    CombinerType getPerKeyCombinerType();
+
+    void setPerKeyCombinerType(CombinerType combinerType);
+
+    // No default: the value stays null unless set, which createPerKeyCombiner checks for
+    // TOP_LARGEST.
+    @Description("Number of top results to combine (if applicable).")
+    Integer getTopCount();
+
+    void setTopCount(Integer topCount);
+  }
+
+  private CombineLoadTest(String[] args) throws IOException {
+    super(args, Options.class, METRICS_NAMESPACE);
+  }
+
+  /**
+   * Builds the pipeline: reads the synthetic input once, attaches the metrics monitor, and then
+   * adds {@code fanout} branches. Each branch applies the optional synthetic step, converts the
+   * byte values to {@code Long} and applies the selected per key combiner.
+   */
+  @Override
+  protected void loadTest() throws IOException {
+    PTransform<PCollection<KV<byte[], Long>>, ? extends PCollection<?>> combiner =
+        createPerKeyCombiner(options.getPerKeyCombinerType());
+
+    Optional<SyntheticStep> syntheticStep = createStep(options.getStepOptions());
+
+    PCollection<KV<byte[], byte[]>> input =
+        pipeline
+            .apply("Read input", SyntheticBoundedIO.readFrom(sourceOptions))
+            .apply("Collect metrics", ParDo.of(new MetricsMonitor(METRICS_NAMESPACE)));
+
+    // Every branch starts from the same input collection; the index keeps transform names unique.
+    for (int i = 0; i < options.getFanout(); i++) {
+      applyStepIfPresent(input, format("Step: %d", i), syntheticStep)
+          .apply(format("Convert to BigInteger: %d", i), MapElements.via(new ByteValueToLong()))
+          .apply(format("Combine: %d", i), combiner);
+    }
+  }
+
+  /**
+   * Returns the per key combine transform for the requested combiner type.
+   *
+   * <p>The output element type differs between combiners, hence the wildcard in the return type.
+   *
+   * @param combinerType combiner selected in the pipeline options
+   * @return transform combining {@code Long} values per key
+   * @throws IllegalArgumentException if {@code TOP_LARGEST} is requested without {@code
+   *     --topCount}, or the combiner type is not handled
+   */
+  private PTransform<PCollection<KV<byte[], Long>>, ? extends PCollection<?>> createPerKeyCombiner(
+      CombinerType combinerType) {
+    switch (combinerType) {
+      case MEAN:
+        return Mean.perKey();
+      case TOP_LARGEST:
+        Integer topCount = options.getTopCount();
+        Preconditions.checkArgument(
+            topCount != null, "You should set \"--topCount\" option to use TOP combiners.");
+        return Top.largestPerKey(topCount);
+      case SUM:
+        return Sum.longsPerKey();
+      case COUNT:
+        return Count.perKey();
+      default:
+        throw new IllegalArgumentException("No such combiner!");
+    }
+  }
+
+  /** Maps the byte array value of each element to a {@code Long}, keeping the key untouched. */
+  private static class ByteValueToLong
+      extends SimpleFunction<KV<byte[], byte[]>, KV<byte[], Long>> {
+
+    @Override
+    public KV<byte[], Long> apply(KV<byte[], byte[]> input) {
+      byte[] value = input.getValue();
+      // BigInteger(byte[]) throws NumberFormatException for a zero-length array, so an empty
+      // value is mapped to 0. longValue() keeps only the low-order 64 bits of longer values.
+      long converted = value.length == 0 ? 0L : new BigInteger(value).longValue();
+      return KV.of(input.getKey(), converted);
+    }
+  }
+
+  public static void main(String[] args) throws IOException {
+    new CombineLoadTest(args).run();
+  }
+}

Reply via email to