This is an automated email from the ASF dual-hosted git repository. lgajowy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new fa1142c Merge pull request #6985: [BEAM-5983] Create Combine load test fa1142c is described below commit fa1142c1e2edb1535f45f7cab808743bd47b9f74 Author: Ćukasz Gajowy <lukasz.gaj...@gmail.com> AuthorDate: Fri Nov 16 10:40:47 2018 +0100 Merge pull request #6985: [BEAM-5983] Create Combine load test --- .../apache/beam/sdk/loadtests/CombineLoadTest.java | 156 +++++++++++++++++++++ 1 file changed, 156 insertions(+) diff --git a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CombineLoadTest.java b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CombineLoadTest.java new file mode 100644 index 0000000..a85f23b --- /dev/null +++ b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/CombineLoadTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.loadtests; + +import static java.lang.String.format; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.math.BigInteger; +import java.util.Optional; +import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO; +import org.apache.beam.sdk.io.synthetic.SyntheticStep; +import org.apache.beam.sdk.loadtests.metrics.MetricsMonitor; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.Mean; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.Top; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * Load test for {@link ParDo} operation. + * + * <p>The purpose of this test is to measure {@link Combine}'s behaviour in stressful conditions. It + * uses {@link SyntheticBoundedIO} and {@link SyntheticStep} which both can be parametrized to + * generate keys and values of various size, impose delay (sleep or cpu burnout) in various moments + * during the pipeline execution and provide some other performance challenges. + * + * @see SyntheticStep + * @see SyntheticBoundedIO + * <p>You can choose between multiple combine modes to test per key combine operations ({@link + * CombinerType}). + * <p>To run it manually, use the following command: + * <pre> + * ./gradlew :beam-sdks-java-load-tests:run -PloadTest.args=' + * --fanout=1 + * --perKeyCombinerType=TOP_LARGEST + * --topCount=10 + * --sourceOptions={"numRecords":1000,...} + * --stepOptions={"outputRecordsPerInputRecord":2...}' + * -PloadTest.mainClass="org.apache.beam.sdk.loadtests.CombineLoadTest" + * </pre> + */ +public class CombineLoadTest extends LoadTest<CombineLoadTest.Options> { + + private static final String METRICS_NAMESPACE = "combine"; + + private enum CombinerType { + TOP_LARGEST, + MEAN, + SUM, + COUNT + } + + /** Pipeline options specific for this test. */ + interface Options extends LoadTestOptions { + + @Description("Number consequent of ParDo operations (SyntheticSteps) to be performed.") + @Default.Integer(1) + Integer getNumberOfCounterOperations(); + + void setNumberOfCounterOperations(Integer count); + + @Description("The number of Combine operations to perform in parallel.") + @Default.Integer(1) + Integer getFanout(); + + void setFanout(Integer fanout); + + @Description("Per key combiner type.") + @Default.Enum("MEAN") + CombinerType getPerKeyCombinerType(); + + void setPerKeyCombinerType(CombinerType combinerType); + + @Description("Number of top results to combine (if applicable).") + Integer getTopCount(); + + void setTopCount(Integer topCount); + } + + private CombineLoadTest(String[] args) throws IOException { + super(args, Options.class, METRICS_NAMESPACE); + } + + @Override + protected void loadTest() throws IOException { + PTransform combiner = createPerKeyCombiner(options.getPerKeyCombinerType()); + + Optional<SyntheticStep> syntheticStep = createStep(options.getStepOptions()); + + PCollection<KV<byte[], byte[]>> input = + pipeline + .apply("Read input", SyntheticBoundedIO.readFrom(sourceOptions)) + .apply("Collect metrics", ParDo.of(new MetricsMonitor(METRICS_NAMESPACE))); + + for (int i = 0; i < options.getFanout(); i++) { + applyStepIfPresent(input, format("Step: %d", i), syntheticStep) + .apply(format("Convert to BigInteger: %d", i), MapElements.via(new ByteValueToLong())) + .apply(format("Combine: %d", i), combiner); + } + } + + private PTransform createPerKeyCombiner(CombinerType combinerType) { + switch (combinerType) { + case MEAN: + return Mean.perKey(); + case TOP_LARGEST: + Preconditions.checkArgument( + options.getTopCount() != null, + "You should set \"--topCount\" option to use TOP combiners."); + return Top.largestPerKey(options.getTopCount()); + case SUM: + return Sum.longsPerKey(); + case COUNT: + return Count.perKey(); + default: + throw new IllegalArgumentException("No such combiner!"); + } + } + + private static class ByteValueToLong + extends SimpleFunction<KV<byte[], byte[]>, KV<byte[], Long>> { + + @Override + public KV<byte[], Long> apply(KV<byte[], byte[]> input) { + return KV.of(input.getKey(), new BigInteger(input.getValue()).longValue()); + } + } + + public static void main(String[] args) throws IOException { + new CombineLoadTest(args).run(); + } +}