This is an automated email from the ASF dual-hosted git repository.
kamilwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e9380df [BEAM-11075] CoGBK tests for Go SDK (#13438)
e9380df is described below
commit e9380df4c7474d3fdf3251b7ea80106bbf55fea5
Author: Kamil Wasilewski <[email protected]>
AuthorDate: Mon Dec 7 12:37:20 2020 +0100
[BEAM-11075] CoGBK tests for Go SDK (#13438)
---
.../jenkins/job_LoadTests_coGBK_Flink_Go.groovy | 182 +++++++++++++++++++++
sdks/go/test/load/build.gradle | 1 +
sdks/go/test/load/cogbk/cogbk.go | 106 ++++++++++++
3 files changed, 289 insertions(+)
diff --git a/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Go.groovy b/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Go.groovy
new file mode 100644
index 0000000..d5463b0
--- /dev/null
+++ b/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Go.groovy
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as commonJobProperties
+import CommonTestProperties
+import LoadTestsBuilder as loadTestsBuilder
+import PhraseTriggeringPostCommitBuilder
+import Flink
+import InfluxDBCredentialsHelper
+
+import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY
+
+String now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
+
+// TODO(BEAM-11398): Skipping the first test because it is too slow.
+def TESTS_TO_SKIP = [
+ 'load-tests-go-flink-batch-cogbk-1-',
+]
+
+def batchScenarios = {
+ [
+ [
+ title : 'CoGroupByKey Go Load test: 2GB of 100B records with a single key',
+ test : 'cogbk',
+ runner : CommonTestProperties.Runner.FLINK,
+ pipelineOptions: [
+ job_name : "load-tests-go-flink-batch-cogbk-1-${now}",
+ influx_measurement : 'go_batch_cogbk_1',
+ influx_namespace : 'flink',
+ input_options : '\'{' +
+ '"num_records": 20000000,' +
+ '"key_size": 10,' +
+ '"value_size": 90,' +
+ '"num_hot_keys": 1,' +
+ '"hot_key_fraction": 1}\'',
+ co_input_options : '\'{' +
+ '"num_records": 2000000,' +
+ '"key_size": 10,' +
+ '"value_size": 90,' +
+ '"num_hot_keys": 1000,' +
+ '"hot_key_fraction": 1}\'',
+ iterations : 1,
+ parallelism : 5,
+ endpoint : 'localhost:8099',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
+ ]
+ ],
+ [
+ title : 'CoGroupByKey Go Load test: 2GB of 100B records with multiple keys',
+ test : 'cogbk',
+ runner : CommonTestProperties.Runner.FLINK,
+ pipelineOptions: [
+ job_name : "load-tests-go-flink-batch-cogbk-2-${now}",
+ influx_measurement : 'go_batch_cogbk_2',
+ influx_namespace : 'flink',
+ input_options : '\'{' +
+ '"num_records": 20000000,' +
+ '"key_size": 10,' +
+ '"value_size": 90,' +
+ '"num_hot_keys": 5,' +
+ '"hot_key_fraction": 1}\'',
+ co_input_options : '\'{' +
+ '"num_records": 2000000,' +
+ '"key_size": 10,' +
+ '"value_size": 90,' +
+ '"num_hot_keys": 1000,' +
+ '"hot_key_fraction": 1}\'',
+ iterations : 1,
+ parallelism : 5,
+ endpoint : 'localhost:8099',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
+ ]
+ ],
+ [
+ title : 'CoGroupByKey Go Load test: reiterate 4 times 10kB values',
+ test : 'cogbk',
+ runner : CommonTestProperties.Runner.FLINK,
+ pipelineOptions: [
+ job_name : "load-tests-go-flink-batch-cogbk-3-${now}",
+ influx_measurement : 'go_batch_cogbk_3',
+ influx_namespace : 'flink',
+ input_options : '\'{' +
+ '"num_records": 20000000,' +
+ '"key_size": 10,' +
+ '"value_size": 90,' +
+ '"num_hot_keys": 200000,' +
+ '"hot_key_fraction": 1}\'',
+ co_input_options : '\'{' +
+ '"num_records": 2000000,' +
+ '"key_size": 10,' +
+ '"value_size": 90,' +
+ '"num_hot_keys": 1000,' +
+ '"hot_key_fraction": 1}\'',
+ iterations : 4,
+ parallelism : 5,
+ endpoint : 'localhost:8099',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
+ ]
+ ],
+ [
+ title : 'CoGroupByKey Go Load test: reiterate 4 times 2MB values',
+ test : 'cogbk',
+ runner : CommonTestProperties.Runner.FLINK,
+ pipelineOptions: [
+ job_name : "load-tests-go-flink-batch-cogbk-4-${now}",
+ influx_measurement : 'go_batch_cogbk_4',
+ influx_namespace : 'flink',
+ input_options : '\'{' +
+ '"num_records": 20000000,' +
+ '"key_size": 10,' +
+ '"value_size": 90,' +
+ '"num_hot_keys": 1000,' +
+ '"hot_key_fraction": 1}\'',
+ co_input_options : '\'{' +
+ '"num_records": 2000000,' +
+ '"key_size": 10,' +
+ '"value_size": 90,' +
+ '"num_hot_keys": 1000,' +
+ '"hot_key_fraction": 1}\'',
+ iterations : 4,
+ parallelism : 5,
+ endpoint : 'localhost:8099',
+ environment_type : 'DOCKER',
+ environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest",
+ ]
+ ],
+ ]
+ .each { test -> test.pipelineOptions.putAll(additionalPipelineArgs) }
+ .collectMany { test ->
+ TESTS_TO_SKIP.any { element -> test.pipelineOptions.job_name.startsWith(element) } ? []: [test]
+ }
+}
+
+def loadTestJob = { scope, triggeringContext, mode ->
+ def numberOfWorkers = 5
+
+ def flink = new Flink(scope, "beam_LoadTests_Go_CoGBK_Flink_${mode.capitalize()}")
+ flink.setUp(
+ [
+ "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest"
+ ],
+ numberOfWorkers,
+ "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.10_job_server:latest")
+
+ loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.GO, batchScenarios(), 'CoGBK', mode)
+}
+
+PhraseTriggeringPostCommitBuilder.postCommitJob(
+ 'beam_LoadTests_Go_CoGBK_Flink_Batch',
+ 'Run Load Tests Go CoGBK Flink Batch',
+ 'Load Tests Go CoGBK Flink Batch suite',
+ this
+ ) {
+ additionalPipelineArgs = [:]
+ loadTestJob(delegate, CommonTestProperties.TriggeringContext.PR, 'batch')
+ }
+
+CronJobBuilder.cronJob('beam_LoadTests_Go_CoGBK_Flink_batch', 'H 8 * * *', this) {
+ additionalPipelineArgs = [
+ influx_db_name: InfluxDBCredentialsHelper.InfluxDBDatabaseName,
+ influx_hostname: InfluxDBCredentialsHelper.InfluxDBHostUrl,
+ ]
+ loadTestJob(delegate, CommonTestProperties.TriggeringContext.POST_COMMIT, 'batch')
+}
diff --git a/sdks/go/test/load/build.gradle b/sdks/go/test/load/build.gradle
index deae8e4..83160e8 100644
--- a/sdks/go/test/load/build.gradle
+++ b/sdks/go/test/load/build.gradle
@@ -50,6 +50,7 @@ golang {
go 'build -o ./build/bin/${GOOS}_${GOARCH}/pardo github.com/apache/beam/sdks/go/test/load/pardo'
go 'build -o ./build/bin/${GOOS}_${GOARCH}/combine github.com/apache/beam/sdks/go/test/load/combine'
go 'build -o ./build/bin/${GOOS}_${GOARCH}/group_by_key github.com/apache/beam/sdks/go/test/load/group_by_key'
+ go 'build -o ./build/bin/${GOOS}_${GOARCH}/cogbk github.com/apache/beam/sdks/go/test/load/cogbk'
}
}
diff --git a/sdks/go/test/load/cogbk/cogbk.go b/sdks/go/test/load/cogbk/cogbk.go
new file mode 100644
index 0000000..a3de69b
--- /dev/null
+++ b/sdks/go/test/load/cogbk/cogbk.go
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "context"
+ "flag"
+ "reflect"
+
+ "github.com/apache/beam/sdks/go/pkg/beam"
+ "github.com/apache/beam/sdks/go/pkg/beam/io/synthetic"
+ "github.com/apache/beam/sdks/go/pkg/beam/log"
+ "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
+ "github.com/apache/beam/sdks/go/test/load"
+)
+
+var (
+ iterations = flag.Int(
+ "iterations",
+ 1,
+ "A number of reiterations over per-key-grouped values to be performed.")
+ syntheticConfig = flag.String(
+ "input_options",
+ "",
+ "A JSON object that describes the configuration for the first synthetic source.")
+ coSyntheticConfig = flag.String(
+ "co_input_options",
+ "",
+ "A JSON object that describes the configuration for the second synthetic source.")
+)
+
+func init() {
+ beam.RegisterType(reflect.TypeOf((*ungroupAndReiterateFn)(nil)).Elem())
+}
+
+// ungroupAndReiterateFn reiterates given number of times over CoGBK's output.
+type ungroupAndReiterateFn struct {
+ Iterations int
+}
+
+func (fn *ungroupAndReiterateFn) ProcessElement(key []byte, p1values, p2values func(*[]byte) bool, emit func([]byte, []byte)) {
+ var value []byte
+ for i := 0; i < fn.Iterations; i++ {
+ for p1values(&value) {
+ // emit output only once
+ if i == fn.Iterations-1 {
+ emit(key, value)
+ }
+ }
+ for p2values(&value) {
+ if i == fn.Iterations-1 {
+ emit(key, value)
+ }
+ }
+ }
+}
+
+func parseSyntheticConfig(config string) synthetic.SourceConfig {
+ if config == "" {
+ panic("--input_options and --co_input_options not provided")
+ } else {
+ encoded := []byte(config)
+ return synthetic.DefaultSourceConfig().BuildFromJSON(encoded)
+ }
+}
+
+func main() {
+ flag.Parse()
+ beam.Init()
+ ctx := context.Background()
+
+ p, s := beam.NewPipelineWithRoot()
+
+ src1 := synthetic.SourceSingle(s, parseSyntheticConfig(*syntheticConfig))
+ pc1 := beam.ParDo(s, &load.RuntimeMonitor{}, src1)
+
+ src2 := synthetic.SourceSingle(s, parseSyntheticConfig(*coSyntheticConfig))
+ pc2 := beam.ParDo(s, &load.RuntimeMonitor{}, src2)
+
+ joined := beam.CoGroupByKey(s, pc1, pc2)
+ pc := beam.ParDo(s, &ungroupAndReiterateFn{Iterations: *iterations}, joined)
+ beam.ParDo(s, &load.RuntimeMonitor{}, pc)
+
+ presult, err := beamx.RunWithMetrics(ctx, p)
+ if err != nil {
+ log.Fatalf(ctx, "Failed to execute job: %v", err)
+ }
+
+ if presult != nil {
+ metrics := presult.Metrics().AllMetrics()
+ load.PublishMetrics(metrics)
+ }
+}