This is an automated email from the ASF dual-hosted git repository.

kamilwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e9380df  [BEAM-11075] CoGBK tests for Go SDK (#13438)
e9380df is described below

commit e9380df4c7474d3fdf3251b7ea80106bbf55fea5
Author: Kamil Wasilewski <[email protected]>
AuthorDate: Mon Dec 7 12:37:20 2020 +0100

    [BEAM-11075] CoGBK tests for Go SDK (#13438)
---
 .../jenkins/job_LoadTests_coGBK_Flink_Go.groovy    | 182 +++++++++++++++++++++
 sdks/go/test/load/build.gradle                     |   1 +
 sdks/go/test/load/cogbk/cogbk.go                   | 106 ++++++++++++
 3 files changed, 289 insertions(+)

diff --git a/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Go.groovy b/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Go.groovy
new file mode 100644
index 0000000..d5463b0
--- /dev/null
+++ b/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Go.groovy
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as commonJobProperties
+import CommonTestProperties
+import LoadTestsBuilder as loadTestsBuilder
+import PhraseTriggeringPostCommitBuilder
+import Flink
+import InfluxDBCredentialsHelper
+
+import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+
+// Timestamp (month, day, hour, minute, second in UTC) appended to every
+// job_name below so that each run gets a distinct name.
+final utc = TimeZone.getTimeZone('UTC')
+String now = new Date().format('MMddHHmmss', utc)
+
+// Job-name prefixes of scenarios that batchScenarios filters out.
+// TODO(BEAM-11398): Skipping the first test because it is too slow.
+def TESTS_TO_SKIP = ['load-tests-go-flink-batch-cogbk-1-']
+
+// Returns the list of CoGBK load-test scenarios to run on Flink.
+//
+// Each scenario is a map with a title, the test binary name, the runner and the
+// pipeline options passed to the Go program. Before the list is returned:
+//   1. additionalPipelineArgs (assigned by the job definitions at the bottom of
+//      this script) is merged into every scenario's pipelineOptions;
+//   2. scenarios whose job_name starts with an entry of TESTS_TO_SKIP are dropped.
+def batchScenarios = {
+  [
+    [
+      title          : 'CoGroupByKey Go Load test: 2GB of 100B records with a single key',
+      test           : 'cogbk',
+      runner         : CommonTestProperties.Runner.FLINK,
+      pipelineOptions: [
+        job_name             : "load-tests-go-flink-batch-cogbk-1-${now}",
+        influx_measurement   : 'go_batch_cogbk_1',
+        influx_namespace     : 'flink',
+        input_options        : '\'{' +
+        '"num_records": 20000000,' +
+        '"key_size": 10,' +
+        '"value_size": 90,' +
+        '"num_hot_keys": 1,' +
+        '"hot_key_fraction": 1}\'',
+        co_input_options      : '\'{' +
+        '"num_records": 2000000,' +
+        '"key_size": 10,' +
+        '"value_size": 90,' +
+        '"num_hot_keys": 1000,' +
+        '"hot_key_fraction": 1}\'',
+        iterations           : 1,
+        parallelism          : 5,
+        endpoint             : 'localhost:8099',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
+      ]
+    ],
+    [
+      title          : 'CoGroupByKey Go Load test: 2GB of 100B records with multiple keys',
+      test           : 'cogbk',
+      runner         : CommonTestProperties.Runner.FLINK,
+      pipelineOptions: [
+        job_name             : "load-tests-go-flink-batch-cogbk-2-${now}",
+        influx_measurement   : 'go_batch_cogbk_2',
+        influx_namespace     : 'flink',
+        input_options        : '\'{' +
+        '"num_records": 20000000,' +
+        '"key_size": 10,' +
+        '"value_size": 90,' +
+        '"num_hot_keys": 5,' +
+        '"hot_key_fraction": 1}\'',
+        co_input_options      : '\'{' +
+        '"num_records": 2000000,' +
+        '"key_size": 10,' +
+        '"value_size": 90,' +
+        '"num_hot_keys": 1000,' +
+        '"hot_key_fraction": 1}\'',
+        iterations           : 1,
+        parallelism          : 5,
+        endpoint             : 'localhost:8099',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
+      ]
+    ],
+    [
+      title          : 'CoGroupByKey Go Load test: reiterate 4 times 10kB values',
+      test           : 'cogbk',
+      runner         : CommonTestProperties.Runner.FLINK,
+      pipelineOptions: [
+        job_name             : "load-tests-go-flink-batch-cogbk-3-${now}",
+        influx_measurement   : 'go_batch_cogbk_3',
+        influx_namespace     : 'flink',
+        input_options        : '\'{' +
+        '"num_records": 20000000,' +
+        '"key_size": 10,' +
+        '"value_size": 90,' +
+        '"num_hot_keys": 200000,' +
+        '"hot_key_fraction": 1}\'',
+        co_input_options      : '\'{' +
+        '"num_records": 2000000,' +
+        '"key_size": 10,' +
+        '"value_size": 90,' +
+        '"num_hot_keys": 1000,' +
+        '"hot_key_fraction": 1}\'',
+        iterations           : 4,
+        parallelism          : 5,
+        endpoint             : 'localhost:8099',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
+      ]
+    ],
+    [
+      title          : 'CoGroupByKey Go Load test: reiterate 4 times 2MB values',
+      test           : 'cogbk',
+      runner         : CommonTestProperties.Runner.FLINK,
+      pipelineOptions: [
+        job_name             : "load-tests-go-flink-batch-cogbk-4-${now}",
+        influx_measurement   : 'go_batch_cogbk_4',
+        influx_namespace     : 'flink',
+        input_options        : '\'{' +
+        '"num_records": 20000000,' +
+        '"key_size": 10,' +
+        '"value_size": 90,' +
+        '"num_hot_keys": 1000,' +
+        '"hot_key_fraction": 1}\'',
+        co_input_options      : '\'{' +
+        '"num_records": 2000000,' +
+        '"key_size": 10,' +
+        '"value_size": 90,' +
+        '"num_hot_keys": 1000,' +
+        '"hot_key_fraction": 1}\'',
+        iterations           : 4,
+        parallelism          : 5,
+        endpoint             : 'localhost:8099',
+        environment_type     : 'DOCKER',
+        environment_config   : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
+      ]
+    ],
+  ]
+  .each { test -> test.pipelineOptions.putAll(additionalPipelineArgs) }
+  .collectMany { test ->
+    TESTS_TO_SKIP.any { element -> test.pipelineOptions.job_name.startsWith(element) } ? []: [test]
+  }
+}
+
+// Configures one CoGBK load-test job inside the given job scope: sets up a
+// Flink helper with the Go SDK image, five workers and the Flink 1.10 job
+// server image, then registers every scenario from batchScenarios().
+//
+//   scope             - the job being defined (callers pass the closure delegate)
+//   triggeringContext - accepted but not used in this closure
+//   mode              - e.g. 'batch'; capitalized into the Flink helper's job
+//                       name and forwarded to loadTests
+def loadTestJob = { scope, triggeringContext, mode ->
+  def numberOfWorkers = 5
+
+  def flink = new Flink(scope, "beam_LoadTests_Go_CoGBK_Flink_${mode.capitalize()}")
+  flink.setUp(
+      [
+        "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest"
+      ],
+      numberOfWorkers,
+      "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.10_job_server:latest")
+
+  loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.GO, batchScenarios(), 'CoGBK', mode)
+}
+
+// Phrase-triggered variant of the CoGBK batch suite.
+// NOTE(review): the three strings are presumably the job name, the trigger
+// phrase and the job description -- confirm against
+// PhraseTriggeringPostCommitBuilder.
+PhraseTriggeringPostCommitBuilder.postCommitJob(
+    'beam_LoadTests_Go_CoGBK_Flink_Batch',
+    'Run Load Tests Go CoGBK Flink Batch',
+    'Load Tests Go CoGBK Flink Batch suite',
+    this
+    ) {
+      // Must be assigned before loadTestJob runs: batchScenarios() merges this
+      // map into every scenario. It is empty here, so no extra options are added.
+      additionalPipelineArgs = [:]
+      loadTestJob(delegate, CommonTestProperties.TriggeringContext.PR, 'batch')
+    }
+
+// Scheduled variant of the CoGBK batch suite (cron spec 'H 8 * * *'). Unlike
+// the phrase-triggered job, it adds the InfluxDB database name and host to
+// every scenario's pipeline options.
+// NOTE(review): this job name ends in '_batch' while the phrase-triggered job
+// uses '_Batch' -- confirm whether the differing case is intentional.
+CronJobBuilder.cronJob('beam_LoadTests_Go_CoGBK_Flink_batch', 'H 8 * * *', this) {
+  // Must be assigned before loadTestJob runs: batchScenarios() merges this map
+  // into every scenario.
+  additionalPipelineArgs = [
+    influx_db_name: InfluxDBCredentialsHelper.InfluxDBDatabaseName,
+    influx_hostname: InfluxDBCredentialsHelper.InfluxDBHostUrl,
+  ]
+  loadTestJob(delegate, CommonTestProperties.TriggeringContext.POST_COMMIT, 'batch')
+}
diff --git a/sdks/go/test/load/build.gradle b/sdks/go/test/load/build.gradle
index deae8e4..83160e8 100644
--- a/sdks/go/test/load/build.gradle
+++ b/sdks/go/test/load/build.gradle
@@ -50,6 +50,7 @@ golang {
     go 'build -o ./build/bin/${GOOS}_${GOARCH}/pardo github.com/apache/beam/sdks/go/test/load/pardo'
     go 'build -o ./build/bin/${GOOS}_${GOARCH}/combine github.com/apache/beam/sdks/go/test/load/combine'
     go 'build -o ./build/bin/${GOOS}_${GOARCH}/group_by_key github.com/apache/beam/sdks/go/test/load/group_by_key'
+    go 'build -o ./build/bin/${GOOS}_${GOARCH}/cogbk github.com/apache/beam/sdks/go/test/load/cogbk'
   }
 }
 
diff --git a/sdks/go/test/load/cogbk/cogbk.go b/sdks/go/test/load/cogbk/cogbk.go
new file mode 100644
index 0000000..a3de69b
--- /dev/null
+++ b/sdks/go/test/load/cogbk/cogbk.go
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+       "context"
+       "flag"
+       "reflect"
+
+       "github.com/apache/beam/sdks/go/pkg/beam"
+       "github.com/apache/beam/sdks/go/pkg/beam/io/synthetic"
+       "github.com/apache/beam/sdks/go/pkg/beam/log"
+       "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+       "github.com/apache/beam/sdks/go/test/load"
+)
+
+// Command-line flags of the CoGBK load test.
+var (
+       // iterations is the number of passes ungroupAndReiterateFn makes over
+       // each key's grouped values; defaults to 1.
+       iterations = flag.Int(
+               "iterations",
+               1,
+               "A number of reiterations over per-key-grouped values to be performed.")
+       // syntheticConfig holds the JSON configuration of the first synthetic
+       // source; main panics (via parseSyntheticConfig) when it is empty.
+       syntheticConfig = flag.String(
+               "input_options",
+               "",
+               "A JSON object that describes the configuration for the first synthetic source.")
+       // coSyntheticConfig holds the JSON configuration of the second synthetic
+       // source; main panics (via parseSyntheticConfig) when it is empty.
+       coSyntheticConfig = flag.String(
+               "co_input_options",
+               "",
+               "A JSON object that describes the configuration for the second synthetic source.")
+)
+
+// init registers ungroupAndReiterateFn with Beam.
+func init() {
+       // Elem() turns the *ungroupAndReiterateFn type into the struct type itself,
+       // which is what gets registered.
+       fnType := reflect.TypeOf((*ungroupAndReiterateFn)(nil)).Elem()
+       beam.RegisterType(fnType)
+}
+
+// ungroupAndReiterateFn is the DoFn applied to the CoGroupByKey output. It
+// walks both per-key value iterators a configurable number of times and
+// re-emits the values as key/value pairs on the final pass.
+type ungroupAndReiterateFn struct {
+       // Iterations is the number of passes ProcessElement makes over each key's
+       // values; main sets it from the --iterations flag.
+       Iterations int
+}
+
+// ProcessElement makes fn.Iterations passes over the two per-key value
+// iterators and, on the final pass only, emits every value paired with key.
+// Within that pass all values from p1values are emitted before those from
+// p2values. Nothing is emitted when fn.Iterations is less than 1.
+//
+// NOTE(review): passes after the first only see values if the iterators can
+// be consumed again once they have returned false -- confirm against the
+// Beam Go SDK iterator contract.
+func (fn *ungroupAndReiterateFn) ProcessElement(key []byte, p1values, p2values func(*[]byte) bool, emit func([]byte, []byte)) {
+       var value []byte
+       for i := 0; i < fn.Iterations; i++ {
+               // Emit output only once, on the last pass; earlier passes merely
+               // drain the iterators.
+               last := i == fn.Iterations-1
+               for p1values(&value) {
+                       if last {
+                               emit(key, value)
+                       }
+               }
+               for p2values(&value) {
+                       if last {
+                               emit(key, value)
+                       }
+               }
+       }
+}
+
+// parseSyntheticConfig builds a synthetic.SourceConfig from the JSON text in
+// config, starting from synthetic.DefaultSourceConfig().
+//
+// It panics when config is empty. The message names both flags because the
+// function is called once per flag and cannot tell which one was missing.
+func parseSyntheticConfig(config string) synthetic.SourceConfig {
+       if config == "" {
+               panic("--input_options and --co_input_options not provided")
+       }
+       return synthetic.DefaultSourceConfig().BuildFromJSON([]byte(config))
+}
+
+// main builds and runs the CoGBK load-test pipeline:
+//
+//     synthetic source 1 -> RuntimeMonitor --+
+//                                            +-> CoGroupByKey -> ungroupAndReiterateFn -> RuntimeMonitor
+//     synthetic source 2 -> RuntimeMonitor --+
+//
+// After the run, the collected metrics are handed to load.PublishMetrics.
+func main() {
+       flag.Parse()
+       beam.Init()
+       ctx := context.Background()
+
+       p, s := beam.NewPipelineWithRoot()
+
+       // Both inputs are configured independently via --input_options and
+       // --co_input_options; parseSyntheticConfig panics if either is empty.
+       src1 := synthetic.SourceSingle(s, parseSyntheticConfig(*syntheticConfig))
+       pc1 := beam.ParDo(s, &load.RuntimeMonitor{}, src1)
+
+       src2 := synthetic.SourceSingle(s, parseSyntheticConfig(*coSyntheticConfig))
+       pc2 := beam.ParDo(s, &load.RuntimeMonitor{}, src2)
+
+       joined := beam.CoGroupByKey(s, pc1, pc2)
+       pc := beam.ParDo(s, &ungroupAndReiterateFn{Iterations: *iterations}, joined)
+       beam.ParDo(s, &load.RuntimeMonitor{}, pc)
+
+       presult, err := beamx.RunWithMetrics(ctx, p)
+       if err != nil {
+               log.Fatalf(ctx, "Failed to execute job: %v", err)
+       }
+
+       // A nil result with a nil error is tolerated: metrics are only published
+       // when the run returned a result.
+       if presult != nil {
+               metrics := presult.Metrics().AllMetrics()
+               load.PublishMetrics(metrics)
+       }
+}

Reply via email to