[
https://issues.apache.org/jira/browse/BEAM-7872?focusedWorklogId=288860&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-288860
]
ASF GitHub Bot logged work on BEAM-7872:
----------------------------------------
Author: ASF GitHub Bot
Created on: 05/Aug/19 11:10
Start Date: 05/Aug/19 11:10
Worklog Time Spent: 10m
Work Description: lgajowy commented on pull request #9213: [BEAM-7872]
Simpler Flink cluster set up in load tests
URL: https://github.com/apache/beam/pull/9213#discussion_r310542007
##########
File path: .test-infra/jenkins/Flink.groovy
##########
@@ -19,10 +19,74 @@
import CommonJobProperties as common
import CommonTestProperties.SDK
-class Infrastructure {
+class Flink {
+ private static final String repositoryRoot =
'gcr.io/apache-beam-testing/beam_portability'
+ private static final String dockerTag = 'latest'
+ private static final String jobServerImageTag =
"${repositoryRoot}/flink-job-server:${dockerTag}"
+ private static final String flinkVersion = '1.7'
+ private static final String flinkDownloadUrl =
'https://archive.apache.org/dist/flink/flink-1.7.0/flink-1.7.0-bin-hadoop28-scala_2.11.tgz'
+
+ private static def job
+ private static String jobName
+
+ /**
+ * Returns SDK Harness image tag to be used as an environment_config in the
job definition.
+ *
+ * @param sdk - SDK
+ */
+ static String getSDKHarnessImageTag(SDK sdk) {
+ switch (sdk) {
+ case CommonTestProperties.SDK.PYTHON:
+ return "${repositoryRoot}/python:${dockerTag}"
+ case CommonTestProperties.SDK.JAVA:
+ return "${repositoryRoot}/java:${dockerTag}"
+ default:
+ String sdkName = sdk.name().toLowerCase()
+ throw new IllegalArgumentException("${sdkName} SDK is not supported")
+ }
+ }
+
+ /**
+ * Creates Flink cluster and specifies cleanup steps.
+ *
+ * @param job - jenkins job
+ * @param jobName - string to be used as a base for cluster name
+ * @param sdk - SDK
+ * @param workerCount - the initial number of worker nodes excluding one
extra node for Flink's Job Manager
+ * @param slotsPerTaskmanager - the number of slots per Flink task manager
+ */
+ static Flink setUp(job, String jobName, SDK sdk, Integer workerCount,
Integer slotsPerTaskmanager = 1) {
+ Flink flink = new Flink(job, jobName)
+
+ flink.prepareSDKHarness(sdk)
+ flink.prepareFlinkJobServer()
+ flink.setupFlinkCluster(sdk, workerCount, slotsPerTaskmanager)
+ flink.addTeardownDataprocCleanupStep()
+
+ return flink
+ }
+
+ /**
+ * Updates the number of worker nodes in a cluster.
+ *
+ * @param workerCount - the new number of worker nodes in the cluster
excluding one extra node for Flink's Job Manager
+ */
+ void scaleCluster(Integer workerCount) {
+ job.steps {
+ // Keep one extra Dataproc VM for Flink's Job Manager
+ workerCount += 1
+ shell("echo Changing number of workers to ${workerCount}")
+ shell("gcloud dataproc clusters update ${getClusterName()}
--num-workers=${workerCount} --quiet")
+ }
+ }
+
+ private Flink(def job, String jobName) {
Review comment:
If I'm not mistaken (my knowledge can be a little bit dusty here), the
constructor should be above public methods. Could you move it?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 288860)
Time Spent: 2h 20m (was: 2h 10m)
> Simpler Flink cluster set up in load tests
> ------------------------------------------
>
> Key: BEAM-7872
> URL: https://issues.apache.org/jira/browse/BEAM-7872
> Project: Beam
> Issue Type: Sub-task
> Components: testing
> Reporter: Kamil Wasilewski
> Assignee: Kamil Wasilewski
> Priority: Major
> Time Spent: 2h 20m
> Remaining Estimate: 0h
>
> Creating a new load test running on Flink runner could be easier by providing
> a single `setUp` function which would encapsulate the process of creating
> Flink cluster and registering teardown steps
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)