[ https://issues.apache.org/jira/browse/BEAM-5355?focusedWorklogId=151233&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-151233 ]

ASF GitHub Bot logged work on BEAM-5355:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Oct/18 15:38
            Start Date: 04/Oct/18 15:38
    Worklog Time Spent: 10m 
      Work Description: lgajowy closed pull request #6361: [BEAM-5355] GroupByKey Load IT
URL: https://github.com/apache/beam/pull/6361
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 1d9f5676e81..488a0588ca2 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,22 +1,22 @@
 ################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
+#  Licensed to the Apache Software Foundation (ASF) under one  
+#  or more contributor license agreements.  See the NOTICE file        
+#  distributed with this work for additional information       
+#  regarding copyright ownership.  The ASF licenses this file  
+#  to you under the Apache License, Version 2.0 (the   
+#  "License"); you may not use this file except in compliance  
+#  with the License.  You may obtain a copy of the License at  
+#      
+#      http://www.apache.org/licenses/LICENSE-2.0      
+#      
+#  Unless required by applicable law or agreed to in writing, software 
+#  distributed under the License is distributed on an "AS IS" BASIS,   
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.    
+#  See the License for the specific language governing permissions and 
+# limitations under the License.       
 ################################################################################
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-4.8-bin.zip
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
+distributionUrl=https\://services.gradle.org/distributions/gradle-4.8-all.zip
diff --git a/sdks/java/io/synthetic/build.gradle b/sdks/java/io/synthetic/build.gradle
index 22bc6809cc9..41b66292ff2 100644
--- a/sdks/java/io/synthetic/build.gradle
+++ b/sdks/java/io/synthetic/build.gradle
@@ -24,7 +24,7 @@ ext.summary = "Generators of Synthetic IO for Testing."
 
 dependencies {
   compile library.java.joda_time
-  compile library.java.commons_math3
+  shadow library.java.commons_math3
   shadow library.java.jackson_core
   shadow library.java.jackson_annotations
   shadow library.java.jackson_databind
diff --git a/sdks/java/load-tests/OWNERS b/sdks/java/load-tests/OWNERS
new file mode 100644
index 00000000000..2a140107225
--- /dev/null
+++ b/sdks/java/load-tests/OWNERS
@@ -0,0 +1,5 @@
+# See the OWNERS docs at https://s.apache.org/beam-owners
+
+reviewers:
+  - lgajowy
+  - kkucharc
diff --git a/sdks/java/load-tests/build.gradle b/sdks/java/load-tests/build.gradle
new file mode 100644
index 00000000000..102b0eaf58a
--- /dev/null
+++ b/sdks/java/load-tests/build.gradle
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: org.apache.beam.gradle.BeamModulePlugin
+applyJavaNature()
+
+provideIntegrationTestingDependencies()
+enableJavaPerformanceTesting()
+
+description = "Apache Beam :: SDKs :: Java :: Load Tests"
+
+dependencies {
+  shadowTest project(path: ":beam-sdks-java-core", configuration: "shadowTest")
+  shadowTest project(path: ":beam-runners-direct-java", configuration: "shadowTest")
+  shadowTest project(path: ":beam-sdks-java-io-synthetic", configuration: "shadowTest")
+  shadowTest library.java.junit
+}
+
diff --git a/sdks/java/load-tests/src/test/java/org/apache/beam/loadtests/GroupByKeyLoadIT.java b/sdks/java/load-tests/src/test/java/org/apache/beam/loadtests/GroupByKeyLoadIT.java
new file mode 100644
index 00000000000..3df5d02628b
--- /dev/null
+++ b/sdks/java/load-tests/src/test/java/org/apache/beam/loadtests/GroupByKeyLoadIT.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.loadtests;
+
+import static java.lang.String.format;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO;
+import org.apache.beam.sdk.io.synthetic.SyntheticOptions;
+import org.apache.beam.sdk.io.synthetic.SyntheticStep;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Load test for {@link GroupByKey} operation.
+ *
+ * <p>The purpose of this test is to measure {@link GroupByKey}'s behaviour in stressful conditions.
+ * it uses {@link SyntheticBoundedIO} and {@link SyntheticStep} which both can be parametrized to
+ * generate keys and values of various size, impose delay (sleep or cpu burnout) in various moments
+ * during the pipeline execution and provide some other performance challenges (see Source's and
+ * Step's documentation for more details).
+ *
+ * <p>In addition, this test allows to: - fanout: produce one input (using Synthetic Source) and
+ * process it with multiple sessions performing the same set of operations - reiterate produced
+ * PCollection multiple times
+ *
+ * <p>To run it manually, use the following command:
+ *
+ * <pre>
+ *    ./gradlew integrationTest -p sdks/java/load-tests/
+ *    -DintegrationTestPipelineOptions='[
+ *      "--sourceOptions=SOURCE_OPTIONS_JSON",
+ *      "--stepOptions=STEP_OPTIONS_JSON",
+ *      "--fanout=5",
+ *      "--iterations=5"]'
+ *    -DintegrationTestRunner=direct
+ *    --tests org.apache.beam.loadtests.GroupByKeyLoadIT
+ *  </pre>
+ */
+@RunWith(JUnit4.class)
+public class GroupByKeyLoadIT {
+
+  /** Pipeline options for the test. */
+  public interface Options extends TestPipelineOptions {
+
+    @Description("Options for synthetic source")
+    @Validation.Required
+    String getSourceOptions();
+
+    void setSourceOptions(String sourceOptions);
+
+    @Description("Options for synthetic step")
+    @Validation.Required
+    String getStepOptions();
+
+    void setStepOptions(String stepOptions);
+
+    @Description("The number of GroupByKey operations to perform in parallel (fanout)")
+    @Default.Integer(1)
+    Integer getFanout();
+
+    void setFanout(Integer fanout);
+
+    @Description("Number of reiterations over per-key-grouped values to perform.")
+    @Default.Integer(1)
+    Integer getIterations();
+
+    void setIterations(Integer iterations);
+  }
+
+  private static Options options;
+
+  private static SyntheticBoundedIO.SyntheticSourceOptions sourceOptions;
+
+  private static SyntheticStep.Options stepOptions;
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    PipelineOptionsFactory.register(Options.class);
+    options = readPipelineOptions(Options.class);
+
+    sourceOptions =
+        fromJsonString(options.getSourceOptions(), SyntheticBoundedIO.SyntheticSourceOptions.class);
+
+    stepOptions = fromJsonString(options.getStepOptions(), SyntheticStep.Options.class);
+  }
+
+  @Test
+  public void groupByKeyLoadTest() {
+    PCollection<KV<byte[], byte[]>> input =
+        pipeline.apply(SyntheticBoundedIO.readFrom(sourceOptions));
+
+    for (int branch = 0; branch < options.getFanout(); branch++) {
+      input
+          .apply(format("Synthetic step (%s)", branch), ParDo.of(new SyntheticStep(stepOptions)))
+          .apply(format("Group by key (%s)", branch), GroupByKey.create())
+          .apply(
+              format("Ungroup and reiterate (%s)", branch),
+              ParDo.of(new UngroupAndReiterate(options.getIterations())));
+    }
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  private static class UngroupAndReiterate
+      extends DoFn<KV<byte[], Iterable<byte[]>>, KV<byte[], byte[]>> {
+
+    private int iterations;
+
+    UngroupAndReiterate(int iterations) {
+      this.iterations = iterations;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      byte[] key = c.element().getKey();
+
+      // reiterate "iterations" times, emit output only once
+      for (int i = 0; i < iterations; i++) {
+        for (byte[] value : c.element().getValue()) {
+
+          if (i == iterations - 1) {
+            c.output(KV.of(key, value));
+          }
+        }
+      }
+    }
+  }
+
+  private static <T extends SyntheticOptions> T fromJsonString(String json, Class<T> type)
+      throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    T result = mapper.readValue(json, type);
+    result.validate();
+    return result;
+  }
+
+  private static <T extends TestPipelineOptions> T readPipelineOptions(Class<T> optionsType) {
+    PipelineOptionsFactory.register(optionsType);
+    TestPipelineOptions options = TestPipeline.testingPipelineOptions().as(optionsType);
+
+    return PipelineOptionsValidator.validate(optionsType, options);
+  }
+}
diff --git a/sdks/java/load-tests/src/test/java/org/apache/beam/loadtests/package-info.java b/sdks/java/load-tests/src/test/java/org/apache/beam/loadtests/package-info.java
new file mode 100644
index 00000000000..4982213142e
--- /dev/null
+++ b/sdks/java/load-tests/src/test/java/org/apache/beam/loadtests/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Load test suite. */
+package org.apache.beam.loadtests;
diff --git a/settings.gradle b/settings.gradle
index 3ccdd4418c7..7daf6cb95b8 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -162,6 +162,8 @@ include "beam-sdks-java-io-synthetic"
 project(":beam-sdks-java-io-synthetic").dir = file("sdks/java/io/synthetic")
 include "beam-sdks-java-javadoc"
 project(":beam-sdks-java-javadoc").dir = file("sdks/java/javadoc")
+include "beam-sdks-java-load-tests"
+project(":beam-sdks-java-load-tests").dir = file("sdks/java/load-tests")
 include "beam-sdks-java-maven-archetypes-examples"
 project(":beam-sdks-java-maven-archetypes-examples").dir = file("sdks/java/maven-archetypes/examples")
 include "beam-sdks-java-maven-archetypes-starter"


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 151233)
    Time Spent: 1h 20m  (was: 1h 10m)

> Create GroupByKey load test for Java SDK
> ----------------------------------------
>
>                 Key: BEAM-5355
>                 URL: https://issues.apache.org/jira/browse/BEAM-5355
>             Project: Beam
>          Issue Type: New Feature
>          Components: testing
>            Reporter: Lukasz Gajowy
>            Assignee: Lukasz Gajowy
>            Priority: Minor
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> This is more thoroughly described in this proposal: 
> [https://docs.google.com/document/d/1PuIQv4v06eosKKwT76u7S6IP88AnXhTf870Rcj1AHt4/edit?usp=sharing]
>  
> In short: this ticket is about implementing the GroupByKeyLoadIT that uses 
> SyntheticStep and Synthetic source to create load on the pipeline. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to