[
https://issues.apache.org/jira/browse/BEAM-5467?focusedWorklogId=156206&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-156206
]
ASF GitHub Bot logged work on BEAM-5467:
----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Oct/18 01:36
Start Date: 19/Oct/18 01:36
Worklog Time Spent: 10m
Work Description: tweise closed pull request #6532: [BEAM-5467] Use
process SDKHarness to run flink PVR tests.
URL: https://github.com/apache/beam/pull/6532
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/.test-infra/jenkins/job_PostCommit_Python_ValidatesRunner_Flink.groovy
b/.test-infra/jenkins/job_PostCommit_Python_ValidatesRunner_Flink.groovy
index 0d9e661efed..f6c42157047 100644
--- a/.test-infra/jenkins/job_PostCommit_Python_ValidatesRunner_Flink.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Python_ValidatesRunner_Flink.groovy
@@ -33,8 +33,7 @@
PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_VR_Flink',
steps {
gradle {
rootBuildScriptDir(commonJobProperties.checkoutDir)
- tasks(':beam-sdks-python:flinkCompatibilityMatrixBatch')
- tasks(':beam-sdks-python:flinkCompatibilityMatrixStreaming')
+ tasks(':beam-sdks-python:flinkValidatesRunner')
commonJobProperties.setGradleSwitches(delegate)
}
}
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index 044e0e9a913..b321e73e250 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -17,6 +17,7 @@
from __future__ import absolute_import
from __future__ import print_function
+import argparse
import logging
import shutil
import sys
@@ -25,6 +26,7 @@
import apache_beam as beam
from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.options.pipeline_options import PortableOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.runners.portability import portable_runner
from apache_beam.runners.portability import portable_runner_test
@@ -34,10 +36,27 @@
# Run as
#
# python -m apache_beam.runners.portability.flink_runner_test \
- # /path/to/job_server.jar \
+ # --flink_job_server_jar=/path/to/job_server.jar \
+ # --type=Batch \
+ # --environment_type=docker \
# [FlinkRunnerTest.test_method, ...]
- flinkJobServerJar = sys.argv.pop(1)
- streaming = sys.argv.pop(1).lower() == 'streaming'
+
+ parser = argparse.ArgumentParser(add_help=True)
+ parser.add_argument('--flink_job_server_jar',
+ help='Job server jar to submit jobs.')
+ parser.add_argument('--streaming', default=False, action='store_true',
+ help='Job type. batch or streaming')
+ parser.add_argument('--environment_type', default='docker',
+ help='Environment type. docker or process')
+ parser.add_argument('--environment_config', help='Environment config.')
+ known_args, args = parser.parse_known_args(sys.argv)
+ sys.argv = args
+
+ flink_job_server_jar = known_args.flink_job_server_jar
+ streaming = known_args.streaming
+ environment_type = known_args.environment_type.lower()
+ environment_config = (
+ known_args.environment_config if known_args.environment_config else None)
# This is defined here to only be run when we invoke this file explicitly.
class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
@@ -50,10 +69,11 @@ def _subprocess_command(cls, port):
try:
return [
'java',
- '-jar', flinkJobServerJar,
+ '-jar', flink_job_server_jar,
'--artifacts-dir', tmp_dir,
'--job-host', 'localhost',
'--job-port', str(port),
+ '--artifact-port', '0',
]
finally:
shutil.rmtree(tmp_dir)
@@ -65,6 +85,13 @@ def get_runner(cls):
def create_options(self):
options = super(FlinkRunnerTest, self).create_options()
options.view_as(DebugOptions).experiments = ['beam_fn_api']
+ # Default environment is Docker.
+ if environment_type == 'process':
+ options.view_as(PortableOptions).environment_type = 'PROCESS'
+
+ if environment_config:
+ options.view_as(PortableOptions).environment_config =
environment_config
+
if streaming:
options.view_as(StandardOptions).streaming = True
return options
diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle
index a4ca68219da..4fdde88a814 100644
--- a/sdks/python/build.gradle
+++ b/sdks/python/build.gradle
@@ -340,24 +340,51 @@ task hdfsIntegrationTest(dependsOn: 'installGcpTest') {
}
}
+class CompatibilityMatrixConfig {
+ // Execute batch or streaming pipelines.
+ boolean streaming = false
+ // Execute on Docker or Process based environment.
+ SDK_WORKER_TYPE workerType = SDK_WORKER_TYPE.DOCKER
+
+ enum SDK_WORKER_TYPE {
+ DOCKER, PROCESS
+ }
+}
+
def flinkCompatibilityMatrix = {
- def type = it
- def name = 'flinkCompatibilityMatrix' + type
+ def config = it ? it as CompatibilityMatrixConfig : new
CompatibilityMatrixConfig()
+ def workerType = config.workerType.name()
+ def streaming = config.streaming
+ def environment_config = config.workerType ==
CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS ?
"--environment_config='{\"command\":
\"${project(":beam-sdks-python:").buildDir.absolutePath}/sdk_worker.sh\"}'" : ""
+ def name = "flinkCompatibilityMatrix${streaming ? 'Streaming' :
'Batch'}${workerType}"
tasks.create(name: name) {
dependsOn 'setupVirtualenv'
- dependsOn ':beam-sdks-python-container:docker'
+ dependsOn 'createProcessWorker'
dependsOn ':beam-runners-flink_2.11-job-server:shadowJar'
+ if (workerType.toLowerCase() == 'docker')
+ dependsOn ':beam-sdks-python-container:docker'
doLast {
exec {
executable 'sh'
- args '-c', ". ${envdir}/bin/activate && pip install -e . && python -m
apache_beam.runners.portability.flink_runner_test
${project(":beam-runners-flink_2.11-job-server:").shadowJar.archivePath}
${type}"
+ args '-c', ". ${envdir}/bin/activate && pip install -e . && python -m
apache_beam.runners.portability.flink_runner_test
--flink_job_server_jar=${project(":beam-runners-flink_2.11-job-server:").shadowJar.archivePath}
--environment_type=${workerType} ${environment_config} ${streaming ?
'--streaming' : ''}"
}
}
}
}
-flinkCompatibilityMatrix('Batch')
-flinkCompatibilityMatrix('Streaming')
+task flinkCompatibilityMatrixDocker() {
+ dependsOn flinkCompatibilityMatrix(streaming: false)
+ dependsOn flinkCompatibilityMatrix(streaming: true)
+}
+
+task flinkCompatibilityMatrixProcess() {
+ dependsOn flinkCompatibilityMatrix(streaming: false, workerType:
CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS)
+ dependsOn flinkCompatibilityMatrix(streaming: true, workerType:
CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS)
+}
+
+task flinkValidatesRunner() {
+ dependsOn 'flinkCompatibilityMatrixProcess'
+}
task postCommit() {
dependsOn "preCommit"
@@ -379,3 +406,21 @@ task buildSnapshot() {
dependsOn 'sdist'
dependsOn 'depSnapshot'
}
+
+project.task('createProcessWorker') {
+ dependsOn ':beam-sdks-python-container:build'
+ dependsOn 'setupVirtualenv'
+ def sdkWorkerFile = file("${project.buildDir}/sdk_worker.sh")
+ def workerScript =
"${project(":beam-sdks-python-container:").buildDir.absolutePath}/target/launcher/linux_amd64/boot"
+ def sdkWorkerFileCode = "sh -c \". ${envdir}/bin/activate && ${workerScript}
\$* \""
+ outputs.file sdkWorkerFile
+ doLast {
+ sdkWorkerFile.write sdkWorkerFileCode
+ exec {
+ commandLine('sh', '-c', ". ${envdir}/bin/activate && cd
${project.projectDir} && python setup.py install ")
+ }
+ exec {
+ commandLine('chmod', '+x', sdkWorkerFile)
+ }
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 156206)
Time Spent: 11h 40m (was: 11.5h)
> Python Flink ValidatesRunner job fixes
> --------------------------------------
>
> Key: BEAM-5467
> URL: https://issues.apache.org/jira/browse/BEAM-5467
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Thomas Weise
> Assignee: Thomas Weise
> Priority: Minor
> Labels: portability-flink
> Time Spent: 11h 40m
> Remaining Estimate: 0h
>
> Add status to README
> Rename script and job for consistency
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)