[
https://issues.apache.org/jira/browse/BEAM-5355?focusedWorklogId=151640&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-151640
]
ASF GitHub Bot logged work on BEAM-5355:
----------------------------------------
Author: ASF GitHub Bot
Created on: 05/Oct/18 15:27
Start Date: 05/Oct/18 15:27
Worklog Time Spent: 10m
Work Description: lgajowy closed pull request #6571: [BEAM-5355] Add
GroupByKeyLoadTest
URL: https://github.com/apache/beam/pull/6571
This is a PR merged from a forked repository.
Because this pull request came from a fork, GitHub hides the original
diff once it is merged, so the diff is reproduced below for the sake of
provenance:
diff --git a/sdks/java/io/synthetic/build.gradle b/sdks/java/io/synthetic/build.gradle
index 22bc6809cc9..900f61d1722 100644
--- a/sdks/java/io/synthetic/build.gradle
+++ b/sdks/java/io/synthetic/build.gradle
@@ -24,10 +24,12 @@ ext.summary = "Generators of Synthetic IO for Testing."
dependencies {
compile library.java.joda_time
- compile library.java.commons_math3
+ shadow library.java.commons_math3
shadow library.java.jackson_core
shadow library.java.jackson_annotations
shadow library.java.jackson_databind
+ shadow library.java.guava
+
testCompile library.java.guava
testCompile library.java.junit
testCompile library.java.hamcrest_core
diff --git a/sdks/java/load-tests/OWNERS b/sdks/java/load-tests/OWNERS
new file mode 100644
index 00000000000..2a140107225
--- /dev/null
+++ b/sdks/java/load-tests/OWNERS
@@ -0,0 +1,5 @@
+# See the OWNERS docs at https://s.apache.org/beam-owners
+
+reviewers:
+ - lgajowy
+ - kkucharc
diff --git a/sdks/java/load-tests/build.gradle b/sdks/java/load-tests/build.gradle
new file mode 100644
index 00000000000..c3a1f632b42
--- /dev/null
+++ b/sdks/java/load-tests/build.gradle
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: org.apache.beam.gradle.BeamModulePlugin
+applyJavaNature()
+
+description = "Apache Beam :: SDKs :: Java :: Load Tests"
+
+
+def mainClassProperty = "loadTest.mainClass"
+def mainClass = project.findProperty(mainClassProperty)
+
+// When running via Gradle, this property can be used to pass commandline arguments
+// to the nexmark launch
+def loadTestArgsProperty = "loadTest.args"
+def loadTestArgs = project.hasProperty(loadTestArgsProperty) ?
+ project.getProperty(loadTestArgsProperty).split() : []
+
+// When running via Gradle, this property sets the runner dependency
+def runnerProperty = "runner"
+def runnerDependency = (project.hasProperty(runnerProperty)
+ ? project.getProperty(runnerProperty)
+ : ":beam-runners-direct-java")
+
+def shouldProvideSpark = ":beam-runners-spark".equals(runnerDependency)
+
+configurations {
+ // A configuration for running the Load testlauncher directly from Gradle, which
+ // uses Gradle to put the appropriate dependencies on the Classpath rather than
+ // bundling them into a fat jar
+ gradleRun
+}
+
+dependencies {
+ shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
+ shadow project(path: ":beam-runners-direct-java", configuration: "shadow")
+ shadow project(path: ":beam-sdks-java-io-synthetic", configuration: "shadow")
+
+ gradleRun project(path: project.path, configuration: "shadow")
+ gradleRun project(path: runnerDependency, configuration: "shadow")
+
+ // The Spark runner requires the user to provide a Spark dependency. For self-contained
+ // runs with the Spark runner, we can provide such a dependency. This is deliberately phrased
+ // to not hardcode any runner other than :beam-runners-direct-java
+ if (shouldProvideSpark) {
+ gradleRun library.java.spark_streaming
+ gradleRun library.java.spark_core, {
+ exclude group:"org.slf4j", module:"jul-to-slf4j"
+ }
+ }
+}
+
+if (shouldProvideSpark) {
+ configurations.gradleRun {
+ // Using Spark runner causes a StackOverflowError if slf4j-jdk14 is on the classpath
+ exclude group: "org.slf4j", module: "slf4j-jdk14"
+ }
+}
+
+task run(type: JavaExec) {
+ main = mainClass
+ classpath = configurations.gradleRun
+ args loadTestArgs
+}
+
diff --git a/sdks/java/load-tests/src/main/java/org/apache/beam/sdk/loadtests/GroupByKeyLoadTest.java b/sdks/java/load-tests/src/main/java/org/apache/beam/sdk/loadtests/GroupByKeyLoadTest.java
new file mode 100644
index 00000000000..775220c90f4
--- /dev/null
+++ b/sdks/java/load-tests/src/main/java/org/apache/beam/sdk/loadtests/GroupByKeyLoadTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.loadtests;
+
+import static java.lang.String.format;
+import static org.apache.beam.sdk.loadtests.GroupByKeyLoadTest.Options.fromJsonString;
+import static org.apache.beam.sdk.loadtests.GroupByKeyLoadTest.Options.readFromArgs;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO;
+import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO.SyntheticSourceOptions;
+import org.apache.beam.sdk.io.synthetic.SyntheticOptions;
+import org.apache.beam.sdk.io.synthetic.SyntheticStep;
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Load test for {@link GroupByKey} operation.
+ *
+ * <p>The purpose of this test is to measure {@link GroupByKey}'s behaviour in stressful conditions.
+ * it uses {@link SyntheticBoundedIO} and {@link SyntheticStep} which both can be parametrized to
+ * generate keys and values of various size, impose delay (sleep or cpu burnout) in various moments
+ * during the pipeline execution and provide some other performance challenges (see Source's and
+ * Step's documentation for more details).
+ *
+ * <p>In addition, this test allows to: - fanout: produce one input (using Synthetic Source) and
+ * process it with multiple sessions performing the same set of operations - reiterate produced
+ * PCollection multiple times
+ *
+ * <p>To run it manually, use the following command:
+ *
+ * <pre>
+ * ./gradlew run -p sdks/java/load-tests -PloadTest.args='
+ * --fanout=1
+ * --iterations=1
+ * --sourceOptions={"numRecords":1000,...}
+ * --stepOptions={"outputRecordsPerInputRecord":2...}'
+ * </pre>
+ */
+public class GroupByKeyLoadTest {
+
+ /** Pipeline options for the test. */
+ public interface Options extends PipelineOptions, ApplicationNameOptions {
+
+ @Description("Options for synthetic source")
+ @Validation.Required
+ String getSourceOptions();
+
+ void setSourceOptions(String sourceOptions);
+
+ @Description("Options for synthetic step")
+ String getStepOptions();
+
+ void setStepOptions(String stepOptions);
+
+ @Description("The number of GroupByKey operations to perform in parallel (fanout)")
+ @Default.Integer(1)
+ Integer getFanout();
+
+ void setFanout(Integer fanout);
+
+ @Description("Number of reiterations over per-key-grouped values to perform.")
+ @Default.Integer(1)
+ Integer getIterations();
+
+ void setIterations(Integer iterations);
+
+ static Options readFromArgs(String[] args) {
+ return PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+ }
+
+ static <T extends SyntheticOptions> T fromJsonString(String json, Class<T> type)
+ throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ T result = mapper.readValue(json, type);
+ result.validate();
+ return result;
+ }
+ }
+
+ public static void main(String[] args) throws IOException {
+ Options options = readFromArgs(args);
+
+ SyntheticSourceOptions sourceOptions =
+ fromJsonString(options.getSourceOptions(), SyntheticSourceOptions.class);
+
+ Optional<SyntheticStep> syntheticStep = createSyntheticStep(options);
+
+ Pipeline pipeline = Pipeline.create(options);
+
+ PCollection<KV<byte[], byte[]>> input =
+ pipeline.apply(SyntheticBoundedIO.readFrom(sourceOptions));
+
+ for (int branch = 0; branch < options.getFanout(); branch++) {
+ applySyntheticStep(input, branch, syntheticStep)
+ .apply(format("Group by key (%s)", branch), GroupByKey.create())
+ .apply(
+ format("Ungroup and reiterate (%s)", branch),
+ ParDo.of(new UngroupAndReiterate(options.getIterations())));
+ }
+
+ pipeline.run().waitUntilFinish();
+ }
+
+ private static PCollection<KV<byte[], byte[]>> applySyntheticStep(
+ PCollection<KV<byte[], byte[]>> input, int branch, Optional<SyntheticStep> syntheticStep) {
+
+ if (syntheticStep.isPresent()) {
+ return input.apply(format("Synthetic step (%s)", branch), ParDo.of(syntheticStep.get()));
+ } else {
+ return input;
+ }
+ }
+
+ private static Optional<SyntheticStep> createSyntheticStep(Options options) throws IOException {
+ if (options.getStepOptions() != null && !options.getStepOptions().isEmpty()) {
+ return Optional.of(
+ new SyntheticStep(fromJsonString(options.getStepOptions(), SyntheticStep.Options.class)));
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ private static class UngroupAndReiterate
+ extends DoFn<KV<byte[], Iterable<byte[]>>, KV<byte[], byte[]>> {
+
+ private int iterations;
+
+ UngroupAndReiterate(int iterations) {
+ this.iterations = iterations;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ byte[] key = c.element().getKey();
+
+ // reiterate "iterations" times, emit output only once
+ for (int i = 0; i < iterations; i++) {
+ for (byte[] value : c.element().getValue()) {
+
+ if (i == iterations - 1) {
+ c.output(KV.of(key, value));
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/sdks/java/load-tests/src/main/java/org/apache/beam/sdk/loadtests/package-info.java b/sdks/java/load-tests/src/main/java/org/apache/beam/sdk/loadtests/package-info.java
new file mode 100644
index 00000000000..5bf3d562098
--- /dev/null
+++ b/sdks/java/load-tests/src/main/java/org/apache/beam/sdk/loadtests/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Load test suite. */
+package org.apache.beam.sdk.loadtests;
diff --git a/settings.gradle b/settings.gradle
index 80d62500691..673c78a7761 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -160,6 +160,8 @@ include "beam-sdks-java-io-synthetic"
project(":beam-sdks-java-io-synthetic").dir = file("sdks/java/io/synthetic")
include "beam-sdks-java-javadoc"
project(":beam-sdks-java-javadoc").dir = file("sdks/java/javadoc")
+include "beam-sdks-java-load-tests"
+project(":beam-sdks-java-load-tests").dir = file("sdks/java/load-tests")
include "beam-sdks-java-maven-archetypes-examples"
project(":beam-sdks-java-maven-archetypes-examples").dir = file("sdks/java/maven-archetypes/examples")
include "beam-sdks-java-maven-archetypes-starter"
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 151640)
Time Spent: 2h 10m (was: 2h)
> Create GroupByKey load test for Java SDK
> ----------------------------------------
>
> Key: BEAM-5355
> URL: https://issues.apache.org/jira/browse/BEAM-5355
> Project: Beam
> Issue Type: New Feature
> Components: testing
> Reporter: Lukasz Gajowy
> Assignee: Lukasz Gajowy
> Priority: Minor
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> This is more thoroughly described in this proposal:
> [https://docs.google.com/document/d/1PuIQv4v06eosKKwT76u7S6IP88AnXhTf870Rcj1AHt4/edit?usp=sharing]
>
> In short: this ticket is about implementing the GroupByKeyLoadIT that uses
> SyntheticStep and Synthetic source to create load on the pipeline.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)