This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2f81e16f3ea [#31403] Python wrapper to download, use, or build and run
prism. (#31583)
2f81e16f3ea is described below
commit 2f81e16f3eadbfb77bb8191dcc7e6ef9a4887c6f
Author: Robert Burke <[email protected]>
AuthorDate: Fri Jun 28 15:10:50 2024 -0700
[#31403] Python wrapper to download, use, or build and run prism. (#31583)
---
.../python/apache_beam/options/pipeline_options.py | 19 ++
.../runners/portability/prism_runner.py | 216 +++++++++++++++++
.../runners/portability/prism_runner_test.py | 269 +++++++++++++++++++++
3 files changed, 504 insertions(+)
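For orientation, a minimal usage sketch (not part of this patch): once this change is in, a pipeline can select the new runner by name, since PrismRunner is registered in StandardOptions below. The flag values and elements here are illustrative only.

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Select the new runner by name; optionally point --prism_location at a
    # locally built prism binary instead of letting the wrapper download one.
    options = PipelineOptions(['--runner=PrismRunner'])

    with beam.Pipeline(options=options) as p:
        _ = (
            p
            | beam.Create([1, 2, 3])
            | beam.Map(lambda x: x * x)
            | beam.Map(print))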
diff --git a/sdks/python/apache_beam/options/pipeline_options.py
b/sdks/python/apache_beam/options/pipeline_options.py
index b204adc7fc5..6b1dd8bb48c 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -515,6 +515,7 @@ class StandardOptions(PipelineOptions):
'apache_beam.runners.interactive.interactive_runner.InteractiveRunner',
'apache_beam.runners.portability.flink_runner.FlinkRunner',
'apache_beam.runners.portability.portable_runner.PortableRunner',
+ 'apache_beam.runners.portability.prism_runner.PrismRunner',
'apache_beam.runners.portability.spark_runner.SparkRunner',
'apache_beam.runners.test.TestDirectRunner',
'apache_beam.runners.test.TestDataflowRunner',
@@ -1707,6 +1708,24 @@ class SparkRunnerOptions(PipelineOptions):
help='Spark major version to use.')
+class PrismRunnerOptions(PipelineOptions):
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument(
+ '--prism_location',
+ help='Path or URL to a prism binary, or zipped binary for the current '
+ 'platform (Operating System and Architecture). May also be an Apache '
'Beam Github Release page URL, with a matching prism_beam_version_override '
+ 'set. This option overrides all others for finding a prism binary.')
+ parser.add_argument(
+ '--prism_beam_version_override',
+ help=
+ 'Override the SDK\'s version for deriving the Github Release URLs for '
+ 'downloading a zipped prism binary, for the current platform. If '
'prism_location is set to a Github Release page URL, then it will use '
+ 'that release page as a base when constructing the download URL.')
+
+
class TestOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
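As a quick illustration (not part of the patch), the new flags surface through PipelineOptions.view_as the same way PrismJobServer reads them in prism_runner.py below; the URL and version here are placeholders.

    from apache_beam.options import pipeline_options

    opts = pipeline_options.PipelineOptions([
        '--prism_location=https://github.com/apache/beam/releases/tag/v2.57.0',
        '--prism_beam_version_override=2.57.0',
    ])
    prism_opts = opts.view_as(pipeline_options.PrismRunnerOptions)
    print(prism_opts.prism_location)
    print(prism_opts.prism_beam_version_override)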
diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py
b/sdks/python/apache_beam/runners/portability/prism_runner.py
new file mode 100644
index 00000000000..eeccaf5748c
--- /dev/null
+++ b/sdks/python/apache_beam/runners/portability/prism_runner.py
@@ -0,0 +1,216 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A runner for executing portable pipelines on Apache Beam Prism."""
+
+# This makes the list parameterized generic work on Python 3.8,
+# so we don't have to revisit this code before that Python version
+# is sunset.
+from __future__ import annotations
+
+import logging
+import os
+import platform
+import shutil
+import stat
+import typing
+import urllib
+import zipfile
+from urllib.error import URLError
+from urllib.request import urlopen
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.options import pipeline_options
+from apache_beam.runners.portability import job_server
+from apache_beam.runners.portability import portable_runner
+from apache_beam.transforms import environments
+from apache_beam.utils import subprocess_server
+from apache_beam.version import __version__ as beam_version
+
+# pytype: skip-file
+
+# Prefix for constructing a download URL
+GITHUB_DOWNLOAD_PREFIX = 'https://github.com/apache/beam/releases/download/'
+# Prefix for constructing a release URL, so we can derive a download URL
+GITHUB_TAG_PREFIX = 'https://github.com/apache/beam/releases/tag/'
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class PrismRunner(portable_runner.PortableRunner):
+ """A runner for launching jobs on Prism, automatically downloading and
+ starting a Prism instance if needed.
+ """
+ def default_environment(
+ self,
+ options: pipeline_options.PipelineOptions) -> environments.Environment:
+ portable_options = options.view_as(pipeline_options.PortableOptions)
+ if (not portable_options.environment_type and
+ not portable_options.output_executable_path):
+ portable_options.environment_type = 'LOOPBACK'
+ return super().default_environment(options)
+
+ def default_job_server(self, options):
+ return job_server.StopOnExitJobServer(PrismJobServer(options))
+
+ def create_job_service_handle(self, job_service, options):
+ return portable_runner.JobServiceHandle(
+ job_service, options, retain_unknown_options=True)
+
+
+class PrismJobServer(job_server.SubprocessJobServer):
+ PRISM_CACHE = os.path.expanduser("~/.apache_beam/cache/prism")
+ BIN_CACHE = os.path.expanduser("~/.apache_beam/cache/prism/bin")
+
+ def __init__(self, options):
+ super().__init__()
+ prism_options = options.view_as(pipeline_options.PrismRunnerOptions)
+ # Options flow:
+ # If the path is set, always download and unzip the provided path,
+ # even if a binary is cached.
+ self._path = prism_options.prism_location
+ # Which version to use when constructing the prism download url.
+ if prism_options.prism_beam_version_override:
+ self._version = prism_options.prism_beam_version_override
+ else:
+ self._version = 'v' + beam_version
+
+ job_options = options.view_as(pipeline_options.JobServerOptions)
+ self._job_port = job_options.job_port
+
+ @classmethod
+ def maybe_unzip_and_make_executable(cls, url: str, bin_cache: str) -> str:
+ if zipfile.is_zipfile(url):
+ z = zipfile.ZipFile(url)
+ url = z.extract(
+ os.path.splitext(os.path.basename(url))[0], path=bin_cache)
+
+ # Make sure the binary is executable.
+ st = os.stat(url)
+ os.chmod(url, st.st_mode | stat.S_IEXEC)
+ return url
+
+ # Finds the binary or zip in the local cache; if it isn't there, fetches it.
+ @classmethod
+ def local_bin(
+ cls, url: str, bin_cache: str = '', ignore_cache: bool = False) -> str:
+ # ignore_cache forces the file to be downloaded and unzipped again even
+ # if a cached copy exists, to avoid staleness issues.
+ if bin_cache == '':
+ bin_cache = cls.BIN_CACHE
+ if os.path.exists(url):
+ _LOGGER.info('Using local prism binary from %s' % url)
+ return cls.maybe_unzip_and_make_executable(url, bin_cache=bin_cache)
+ else:
+ cached_bin = os.path.join(bin_cache, os.path.basename(url))
+ if os.path.exists(cached_bin) and not ignore_cache:
+ _LOGGER.info('Using cached prism binary from %s' % url)
+ else:
+ _LOGGER.info('Downloading prism binary from %s' % url)
+ if not os.path.exists(bin_cache):
+ os.makedirs(bin_cache)
+ try:
+ try:
+ url_read = FileSystems.open(url)
+ except ValueError:
+ url_read = urlopen(url)
+ with open(cached_bin + '.tmp', 'wb') as zip_write:
+ shutil.copyfileobj(url_read, zip_write, length=1 << 20)
+ os.rename(cached_bin + '.tmp', cached_bin)
+ except URLError as e:
+ raise RuntimeError(
+ 'Unable to fetch remote prism binary at %s: %s' % (url, e))
+ return cls.maybe_unzip_and_make_executable(
+ cached_bin, bin_cache=bin_cache)
+
+ def construct_download_url(self, root_tag: str, sys: str, mach: str) -> str:
+ """Construct the prism download URL with the appropriate release tag.
+ This maps operating systems and machine architectures to the compatible
+ and canonical names used by the Go build targets.
+
+ platform.system() returns names that are compatible once lowercased, so we
+ only need to filter out the unsupported operating systems."""
+ opsys = sys.lower()
+ if opsys not in ['linux', 'windows', 'darwin']:
+ raise ValueError(
+ 'Operating System "%s" unsupported for constructing a Prism release '
+ 'binary URL.' % (opsys))
+
+ # platform.machine() will vary by system, but many names are compatible.
+ arch = mach.lower()
+ if arch in ['amd64', 'x86_64', 'x86-64', 'x64']:
+ arch = 'amd64'
+ if arch in ['arm64', 'aarch64_be', 'aarch64', 'armv8b', 'armv8l']:
+ arch = 'arm64'
+
+ if arch not in ['amd64', 'arm64']:
+ raise ValueError(
+ 'Machine architecture "%s" unsupported for constructing a Prism '
+ 'release binary URL.' % (arch))
+ return (
+ GITHUB_DOWNLOAD_PREFIX +
+ f"{root_tag}/apache_beam-{self._version}-prism-{opsys}-{arch}.zip")
+
+ def path_to_binary(self) -> str:
+ if self._path is not None:
+ if not os.path.exists(self._path):
+ url = urllib.parse.urlparse(self._path)
+ if not url.scheme:
+ raise ValueError(
+ 'Unable to parse binary URL "%s". If using a full URL, make '
+ 'sure the scheme is specified. If using a local file path, '
+ 'make sure the file exists; you may have to first build prism '
+ 'using `go build `.' % (self._path))
+
+ # We have a URL; see if we need to construct a valid file name.
+ if self._path.startswith(GITHUB_DOWNLOAD_PREFIX):
+ # If this URL starts with the download prefix, let it through.
+ return self._path
+ # The only other valid option is a github release page.
+ if not self._path.startswith(GITHUB_TAG_PREFIX):
+ raise ValueError(
+ 'Provided --prism_location URL is not an Apache Beam Github '
+ 'Release page URL or download URL: %s' % (self._path))
+ # Get the root tag for this URL
+ root_tag = os.path.basename(os.path.normpath(self._path))
+ return self.construct_download_url(
+ root_tag, platform.system(), platform.machine())
+ return self._path
+ else:
+ if '.dev' in self._version:
+ raise ValueError(
+ 'Unable to derive URL for dev versions "%s". Please provide an '
+ 'alternate version to derive the release URL with the '
+ '--prism_beam_version_override flag.' % (self._version))
+ return self.construct_download_url(
+ self._version, platform.system(), platform.machine())
+
+ def subprocess_cmd_and_endpoint(
+ self) -> typing.Tuple[typing.List[typing.Any], str]:
+ bin_path = self.local_bin(
+ self.path_to_binary(), ignore_cache=(self._path is not None))
+ job_port, = subprocess_server.pick_port(self._job_port)
+ subprocess_cmd = [bin_path] + self.prism_arguments(job_port)
+ return (subprocess_cmd, f"localhost:{job_port}")
+
+ def prism_arguments(self, job_port) -> typing.List[typing.Any]:
+ return [
+ '--job_port',
+ job_port,
+ '--serve_http',
+ False,
+ ]
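To make the URL construction above concrete, here is a small sketch (again, not part of the patch) of the download URL PrismJobServer derives for a released SDK. The version string is a placeholder, and no network access happens, since construct_download_url only builds the string.

    from apache_beam.options import pipeline_options
    from apache_beam.runners.portability.prism_runner import PrismJobServer

    opts = pipeline_options.PipelineOptions(
        ['--prism_beam_version_override=v2.57.0'])
    server = PrismJobServer(opts)
    # e.g. https://github.com/apache/beam/releases/download/v2.57.0/
    #      apache_beam-v2.57.0-prism-linux-amd64.zip
    print(server.construct_download_url('v2.57.0', 'Linux', 'x86_64'))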
diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py
b/sdks/python/apache_beam/runners/portability/prism_runner_test.py
new file mode 100644
index 00000000000..f1ccf66a228
--- /dev/null
+++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py
@@ -0,0 +1,269 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# pytype: skip-file
+
+import argparse
+import logging
+import shlex
+import typing
+import unittest
+from os import linesep
+from os import path
+from os.path import exists
+from shutil import rmtree
+from tempfile import mkdtemp
+
+import pytest
+
+import apache_beam as beam
+from apache_beam import Impulse
+from apache_beam import Map
+from apache_beam.io.external.generate_sequence import GenerateSequence
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.options.pipeline_options import PortableOptions
+from apache_beam.runners.portability import portable_runner_test
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms.sql import SqlTransform
+
+# Run as
+#
+# pytest prism_runner_test.py[::TestClass::test_case] \
+# --test-pipeline-options="--environment_type=LOOPBACK"
+
+_LOGGER = logging.getLogger(__name__)
+
+Row = typing.NamedTuple("Row", [("col1", int), ("col2", str)])
+beam.coders.registry.register_coder(Row, beam.coders.RowCoder)
+
+
+class PrismRunnerTest(portable_runner_test.PortableRunnerTest):
+ _use_grpc = True
+ _use_subprocesses = True
+
+ conf_dir = None
+ expansion_port = None
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.environment_type = None
+ self.environment_config = None
+ self.enable_commit = False
+
+ def setUp(self):
+ self.enable_commit = False
+
+ @pytest.fixture(autouse=True)
+ def parse_options(self, request):
+ if not request.config.option.test_pipeline_options:
+ raise unittest.SkipTest(
+ 'Skipping because --test-pipeline-options is not specified.')
+ test_pipeline_options = request.config.option.test_pipeline_options
+ parser = argparse.ArgumentParser(add_help=True)
+ parser.add_argument(
+ '--prism_bin', help='Prism binary to submit jobs.', action='store')
+ parser.add_argument(
+ '--environment_type',
+ default='LOOPBACK',
+ choices=['DOCKER', 'PROCESS', 'LOOPBACK'],
+ help='Set the environment type for running user code. DOCKER runs '
+ 'user code in a container. PROCESS runs user code in '
+ 'automatically started processes. LOOPBACK runs user code on '
+ 'the same process that originally submitted the job.')
+ parser.add_argument(
+ '--environment_option',
+ '--environment_options',
+ dest='environment_options',
+ action='append',
+ default=None,
+ help=(
+ 'Environment configuration for running the user code. '
+ 'Recognized options depend on --environment_type.\n '
+ 'For DOCKER: docker_container_image (optional)\n '
+ 'For PROCESS: process_command (required), process_variables '
+ '(optional, comma-separated)\n '
+ 'For EXTERNAL: external_service_address (required)'))
+ known_args, unknown_args = parser.parse_known_args(
+ shlex.split(test_pipeline_options))
+ if unknown_args:
+ _LOGGER.warning('Discarding unrecognized arguments %s' % unknown_args)
+ self.set_prism_bin(known_args.prism_bin)
+ self.environment_type = known_args.environment_type
+ self.environment_options = known_args.environment_options
+
+ @classmethod
+ def tearDownClass(cls):
+ if cls.conf_dir and exists(cls.conf_dir):
+ _LOGGER.info("removing conf dir: %s" % cls.conf_dir)
+ rmtree(cls.conf_dir)
+ super().tearDownClass()
+
+ @classmethod
+ def _create_conf_dir(cls):
+ """Create (and save a static reference to) a "conf dir", used to provide
+ metrics configs and verify metrics output
+
+ It gets cleaned up when the suite is done executing"""
+
+ if hasattr(cls, 'conf_dir'):
+ cls.conf_dir = mkdtemp(prefix='prismtest-conf')
+
+ # path for a FileReporter to write metrics to
+ cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt')
+
+ # path to write Prism configuration to
+ conf_path = path.join(cls.conf_dir, 'prism-conf.yaml')
+ file_reporter = 'org.apache.beam.runners.prism.metrics.FileReporter'
+ with open(conf_path, 'w') as f:
+ f.write(
+ linesep.join([
+ 'metrics.reporters: file',
+ 'metrics.reporter.file.class: %s' % file_reporter,
+ 'metrics.reporter.file.path: %s' % cls.test_metrics_path,
+ 'metrics.scope.operator: <operator_name>',
+ ]))
+
+ @classmethod
+ def _subprocess_command(cls, job_port, expansion_port):
+ # The temp dir is cleaned up at the end of this method; it is recreated
+ # and used by the job server.
+ tmp_dir = mkdtemp(prefix='prismtest')
+
+ cls._create_conf_dir()
+ cls.expansion_port = expansion_port
+
+ try:
+ return [
+ cls.prism_bin,
+ '--job_port',
+ str(job_port),
+ ]
+ finally:
+ rmtree(tmp_dir)
+
+ @classmethod
+ def get_expansion_service(cls):
+ # TODO: Move where the expansion service address resides into PipelineOptions.
+ return 'localhost:%s' % cls.expansion_port
+
+ @classmethod
+ def set_prism_bin(cls, prism_bin):
+ cls.prism_bin = prism_bin
+
+ def create_options(self):
+ options = super().create_options()
+ options.view_as(DebugOptions).experiments = ['beam_fn_api']
+ options.view_as(DebugOptions).experiments = [
+ 'pre_optimize=default'
+ ] + options.view_as(DebugOptions).experiments
+ options.view_as(PortableOptions).environment_type = self.environment_type
+ options.view_as(
+ PortableOptions).environment_options = self.environment_options
+
+ return options
+
+ # Can't read host files from within Docker, so read a "local" file there instead.
+ def test_read(self):
+ print('name:', __name__)
+ with self.create_pipeline() as p:
+ lines = p | beam.io.ReadFromText('/etc/profile')
+ assert_that(lines, lambda lines: len(lines) > 0)
+
+ def test_external_transform(self):
+ with self.create_pipeline() as p:
+ res = (
+ p
+ | GenerateSequence(
+ start=1, stop=10, expansion_service=self.get_expansion_service()))
+
+ assert_that(res, equal_to([i for i in range(1, 10)]))
+
+ def test_expand_kafka_read(self):
+ # We expect to fail here because we do not have a Kafka cluster handy.
+ # Nevertheless, we check that the transform is expanded by the
+ # ExpansionService and that the pipeline fails during execution.
+ with self.assertRaises(Exception) as ctx:
+ self.enable_commit = True
+ with self.create_pipeline() as p:
+ # pylint: disable=expression-not-assigned
+ (
+ p
+ | ReadFromKafka(
+ consumer_config={
+ 'bootstrap.servers': 'notvalid1:7777, notvalid2:3531',
+ 'group.id': 'any_group'
+ },
+ topics=['topic1', 'topic2'],
+ key_deserializer='org.apache.kafka.'
+ 'common.serialization.'
+ 'ByteArrayDeserializer',
+ value_deserializer='org.apache.kafka.'
+ 'common.serialization.'
+ 'LongDeserializer',
+ commit_offset_in_finalize=True,
+ timestamp_policy=ReadFromKafka.create_time_policy,
+ expansion_service=self.get_expansion_service()))
+ self.assertTrue(
+ 'No resolvable bootstrap urls given in bootstrap.servers' in str(
+ ctx.exception),
+ 'Expected to fail due to invalid bootstrap.servers, but '
+ 'failed due to:\n%s' % str(ctx.exception))
+
+ def test_expand_kafka_write(self):
+ # We just test the expansion but do not execute.
+ # pylint: disable=expression-not-assigned
+ (
+ self.create_pipeline()
+ | Impulse()
+ | Map(lambda input: (1, input))
+ | WriteToKafka(
+ producer_config={
+ 'bootstrap.servers': 'localhost:9092, notvalid2:3531'
+ },
+ topic='topic1',
+ key_serializer='org.apache.kafka.'
+ 'common.serialization.'
+ 'LongSerializer',
+ value_serializer='org.apache.kafka.'
+ 'common.serialization.'
+ 'ByteArraySerializer',
+ expansion_service=self.get_expansion_service()))
+
+ def test_sql(self):
+ with self.create_pipeline() as p:
+ output = (
+ p
+ | 'Create' >> beam.Create([Row(x, str(x)) for x in range(5)])
+ | 'Sql' >> SqlTransform(
+ """SELECT col1, col2 || '*' || col2 as col2,
+ power(col1, 2) as col3
+ FROM PCOLLECTION
+ """,
+ expansion_service=self.get_expansion_service()))
+ assert_that(
+ output,
+ equal_to([(x, '{x}*{x}'.format(x=x), x * x) for x in range(5)]))
+
+
+# Inherits all other tests.
+
+if __name__ == '__main__':
+ # Run the tests.
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()