kamilwu commented on a change in pull request #13388: URL: https://github.com/apache/beam/pull/13388#discussion_r528580215
########## File path: sdks/go/test/load/group_by_key/group_by_key.go ########## @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This is GroupByKey load test with Synthetic Source. Besides of the standard +// input options there are additional options: +// * fanout (optional) - number of GBK operations to run in parallel +// * iterations (optional) - number of reiterations over per-key-grouped +// values to perform +// input_options - options for Synthetic Sources. 
+ +// Example test run: + +// go run sdks/go/test/load/group_by_key_test/group_by_key.go \ +// --fanout=1 +// --iterations=1 +// --input_options='{ +// \"num_records\": 300, +// \"key_size\": 5, +// \"value_size\": 15, +// \"num_hot_keys\": 30, +// \"hot_key_fraction\": 0.5 +// }'" + +package main + +import ( + "context" + "flag" + + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/io/synthetic" + "github.com/apache/beam/sdks/go/pkg/beam/log" + "github.com/apache/beam/sdks/go/pkg/beam/x/beamx" + "github.com/apache/beam/sdks/go/test/load" +) + +var ( + fanout = flag.Int( + "fanout", + 1, + "Fanout") + iterations = flag.Int( + "iterations", + 1, + "A number of subsequent ParDo transforms to be performed") Review comment: This is not true. Please change the description to: `A number of reiterations over per-key-grouped values to perform.` ########## File path: sdks/go/test/load/group_by_key/group_by_key.go ########## @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This is GroupByKey load test with Synthetic Source. 
Besides of the standard +// input options there are additional options: +// * fanout (optional) - number of GBK operations to run in parallel +// * iterations (optional) - number of reiterations over per-key-grouped +// values to perform +// input_options - options for Synthetic Sources. + +// Example test run: + +// go run sdks/go/test/load/group_by_key_test/group_by_key.go \ +// --fanout=1 +// --iterations=1 +// --input_options='{ +// \"num_records\": 300, +// \"key_size\": 5, +// \"value_size\": 15, +// \"num_hot_keys\": 30, +// \"hot_key_fraction\": 0.5 +// }'" + +package main + +import ( + "context" + "flag" + + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/io/synthetic" + "github.com/apache/beam/sdks/go/pkg/beam/log" + "github.com/apache/beam/sdks/go/pkg/beam/x/beamx" + "github.com/apache/beam/sdks/go/test/load" +) + +var ( + fanout = flag.Int( + "fanout", + 1, + "Fanout") + iterations = flag.Int( + "iterations", + 1, + "A number of subsequent ParDo transforms to be performed") + syntheticConfig = flag.String( + "input_options", + "", + "A JSON object that describes the configuration for synthetic source") +) + +func parseSyntheticConfig() synthetic.SourceConfig { + if *syntheticConfig == "" { + panic("--input_options not provided") + } else { + encoded := []byte(*syntheticConfig) + return synthetic.DefaultSourceConfig().BuildFromJSON(encoded) + } +} + +func main() { + flag.Parse() + beam.Init() + + ctx := context.Background() + + p, s := beam.NewPipelineWithRoot() + src := synthetic.SourceSingle(s, parseSyntheticConfig()) + pcoll := beam.ParDo(s, &load.RuntimeMonitor{}, src) + for i := 0; i < *fanout; i++ { + pcoll = beam.GroupByKey(s, src) + beam.ParDo(s, func(key []uint8, values func(*[]uint8) bool) ([]uint8, []uint8) { Review comment: Could you replace `uint8` with `byte`? Those are the same types, but the ParDo test uses `byte`, and we'd like to keep it consistent. 
########## File path: sdks/go/test/load/group_by_key/group_by_key.go ########## @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This is GroupByKey load test with Synthetic Source. Besides of the standard +// input options there are additional options: +// * fanout (optional) - number of GBK operations to run in parallel +// * iterations (optional) - number of reiterations over per-key-grouped +// values to perform +// input_options - options for Synthetic Sources. 
+ +// Example test run: + +// go run sdks/go/test/load/group_by_key_test/group_by_key.go \ +// --fanout=1 +// --iterations=1 +// --input_options='{ +// \"num_records\": 300, +// \"key_size\": 5, +// \"value_size\": 15, +// \"num_hot_keys\": 30, +// \"hot_key_fraction\": 0.5 +// }'" + +package main + +import ( + "context" + "flag" + + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/io/synthetic" + "github.com/apache/beam/sdks/go/pkg/beam/log" + "github.com/apache/beam/sdks/go/pkg/beam/x/beamx" + "github.com/apache/beam/sdks/go/test/load" +) + +var ( + fanout = flag.Int( + "fanout", + 1, + "Fanout") Review comment: There's no need to add `(optional)`, because, by default, every flag in Go is optional ########## File path: sdks/go/test/load/group_by_key/group_by_key.go ########## @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This is GroupByKey load test with Synthetic Source. Besides of the standard +// input options there are additional options: +// * fanout (optional) - number of GBK operations to run in parallel +// * iterations (optional) - number of reiterations over per-key-grouped +// values to perform +// input_options - options for Synthetic Sources. 
+ +// Example test run: + +// go run sdks/go/test/load/group_by_key_test/group_by_key.go \ +// --fanout=1 +// --iterations=1 +// --input_options='{ +// \"num_records\": 300, +// \"key_size\": 5, +// \"value_size\": 15, +// \"num_hot_keys\": 30, +// \"hot_key_fraction\": 0.5 +// }'" + +package main + +import ( + "context" + "flag" + + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/io/synthetic" + "github.com/apache/beam/sdks/go/pkg/beam/log" + "github.com/apache/beam/sdks/go/pkg/beam/x/beamx" + "github.com/apache/beam/sdks/go/test/load" +) + +var ( + fanout = flag.Int( + "fanout", + 1, + "Fanout") Review comment: Please provide a description what this flag does. For example: `A number of GroupByKey operations to perform in parallel.` ########## File path: .test-infra/jenkins/job_LoadTests_GBK_Flink_Go.groovy ########## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import CommonJobProperties as commonJobProperties +import CommonTestProperties +import LoadTestsBuilder as loadTestsBuilder +import PhraseTriggeringPostCommitBuilder +import Flink +import InfluxDBCredentialsHelper + +import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY + +String now = new Date().format('MMddHHmmss', TimeZone.getTimeZone('UTC')) + +def batchScenarios = { + [ + [ + title : 'Load test: 2GB of 10B records', + test : 'group_by_key', + runner : CommonTestProperties.Runner.FLINK, + pipelineOptions: [ + job_name : "load-tests-go-flink-batch-gbk-1-${now}", + influx_measurement : 'go_batch_gbk_1', + input_options : """ + { + "num_records": 200000000, + "key_size": 1, + "value_size": 9 + } + """.trim().replaceAll("\\s", ""), + iterations : 1, + fanout : 1, + parallelism : 5, + endpoint : 'localhost:8099', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", + ] + ], + [ + title : 'Load test: 2GB of 100B records', + test : 'group_by_key', + runner : CommonTestProperties.Runner.FLINK, + pipelineOptions: [ + job_name : "load-tests-go-flink-batch-gbk-1-${now}", + influx_measurement : 'go_batch_gbk_1', + input_options : """ + { + "num_records": 20000000, + "key_size": 10, + "value_size": 90 + } + """.trim().replaceAll("\\s", ""), + iterations : 1, + fanout : 1, + parallelism : 5, + endpoint : 'localhost:8099', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", + ] + ], + [ + title : 'Load test: 2GB of 100kB records', + test : 'group_by_key', + runner : CommonTestProperties.Runner.FLINK, + pipelineOptions: [ + job_name : "load-tests-go-flink-batch-gbk-1-${now}", + influx_measurement : 'go_batch_gbk_1', + iterations : 1, + fanout : 1, + parallelism : 5, + input_options : """ + { + "num_records": 20000, + "key_size": 10000, + "value_size": 90000 + } + """.trim().replaceAll("\\s", ""), + endpoint : 'localhost:8099', + environment_type : 'DOCKER', + 
environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", + ] + ], + [ + title : 'Load test: fanout 4 times with 2GB 10-byte records total', + test : 'group_by_key', + runner : CommonTestProperties.Runner.FLINK, + pipelineOptions: [ + job_name : "load-tests-go-flink-batch-gbk-1-${now}", + influx_measurement : 'go_batch_gbk_1', + iterations : 1, + fanout : 4, + parallelism : 16, + input_options : """ + { + "num_records": 5000000, + "key_size": 10, + "value_size": 90 + } + """.trim().replaceAll("\\s", ""), + endpoint : 'localhost:8099', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", + ] + ], + [ + title : 'Load test: fanout 8 times with 2GB 10-byte records total', + test : 'group_by_key', + runner : CommonTestProperties.Runner.FLINK, + pipelineOptions: [ + job_name : "load-tests-go-flink-batch-gbk-1-${now}", + influx_measurement : 'go_batch_gbk_1', + iterations : 1, + fanout : 8, + parallelism : 16, + input_options : """ + { + "num_records": 5000000, + "key_size": 10, + "value_size": 90 + } + """.trim().replaceAll("\\s", ""), + endpoint : 'localhost:8099', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", + ] + ], + [ + title : 'Load test: reiterate 4 times 10kB values', + test : 'group_by_key', + runner : CommonTestProperties.Runner.FLINK, + pipelineOptions: [ + job_name : "load-tests-go-flink-batch-gbk-1-${now}", + influx_measurement : 'go_batch_gbk_1', + iterations : 4, + fanout : 1, + parallelism : 5, + input_options : """ + { + "num_records": 5000000, + "key_size": 10, + "value_size": 90, + "num_hot_keys": 200, + "hot_key_fraction": 1 + } + """.trim().replaceAll("\\s", ""), + endpoint : 'localhost:8099', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", + ] + ], + [ + title : 'Load test: reiterate 4 times 2MB values', + test : 'group_by_key', + runner : 
CommonTestProperties.Runner.FLINK, + pipelineOptions: [ + job_name : "load-tests-go-flink-batch-gbk-1-${now}", + influx_measurement : 'go_batch_gbk_1', + iterations : 4, + fanout : 1, + parallelism : 5, + input_options : """ + { + "num_records": 20000000, + "key_size": 10, + "value_size": 90, + "num_hot_keys": 10, + "hot_key_fraction": 1 + } + """.trim().replaceAll("\\s", ""), + endpoint : 'localhost:8099', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", + ] + ], + ].each { test -> test.pipelineOptions.putAll(additionalPipelineArgs) } +} + +def loadTestJob = { scope, triggeringContext, mode -> + def numberOfWorkers = 5 Review comment: Some of your tests need a parallelism of 16, but your Flink deployment will have only 5 task managers. You need to group your tests by parallelism, run the first group, then rescale the cluster and run the second group. Please take a look at `.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy`, which uses this pattern. ########## File path: .test-infra/jenkins/job_LoadTests_GBK_Flink_Go.groovy ########## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +import CommonJobProperties as commonJobProperties +import CommonTestProperties +import LoadTestsBuilder as loadTestsBuilder +import PhraseTriggeringPostCommitBuilder +import Flink +import InfluxDBCredentialsHelper + +import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY + +String now = new Date().format('MMddHHmmss', TimeZone.getTimeZone('UTC')) + +def batchScenarios = { + [ + [ + title : 'Load test: 2GB of 10B records', + test : 'group_by_key', + runner : CommonTestProperties.Runner.FLINK, + pipelineOptions: [ + job_name : "load-tests-go-flink-batch-gbk-1-${now}", + influx_measurement : 'go_batch_gbk_1', Review comment: Please add `influx_namespace: 'flink'` for every scenario. ########## File path: sdks/go/test/load/group_by_key/group_by_key.go ########## @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This is GroupByKey load test with Synthetic Source. Besides of the standard +// input options there are additional options: +// * fanout (optional) - number of GBK operations to run in parallel +// * iterations (optional) - number of reiterations over per-key-grouped +// values to perform +// input_options - options for Synthetic Sources. 
+ +// Example test run: + +// go run sdks/go/test/load/group_by_key_test/group_by_key.go \ Review comment: It's a good idea to copy and paste such examples to check that they actually work. Something seems to be wrong here; running it fails with: `panic: Could not unmarshal SourceConfig: invalid character '\\' looking for beginning of object key string` To be honest, I have mixed feelings about such examples. They can be useful in shell scripts, where there are plenty of positional arguments. But in this case, all arguments are named, and only one of them is required. What's more, such examples can easily become outdated, because it's easy to overlook them while editing the file. That once happened to the Python load tests. If you decide to keep it, you will have to remove the backslashes to make it work. Inside single quotes the shell passes `\"` through literally, so the JSON ends up containing backslashes. ########## File path: sdks/go/test/load/group_by_key/group_by_key.go ########## @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This is GroupByKey load test with Synthetic Source. Besides of the standard +// input options there are additional options: +// * fanout (optional) - number of GBK operations to run in parallel Review comment: I don't think we need to list every possible flag here.
If someone invokes the test with the `--help` flag or makes a mistake, they will see all available flags along with their descriptions, which are defined below (starting at line 49). ########## File path: .test-infra/jenkins/job_LoadTests_GBK_Flink_Go.groovy ########## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +import CommonJobProperties as commonJobProperties +import CommonTestProperties +import LoadTestsBuilder as loadTestsBuilder +import PhraseTriggeringPostCommitBuilder +import Flink +import InfluxDBCredentialsHelper + +import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY + +String now = new Date().format('MMddHHmmss', TimeZone.getTimeZone('UTC')) + +def batchScenarios = { + [ + [ + title : 'Load test: 2GB of 10B records', + test : 'group_by_key', + runner : CommonTestProperties.Runner.FLINK, + pipelineOptions: [ + job_name : "load-tests-go-flink-batch-gbk-1-${now}", + influx_measurement : 'go_batch_gbk_1', + input_options : """ + { + "num_records": 200000000, + "key_size": 1, + "value_size": 9 + } + """.trim().replaceAll("\\s", ""), + iterations : 1, + fanout : 1, + parallelism : 5, + endpoint : 'localhost:8099', + environment_type : 'DOCKER', + environment_config : "${DOCKER_CONTAINER_REGISTRY}/beam_go_sdk:latest", + ] + ], + [ + title : 'Load test: 2GB of 100B records', + test : 'group_by_key', + runner : CommonTestProperties.Runner.FLINK, + pipelineOptions: [ + job_name : "load-tests-go-flink-batch-gbk-1-${now}", + influx_measurement : 'go_batch_gbk_1', + input_options : """ + { + "num_records": 20000000, + "key_size": 10, + "value_size": 90 + } + """.trim().replaceAll("\\s", ""), + iterations : 1, + fanout : 1, + parallelism : 5, + endpoint : 'localhost:8099', Review comment: Please remove one extra space so that colons are in line. The same applies to other scenarios as well. ########## File path: .test-infra/jenkins/job_LoadTests_GBK_Flink_Go.groovy ########## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import CommonJobProperties as commonJobProperties +import CommonTestProperties +import LoadTestsBuilder as loadTestsBuilder +import PhraseTriggeringPostCommitBuilder +import Flink +import InfluxDBCredentialsHelper + +import static LoadTestsBuilder.DOCKER_CONTAINER_REGISTRY + +String now = new Date().format('MMddHHmmss', TimeZone.getTimeZone('UTC')) + +def batchScenarios = { + [ + [ + title : 'Load test: 2GB of 10B records', Review comment: `Load test: 2GB of 10B records` -> `GroupByKey Go Load test: 2GB of 10B records` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
