This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d001a69e1a5 Test Dataproc 2.1 with Flink load tests (#24129)
d001a69e1a5 is described below
commit d001a69e1a58701d6ed4fcb5e3fb7a0921301dad
Author: Yi Hu <[email protected]>
AuthorDate: Mon Nov 14 10:56:54 2022 -0500
Test Dataproc 2.1 with Flink load tests (#24129)
* Test Dataproc 2.1 with Flink load tests
* Minor fix to flink_cluster script
---
.test-infra/dataproc/flink_cluster.sh | 20 +++++++++++---------
.test-infra/jenkins/CommonTestProperties.groovy | 2 +-
.test-infra/jenkins/Flink.groovy | 2 +-
.../jenkins/job_LoadTests_Combine_Flink_Go.groovy | 2 +-
.../job_LoadTests_Combine_Flink_Python.groovy | 2 +-
.../jenkins/job_LoadTests_GBK_Flink_Go.groovy | 2 +-
.../jenkins/job_LoadTests_GBK_Flink_Python.groovy | 2 +-
.../jenkins/job_LoadTests_ParDo_Flink_Go.groovy | 2 +-
.../jenkins/job_LoadTests_ParDo_Flink_Python.groovy | 2 +-
.../jenkins/job_LoadTests_SideInput_Flink_Go.groovy | 2 +-
.../jenkins/job_LoadTests_coGBK_Flink_Go.groovy | 2 +-
.../jenkins/job_LoadTests_coGBK_Flink_Python.groovy | 2 +-
...stCommit_Python_Chicago_Taxi_Example_Flink.groovy | 2 +-
13 files changed, 23 insertions(+), 21 deletions(-)
diff --git a/.test-infra/dataproc/flink_cluster.sh
b/.test-infra/dataproc/flink_cluster.sh
index 6dbf2724c71..645cc3c081f 100755
--- a/.test-infra/dataproc/flink_cluster.sh
+++ b/.test-infra/dataproc/flink_cluster.sh
@@ -17,7 +17,7 @@
# Provide the following environment to run this script:
#
# GCLOUD_ZONE: Google cloud zone. Optional. Default: "us-central1-a"
-# DATAPROC_VERSION: Dataproc version. Optional. Default: 1.5
+# DATAPROC_VERSION: Dataproc version. Optional. Default: 2.1
# CLUSTER_NAME: Cluster name
# GCS_BUCKET: GCS bucket url for Dataproc resources (init actions)
# HARNESS_IMAGES_TO_PULL: Urls to SDK Harness' images to pull on dataproc
workers (optional: 0, 1 or multiple urls for every harness image)
@@ -46,8 +46,9 @@ set -Eeuxo pipefail
# GCloud properties
GCLOUD_ZONE="${GCLOUD_ZONE:=us-central1-a}"
-DATAPROC_VERSION="${DATAPROC_VERSION:=2.0}"
-GCLOUD_REGION="us-central1"
+# TODO: replace preview once dataproc 2.1 released
+DATAPROC_VERSION="${DATAPROC_VERSION:=preview-debian11}"
+GCLOUD_REGION=`echo $GCLOUD_ZONE | sed -E "s/(-[a-z])?$//"`
MASTER_NAME="$CLUSTER_NAME-m"
@@ -79,9 +80,9 @@ function get_leader() {
while read line; do
echo $line
application_ids[$i]=`echo $line | sed "s/ .*//"`
- application_masters[$i]=`echo $line | sed
"s/.*$CLUSTER_NAME/$CLUSTER_NAME/" | sed "s/ .*//"`
+ application_masters[$i]=`echo $line | sed -E "s#.*(https?://)##" | sed "s/
.*//"`
i=$((i+1))
- done <<< $(gcloud compute ssh --zone=$GCLOUD_ZONE --quiet yarn@$MASTER_NAME
--command="yarn application -list" | grep "$CLUSTER_NAME")
+ done <<< $(gcloud compute ssh --zone=$GCLOUD_ZONE --quiet yarn@$MASTER_NAME
--command="yarn application -list" | grep "Apache Flink")
if [ $i != 1 ]; then
echo "Multiple applications found. Make sure that only 1 application is
running on the cluster."
@@ -129,12 +130,13 @@ function create_cluster() {
local image_version=$DATAPROC_VERSION
echo "Starting dataproc cluster. Dataproc version: $image_version"
- # Create one extra Dataproc VM for Flink's Job Manager
- local num_dataproc_workers="$(($FLINK_NUM_WORKERS + 1))"
-
# Docker init action restarts yarn so we need to start yarn session after
this restart happens.
# This is why flink init action is invoked last.
- gcloud dataproc clusters create $CLUSTER_NAME --region=$GCLOUD_REGION
--num-workers=$num_dataproc_workers --metadata "${metadata}",
--image-version=$image_version --zone=$GCLOUD_ZONE
--optional-components=FLINK,DOCKER --quiet
+ # TODO(11/11/2022) remove --worker-machine-type and --master-machine-type
once N2 CPUs quota relaxed
+ # Dataproc 2.1 uses n2-standard-2 by default but there is N2 CPUs=24 quota
limit
+ gcloud dataproc clusters create $CLUSTER_NAME --region=$GCLOUD_REGION
--num-workers=$FLINK_NUM_WORKERS \
+ --master-machine-type=n1-standard-2 --worker-machine-type=n1-standard-2
--metadata "${metadata}", \
+ --image-version=$image_version --zone=$GCLOUD_ZONE
--optional-components=FLINK,DOCKER --quiet
}
# Runs init actions for Docker, Portability framework (Beam) and Flink cluster
diff --git a/.test-infra/jenkins/CommonTestProperties.groovy
b/.test-infra/jenkins/CommonTestProperties.groovy
index 3e23599db59..c6870dea59a 100644
--- a/.test-infra/jenkins/CommonTestProperties.groovy
+++ b/.test-infra/jenkins/CommonTestProperties.groovy
@@ -26,7 +26,7 @@ class CommonTestProperties {
}
static String getFlinkVersion() {
- return "1.13"
+ return "1.15"
}
static String getSparkVersion() {
diff --git a/.test-infra/jenkins/Flink.groovy b/.test-infra/jenkins/Flink.groovy
index 4aadf6943ed..34f3b60709c 100644
--- a/.test-infra/jenkins/Flink.groovy
+++ b/.test-infra/jenkins/Flink.groovy
@@ -17,7 +17,7 @@
*/
class Flink {
- private static final String flinkDownloadUrl =
'https://archive.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz'
+ private static final String flinkDownloadUrl =
'https://archive.apache.org/dist/flink/flink-1.15.0/flink-1.15.0-bin-scala_2.12.tgz'
private static final String hadoopDownloadUrl =
'https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar'
private static final String FLINK_DIR =
'"$WORKSPACE/src/.test-infra/dataproc"'
private static final String FLINK_SCRIPT = 'flink_cluster.sh'
diff --git a/.test-infra/jenkins/job_LoadTests_Combine_Flink_Go.groovy
b/.test-infra/jenkins/job_LoadTests_Combine_Flink_Go.groovy
index b61aedea376..db573808c64 100644
--- a/.test-infra/jenkins/job_LoadTests_Combine_Flink_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_Combine_Flink_Go.groovy
@@ -107,7 +107,7 @@ def loadTestJob = { scope, triggeringContext, mode ->
GO_SDK_CONTAINER
],
initialParallelism,
- "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.13_job_server:latest")
+
"${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
// Execute all scenarios connected with initial parallelism.
loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.GO,
initialScenarios, 'combine', mode)
diff --git a/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy
b/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy
index b88a3fafc2d..8f6ff06410f 100644
--- a/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy
+++ b/.test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy
@@ -132,7 +132,7 @@ def loadTestJob = { scope, triggeringContext, mode ->
"${DOCKER_CONTAINER_REGISTRY}/${DOCKER_BEAM_SDK_IMAGE}"
],
initialParallelism,
- "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.13_job_server:latest")
+
"${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
// Execute all scenarios connected with initial parallelism.
loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.PYTHON,
initialScenarios, 'Combine', mode)
diff --git a/.test-infra/jenkins/job_LoadTests_GBK_Flink_Go.groovy
b/.test-infra/jenkins/job_LoadTests_GBK_Flink_Go.groovy
index 49116659acb..3a5e63b283a 100644
--- a/.test-infra/jenkins/job_LoadTests_GBK_Flink_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_GBK_Flink_Go.groovy
@@ -199,7 +199,7 @@ def loadTestJob = { scope, triggeringContext, mode ->
GO_SDK_CONTAINER
],
initialParallelism,
- "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.13_job_server:latest")
+
"${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
// Execute all scenarios connected with initial parallelism.
loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.GO,
initialScenarios, 'group_by_key', mode)
diff --git a/.test-infra/jenkins/job_LoadTests_GBK_Flink_Python.groovy
b/.test-infra/jenkins/job_LoadTests_GBK_Flink_Python.groovy
index ade6bc16a69..1c19b2a4760 100644
--- a/.test-infra/jenkins/job_LoadTests_GBK_Flink_Python.groovy
+++ b/.test-infra/jenkins/job_LoadTests_GBK_Flink_Python.groovy
@@ -146,7 +146,7 @@ def loadTest = { scope, triggeringContext ->
"${DOCKER_CONTAINER_REGISTRY}/${DOCKER_BEAM_SDK_IMAGE}"
],
numberOfWorkers,
- "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.13_job_server:latest")
+
"${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
def configurations = testScenarios.findAll {
it.pipelineOptions?.parallelism?.value == numberOfWorkers }
loadTestsBuilder.loadTests(scope, sdk, configurations, "GBK", "batch")
diff --git a/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Go.groovy
b/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Go.groovy
index c7f3fa1245d..0eb7e2b2ce2 100644
--- a/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Go.groovy
@@ -127,7 +127,7 @@ def loadTestJob = { scope, triggeringContext, mode ->
GO_SDK_CONTAINER
],
numberOfWorkers,
- "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.13_job_server:latest")
+
"${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.GO,
batchScenarios(), 'ParDo', mode)
}
diff --git a/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Python.groovy
b/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Python.groovy
index d07964d0d44..96172d94b9b 100644
--- a/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Python.groovy
+++ b/.test-infra/jenkins/job_LoadTests_ParDo_Flink_Python.groovy
@@ -320,7 +320,7 @@ def loadTestJob = { scope, triggeringContext, mode ->
"${DOCKER_CONTAINER_REGISTRY}/${DOCKER_BEAM_SDK_IMAGE}"
],
numberOfWorkers,
- "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.13_job_server:latest")
+
"${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.PYTHON,
testScenarios, 'ParDo', mode)
}
diff --git a/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy
b/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy
index 185c29a16e8..b619d2eb6bd 100644
--- a/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy
@@ -79,7 +79,7 @@ def loadTestJob = { scope, triggeringContext, mode ->
GO_SDK_CONTAINER
],
numberOfWorkers,
- "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.13_job_server:latest")
+
"${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.GO,
batchScenarios(), 'SideInput', mode)
diff --git a/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Go.groovy
b/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Go.groovy
index f122796c4d8..4cb77e43368 100644
--- a/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Go.groovy
+++ b/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Go.groovy
@@ -159,7 +159,7 @@ def loadTestJob = { scope, triggeringContext, mode ->
GO_SDK_CONTAINER
],
numberOfWorkers,
- "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.13_job_server:latest")
+
"${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.GO,
batchScenarios(), 'CoGBK', mode)
}
diff --git a/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Python.groovy
b/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Python.groovy
index e1bb58cbdc8..a0a06ea4f14 100644
--- a/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Python.groovy
+++ b/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Python.groovy
@@ -137,7 +137,7 @@ def loadTest = { scope, triggeringContext ->
"${DOCKER_CONTAINER_REGISTRY}/${DOCKER_BEAM_SDK_IMAGE}"
],
numberOfWorkers,
- "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.13_job_server:latest")
+
"${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.PYTHON,
testScenarios, 'CoGBK', 'batch')
}
diff --git
a/.test-infra/jenkins/job_PostCommit_Python_Chicago_Taxi_Example_Flink.groovy
b/.test-infra/jenkins/job_PostCommit_Python_Chicago_Taxi_Example_Flink.groovy
index 516bf028714..724bc45eb3f 100644
---
a/.test-infra/jenkins/job_PostCommit_Python_Chicago_Taxi_Example_Flink.groovy
+++
b/.test-infra/jenkins/job_PostCommit_Python_Chicago_Taxi_Example_Flink.groovy
@@ -38,7 +38,7 @@ def chicagoTaxiJob = { scope ->
"${DOCKER_CONTAINER_REGISTRY}/${beamSdkDockerImage}"
],
numberOfWorkers,
- "${DOCKER_CONTAINER_REGISTRY}/beam_flink1.13_job_server:latest")
+
"${DOCKER_CONTAINER_REGISTRY}/beam_flink${CommonTestProperties.getFlinkVersion()}_job_server:latest")
def pipelineOptions = [
parallelism : numberOfWorkers,