This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f81e16f3ea [#31403] Python wrapper to download, use, or build and run prism. (#31583)
2f81e16f3ea is described below

commit 2f81e16f3eadbfb77bb8191dcc7e6ef9a4887c6f
Author: Robert Burke <[email protected]>
AuthorDate: Fri Jun 28 15:10:50 2024 -0700

    [#31403] Python wrapper to download, use, or build and run prism. (#31583)
---
 .../python/apache_beam/options/pipeline_options.py |  19 ++
 .../runners/portability/prism_runner.py            | 216 +++++++++++++++++
 .../runners/portability/prism_runner_test.py       | 269 +++++++++++++++++++++
 3 files changed, 504 insertions(+)
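
For context, a minimal sketch of what this change enables (the pipeline body
below is illustrative only, not part of this commit):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Select the new runner by name. With no environment_type set,
    # PrismRunner defaults to LOOPBACK (see prism_runner.py below).
    options = PipelineOptions(['--runner=PrismRunner'])
    with beam.Pipeline(options=options) as p:
      (p
       | beam.Create([1, 2, 3])
       | beam.Map(lambda x: x * x)
       | beam.Map(print))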

diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index b204adc7fc5..6b1dd8bb48c 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -515,6 +515,7 @@ class StandardOptions(PipelineOptions):
       'apache_beam.runners.interactive.interactive_runner.InteractiveRunner',
       'apache_beam.runners.portability.flink_runner.FlinkRunner',
       'apache_beam.runners.portability.portable_runner.PortableRunner',
+      'apache_beam.runners.portability.prism_runner.PrismRunner',
       'apache_beam.runners.portability.spark_runner.SparkRunner',
       'apache_beam.runners.test.TestDirectRunner',
       'apache_beam.runners.test.TestDataflowRunner',
@@ -1707,6 +1708,24 @@ class SparkRunnerOptions(PipelineOptions):
         help='Spark major version to use.')
 
 
+class PrismRunnerOptions(PipelineOptions):
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument(
+        '--prism_location',
+        help='Path or URL to a prism binary, or zipped binary for the current '
+        'platform (Operating System and Architecture). May also be an Apache '
+        'Beam Github Release page URL, with a matching prism_beam_version_override '
+        'set. This option overrides all others for finding a prism binary.')
+    parser.add_argument(
+        '--prism_beam_version_override',
+        help=
+        'Override the SDK\'s version for deriving the Github Release URLs for '
+        'downloading a zipped prism binary, for the current platform. If '
+        'prism_location is set to a Github Release page URL, then it will use '
+        'that release page as a base when constructing the download URL.')
+
+
 class TestOptions(PipelineOptions):
   @classmethod
   def _add_argparse_args(cls, parser):
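
The two new flags compose as follows; a hedged sketch, where the release tag
and version values are hypothetical placeholders, not guaranteed published
artifacts:

    from apache_beam.options.pipeline_options import PipelineOptions

    # prism_location may be a local binary, a zip, a direct download URL, or
    # a Github Release page URL. A release page URL needs a version override
    # when the SDK version has no matching prism asset (e.g. dev builds).
    options = PipelineOptions([
        '--runner=PrismRunner',
        '--prism_location=https://github.com/apache/beam/releases/tag/v2.57.0',
        '--prism_beam_version_override=v2.57.0',
    ])
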
diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py
new file mode 100644
index 00000000000..eeccaf5748c
--- /dev/null
+++ b/sdks/python/apache_beam/runners/portability/prism_runner.py
@@ -0,0 +1,216 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A runner for executing portable pipelines on Apache Beam Prism."""
+
+# This import lets us use list as a parameterized generic
+# on Python 3.8, so we aren't revisiting this code after we
+# sunset that version.
+from __future__ import annotations
+
+import logging
+import os
+import platform
+import shutil
+import stat
+import typing
+import urllib.parse
+import zipfile
+from urllib.error import URLError
+from urllib.request import urlopen
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.options import pipeline_options
+from apache_beam.runners.portability import job_server
+from apache_beam.runners.portability import portable_runner
+from apache_beam.transforms import environments
+from apache_beam.utils import subprocess_server
+from apache_beam.version import __version__ as beam_version
+
+# pytype: skip-file
+
+# Prefix for constructing a download URL
+GITHUB_DOWNLOAD_PREFIX = 'https://github.com/apache/beam/releases/download/'
+# Prefix for constructing a release URL, so we can derive a download URL
+GITHUB_TAG_PREFIX = 'https://github.com/apache/beam/releases/tag/'
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class PrismRunner(portable_runner.PortableRunner):
+  """A runner for launching jobs on Prism, automatically downloading and
+  starting a Prism instance if needed.
+  """
+  def default_environment(
+      self,
+      options: pipeline_options.PipelineOptions) -> environments.Environment:
+    portable_options = options.view_as(pipeline_options.PortableOptions)
+    if (not portable_options.environment_type and
+        not portable_options.output_executable_path):
+      portable_options.environment_type = 'LOOPBACK'
+    return super().default_environment(options)
+
+  def default_job_server(self, options):
+    return job_server.StopOnExitJobServer(PrismJobServer(options))
+
+  def create_job_service_handle(self, job_service, options):
+    return portable_runner.JobServiceHandle(
+        job_service, options, retain_unknown_options=True)
+
+
+class PrismJobServer(job_server.SubprocessJobServer):
+  PRISM_CACHE = os.path.expanduser("~/.apache_beam/cache/prism")
+  BIN_CACHE = os.path.expanduser("~/.apache_beam/cache/prism/bin")
+
+  def __init__(self, options):
+    super().__init__()
+    prism_options = options.view_as(pipeline_options.PrismRunnerOptions)
+    # Options flow:
+    # If the path is set, always download and unzip the provided path,
+    # even if a binary is cached.
+    self._path = prism_options.prism_location
+    # Which version to use when constructing the prism download url.
+    if prism_options.prism_beam_version_override:
+      self._version = prism_options.prism_beam_version_override
+    else:
+      self._version = 'v' + beam_version
+
+    job_options = options.view_as(pipeline_options.JobServerOptions)
+    self._job_port = job_options.job_port
+
+  @classmethod
+  def maybe_unzip_and_make_executable(cls, url: str, bin_cache: str) -> str:
+    if zipfile.is_zipfile(url):
+      z = zipfile.ZipFile(url)
+      url = z.extract(
+          os.path.splitext(os.path.basename(url))[0], path=bin_cache)
+
+    # Make sure the binary is executable.
+    st = os.stat(url)
+    os.chmod(url, st.st_mode | stat.S_IEXEC)
+    return url
+
+  # Finds the binary or zip in the local cache, fetching it if not present.
+  @classmethod
+  def local_bin(
+      cls, url: str, bin_cache: str = '', ignore_cache: bool = False) -> str:
+    # ignore_cache forces the file to always be downloaded and unzipped,
+    # avoiding staleness issues.
+    if bin_cache == '':
+      bin_cache = cls.BIN_CACHE
+    if os.path.exists(url):
+      _LOGGER.info('Using local prism binary from %s' % url)
+      return cls.maybe_unzip_and_make_executable(url, bin_cache=bin_cache)
+    else:
+      cached_bin = os.path.join(bin_cache, os.path.basename(url))
+      if os.path.exists(cached_bin) and not ignore_cache:
+        _LOGGER.info('Using cached prism binary at %s' % cached_bin)
+      else:
+        _LOGGER.info('Downloading prism binary from %s' % url)
+        if not os.path.exists(bin_cache):
+          os.makedirs(bin_cache)
+        try:
+          try:
+            url_read = FileSystems.open(url)
+          except ValueError:
+            url_read = urlopen(url)
+          with open(cached_bin + '.tmp', 'wb') as zip_write:
+            shutil.copyfileobj(url_read, zip_write, length=1 << 20)
+          os.rename(cached_bin + '.tmp', cached_bin)
+        except URLError as e:
+          raise RuntimeError(
+              'Unable to fetch remote prism binary at %s: %s' % (url, e))
+      return cls.maybe_unzip_and_make_executable(
+          cached_bin, bin_cache=bin_cache)
+
+  def construct_download_url(self, root_tag: str, sys: str, mach: str) -> str:
+    """Construct the prism download URL with the appropriate release tag.
+    This maps operating systems and machine architectures to the compatible
+    and canonical names used by the Go build targets.
+
+    platform.system() returns compatible names, but we need to filter out
+    unsupported operating systems."""
+    opsys = sys.lower()
+    if opsys not in ['linux', 'windows', 'darwin']:
+      raise ValueError(
+          'Operating System "%s" unsupported for constructing a Prism release '
+          'binary URL.' % (opsys))
+
+    # platform.machine() will vary by system, but many names are compatible.
+    arch = mach.lower()
+    if arch in ['amd64', 'x86_64', 'x86-64', 'x64']:
+      arch = 'amd64'
+    if arch in ['arm64', 'aarch64_be', 'aarch64', 'armv8b', 'armv8l']:
+      arch = 'arm64'
+
+    if arch not in ['amd64', 'arm64']:
+      raise ValueError(
+          'Machine architecture "%s" unsupported for constructing a Prism '
+          'release binary URL.' % (arch))
+    return (
+        GITHUB_DOWNLOAD_PREFIX +
+        f"{root_tag}/apache_beam-{self._version}-prism-{opsys}-{arch}.zip")
+
+  def path_to_binary(self) -> str:
+    if self._path is not None:
+      if not os.path.exists(self._path):
+        url = urllib.parse.urlparse(self._path)
+        if not url.scheme:
+          raise ValueError(
+              'Unable to parse binary URL "%s". If using a full URL, make '
+              'sure the scheme is specified. If using a local file path, '
+              'make sure the file exists; you may have to first build prism '
+              'using `go build`.' % (self._path))
+
+        # We have a URL, see if we need to construct a valid file name.
+        if self._path.startswith(GITHUB_DOWNLOAD_PREFIX):
+          # If this URL starts with the download prefix, let it through.
+          return self._path
+        # The only other valid option is a github release page.
+        if not self._path.startswith(GITHUB_TAG_PREFIX):
+          raise ValueError(
+              'Provided --prism_location URL is not an Apache Beam Github '
+              'Release page URL or download URL: %s' % (self._path))
+        # Get the root tag for this URL
+        root_tag = os.path.basename(os.path.normpath(self._path))
+        return self.construct_download_url(
+            root_tag, platform.system(), platform.machine())
+      return self._path
+    else:
+      if '.dev' in self._version:
+        raise ValueError(
+            'Unable to derive URL for dev versions "%s". Please provide an '
+            'alternate version to derive the release URL with the '
+            '--prism_beam_version_override flag.' % (self._version))
+      return self.construct_download_url(
+          self._version, platform.system(), platform.machine())
+
+  def subprocess_cmd_and_endpoint(
+      self) -> typing.Tuple[typing.List[typing.Any], str]:
+    bin_path = self.local_bin(
+        self.path_to_binary(), ignore_cache=(self._path is not None))
+    job_port, = subprocess_server.pick_port(self._job_port)
+    subprocess_cmd = [bin_path] + self.prism_arguments(job_port)
+    return (subprocess_cmd, f"localhost:{job_port}")
+
+  def prism_arguments(self, job_port) -> typing.List[typing.Any]:
+    return [
+        '--job_port',
+        job_port,
+        '--serve_http',
+        False,
+    ]
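
To make the OS/architecture mapping above concrete, a sketch of what
construct_download_url yields (the tag and version here are hypothetical):

    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.runners.portability.prism_runner import PrismJobServer

    server = PrismJobServer(
        PipelineOptions(['--prism_beam_version_override=v2.57.0']))
    # platform.machine() values like 'x86_64' normalize to 'amd64', so on a
    # 64-bit Linux host this returns:
    # https://github.com/apache/beam/releases/download/v2.57.0/apache_beam-v2.57.0-prism-linux-amd64.zip
    url = server.construct_download_url('v2.57.0', 'Linux', 'x86_64')
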
diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py
new file mode 100644
index 00000000000..f1ccf66a228
--- /dev/null
+++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py
@@ -0,0 +1,269 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# pytype: skip-file
+
+import argparse
+import logging
+import shlex
+import typing
+import unittest
+from os import linesep
+from os import path
+from os.path import exists
+from shutil import rmtree
+from tempfile import mkdtemp
+
+import pytest
+
+import apache_beam as beam
+from apache_beam import Impulse
+from apache_beam import Map
+from apache_beam.io.external.generate_sequence import GenerateSequence
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.options.pipeline_options import PortableOptions
+from apache_beam.runners.portability import portable_runner_test
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms.sql import SqlTransform
+
+# Run as
+#
+# pytest prism_runner_test.py[::TestClass::test_case] \
+#     --test-pipeline-options="--environment_type=LOOPBACK"
+
+_LOGGER = logging.getLogger(__name__)
+
+Row = typing.NamedTuple("Row", [("col1", int), ("col2", str)])
+beam.coders.registry.register_coder(Row, beam.coders.RowCoder)
+
+
+class PrismRunnerTest(portable_runner_test.PortableRunnerTest):
+  _use_grpc = True
+  _use_subprocesses = True
+
+  conf_dir = None
+  expansion_port = None
+
+  def __init__(self, *args, **kwargs):
+    super().__init__(*args, **kwargs)
+    self.environment_type = None
+    self.environment_config = None
+    self.enable_commit = False
+
+  def setUp(self):
+    self.enable_commit = False
+
+  @pytest.fixture(autouse=True)
+  def parse_options(self, request):
+    if not request.config.option.test_pipeline_options:
+      raise unittest.SkipTest(
+          'Skipping because --test-pipeline-options is not specified.')
+    test_pipeline_options = request.config.option.test_pipeline_options
+    parser = argparse.ArgumentParser(add_help=True)
+    parser.add_argument(
+        '--prism_bin', help='Prism binary to submit jobs.', action='store')
+    parser.add_argument(
+        '--environment_type',
+        default='LOOPBACK',
+        choices=['DOCKER', 'PROCESS', 'LOOPBACK'],
+        help='Set the environment type for running user code. DOCKER runs '
+        'user code in a container. PROCESS runs user code in '
+        'automatically started processes. LOOPBACK runs user code on '
+        'the same process that originally submitted the job.')
+    parser.add_argument(
+        '--environment_option',
+        '--environment_options',
+        dest='environment_options',
+        action='append',
+        default=None,
+        help=(
+            'Environment configuration for running the user code. '
+            'Recognized options depend on --environment_type.\n '
+            'For DOCKER: docker_container_image (optional)\n '
+            'For PROCESS: process_command (required), process_variables '
+            '(optional, comma-separated)\n '
+            'For EXTERNAL: external_service_address (required)'))
+    known_args, unknown_args = parser.parse_known_args(
+        shlex.split(test_pipeline_options))
+    if unknown_args:
+      _LOGGER.warning('Discarding unrecognized arguments %s' % unknown_args)
+    self.set_prism_bin(known_args.prism_bin)
+    self.environment_type = known_args.environment_type
+    self.environment_options = known_args.environment_options
+
+  @classmethod
+  def tearDownClass(cls):
+    if cls.conf_dir and exists(cls.conf_dir):
+      _LOGGER.info("removing conf dir: %s" % cls.conf_dir)
+      rmtree(cls.conf_dir)
+    super().tearDownClass()
+
+  @classmethod
+  def _create_conf_dir(cls):
+    """Create (and save a static reference to) a "conf dir", used to provide
+     metrics configs and verify metrics output
+
+     It gets cleaned up when the suite is done executing"""
+
+    if cls.conf_dir is None:
+      cls.conf_dir = mkdtemp(prefix='prismtest-conf')
+
+      # path for a FileReporter to write metrics to
+      cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt')
+
+      # path to write Prism configuration to
+      conf_path = path.join(cls.conf_dir, 'prism-conf.yaml')
+      file_reporter = 'org.apache.beam.runners.prism.metrics.FileReporter'
+      with open(conf_path, 'w') as f:
+        f.write(
+            linesep.join([
+                'metrics.reporters: file',
+                'metrics.reporter.file.class: %s' % file_reporter,
+                'metrics.reporter.file.path: %s' % cls.test_metrics_path,
+                'metrics.scope.operator: <operator_name>',
+            ]))
+
+  @classmethod
+  def _subprocess_command(cls, job_port, expansion_port):
+    # will be cleaned up at the end of this method, and recreated and used by
+    # the job server
+    tmp_dir = mkdtemp(prefix='prismtest')
+
+    cls._create_conf_dir()
+    cls.expansion_port = expansion_port
+
+    try:
+      return [
+          cls.prism_bin,
+          '--job_port',
+          str(job_port),
+      ]
+    finally:
+      rmtree(tmp_dir)
+
+  @classmethod
+  def get_expansion_service(cls):
+    # TODO: Move the expansion service address into PipelineOptions.
+    return 'localhost:%s' % cls.expansion_port
+
+  @classmethod
+  def set_prism_bin(cls, prism_bin):
+    cls.prism_bin = prism_bin
+
+  def create_options(self):
+    options = super().create_options()
+    options.view_as(DebugOptions).experiments = ['beam_fn_api']
+    options.view_as(DebugOptions).experiments = [
+        'pre_optimize=default'
+    ] + options.view_as(DebugOptions).experiments
+    options.view_as(PortableOptions).environment_type = self.environment_type
+    options.view_as(
+        PortableOptions).environment_options = self.environment_options
+
+    return options
+
+  # Can't read host files from within Docker; read a "local" file there instead.
+  def test_read(self):
+    print('name:', __name__)
+    with self.create_pipeline() as p:
+      lines = p | beam.io.ReadFromText('/etc/profile')
+      assert_that(lines, lambda lines: len(lines) > 0)
+
+  def test_external_transform(self):
+    with self.create_pipeline() as p:
+      res = (
+          p
+          | GenerateSequence(
+              start=1, stop=10,
+              expansion_service=self.get_expansion_service()))
+
+      assert_that(res, equal_to(list(range(1, 10))))
+
+  def test_expand_kafka_read(self):
+    # We expect to fail here because we do not have a Kafka cluster handy.
+    # Nevertheless, we check that the transform is expanded by the
+    # ExpansionService and that the pipeline fails during execution.
+    with self.assertRaises(Exception) as ctx:
+      self.enable_commit = True
+      with self.create_pipeline() as p:
+        # pylint: disable=expression-not-assigned
+        (
+            p
+            | ReadFromKafka(
+                consumer_config={
+                    'bootstrap.servers': 'notvalid1:7777, notvalid2:3531',
+                    'group.id': 'any_group'
+                },
+                topics=['topic1', 'topic2'],
+                key_deserializer='org.apache.kafka.'
+                'common.serialization.'
+                'ByteArrayDeserializer',
+                value_deserializer='org.apache.kafka.'
+                'common.serialization.'
+                'LongDeserializer',
+                commit_offset_in_finalize=True,
+                timestamp_policy=ReadFromKafka.create_time_policy,
+                expansion_service=self.get_expansion_service()))
+    self.assertTrue(
+        'No resolvable bootstrap urls given in bootstrap.servers' in str(
+            ctx.exception),
+        'Expected to fail due to invalid bootstrap.servers, but '
+        'failed due to:\n%s' % str(ctx.exception))
+
+  def test_expand_kafka_write(self):
+    # We just test the expansion but do not execute.
+    # pylint: disable=expression-not-assigned
+    (
+        self.create_pipeline()
+        | Impulse()
+        | Map(lambda input: (1, input))
+        | WriteToKafka(
+            producer_config={
+                'bootstrap.servers': 'localhost:9092, notvalid2:3531'
+            },
+            topic='topic1',
+            key_serializer='org.apache.kafka.'
+            'common.serialization.'
+            'LongSerializer',
+            value_serializer='org.apache.kafka.'
+            'common.serialization.'
+            'ByteArraySerializer',
+            expansion_service=self.get_expansion_service()))
+
+  def test_sql(self):
+    with self.create_pipeline() as p:
+      output = (
+          p
+          | 'Create' >> beam.Create([Row(x, str(x)) for x in range(5)])
+          | 'Sql' >> SqlTransform(
+              """SELECT col1, col2 || '*' || col2 as col2,
+                    power(col1, 2) as col3
+             FROM PCOLLECTION
+          """,
+              expansion_service=self.get_expansion_service()))
+      assert_that(
+          output,
+          equal_to([(x, '{x}*{x}'.format(x=x), x * x) for x in range(5)]))
+
+
+# Inherits all other tests.
+
+if __name__ == '__main__':
+  # Run the tests.
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
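
For completeness, an example invocation of this suite against a locally built
prism binary (the ./prism path is hypothetical; build it first with `go build`
in the prism source directory):

    pytest prism_runner_test.py \
        --test-pipeline-options="--prism_bin=./prism --environment_type=LOOPBACK"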
