This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new dcd08a1 [BEAM-12439] Reuse Java job servers in spark_runner.py.
new f07d2a2 Merge pull request #14941 from ibzib/BEAM-12439
dcd08a1 is described below
commit dcd08a1d0f5606ad492e3e1f5425a81d706c9570
Author: Kyle Weaver <[email protected]>
AuthorDate: Thu Jun 3 15:36:18 2021 -0700
[BEAM-12439] Reuse Java job servers in spark_runner.py.
---
.../portability/spark_java_job_server_test.py | 65 ++++++++++++++++++++++
.../runners/portability/spark_runner.py | 14 ++++-
2 files changed, 78 insertions(+), 1 deletion(-)
diff --git a/sdks/python/apache_beam/runners/portability/spark_java_job_server_test.py b/sdks/python/apache_beam/runners/portability/spark_java_job_server_test.py
new file mode 100644
index 0000000..50490d9
--- /dev/null
+++ b/sdks/python/apache_beam/runners/portability/spark_java_job_server_test.py
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# pytype: skip-file
+
+import logging
+import unittest
+
+from apache_beam.options import pipeline_options
+from apache_beam.runners.portability.spark_runner import SparkRunner
+
+
+class SparkTestPipelineOptions(pipeline_options.PipelineOptions):
+ def view_as(self, cls):
+ # Ensure only SparkRunnerOptions and JobServerOptions are used when calling
+ # default_job_server. If other options classes are needed, the cache key
+ # must include them to prevent incorrect hits.
+ assert (
+ cls is pipeline_options.SparkRunnerOptions or
+ cls is pipeline_options.JobServerOptions)
+ return super().view_as(cls)
+
+
+class SparkJavaJobServerTest(unittest.TestCase):
+ def test_job_server_cache(self):
+ # Multiple SparkRunner instances may be created, so we need to make sure we
+ # cache job servers across runner instances.
+
+ # Most pipeline-specific options, such as sdk_worker_parallelism, don't
+ # affect job server configuration, so it is ok to ignore them for caching.
+ job_server1 = SparkRunner().default_job_server(
+ SparkTestPipelineOptions(['--sdk_worker_parallelism=1']))
+ job_server2 = SparkRunner().default_job_server(
+ SparkTestPipelineOptions(['--sdk_worker_parallelism=2']))
+ self.assertIs(job_server2, job_server1)
+
+ # JobServerOptions and SparkRunnerOptions do affect job server
+ # configuration, so using different pipeline options gives us a different
+ # job server.
+ job_server3 = SparkRunner().default_job_server(
+ SparkTestPipelineOptions(['--job_port=1234']))
+ self.assertIsNot(job_server3, job_server1)
+
+ job_server4 = SparkRunner().default_job_server(
+ SparkTestPipelineOptions(['--spark_master_url=spark://localhost:5678']))
+ self.assertIsNot(job_server4, job_server1)
+ self.assertIsNot(job_server4, job_server3)
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
diff --git a/sdks/python/apache_beam/runners/portability/spark_runner.py b/sdks/python/apache_beam/runners/portability/spark_runner.py
index fb5608c..1145507 100644
--- a/sdks/python/apache_beam/runners/portability/spark_runner.py
+++ b/sdks/python/apache_beam/runners/portability/spark_runner.py
@@ -31,6 +31,10 @@ from apache_beam.runners.portability import spark_uber_jar_job_server
# https://spark.apache.org/docs/latest/submitting-applications.html#master-urls
LOCAL_MASTER_PATTERN = r'^local(\[.+\])?$'
+# Since Java job servers are heavyweight external processes, cache them.
+# This applies only to SparkJarJobServer, not SparkUberJarJobServer.
+JOB_SERVER_CACHE = {}
+
class SparkRunner(portable_runner.PortableRunner):
def run_pipeline(self, pipeline, options):
@@ -49,7 +53,15 @@ class SparkRunner(portable_runner.PortableRunner):
raise ValueError('Option spark_rest_url must be set.')
return spark_uber_jar_job_server.SparkUberJarJobServer(
spark_options.spark_rest_url, options)
- return job_server.StopOnExitJobServer(SparkJarJobServer(options))
+ # Use Java job server by default.
+ # Only SparkRunnerOptions and JobServerOptions affect job server
+ # configuration, so concat those as the cache key.
+ job_server_options = options.view_as(pipeline_options.JobServerOptions)
+ options_str = str(spark_options) + str(job_server_options)
+ if not options_str in JOB_SERVER_CACHE:
+ JOB_SERVER_CACHE[options_str] = job_server.StopOnExitJobServer(
+ SparkJarJobServer(options))
+ return JOB_SERVER_CACHE[options_str]
def create_job_service_handle(self, job_service, options):
return portable_runner.JobServiceHandle(