This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2784cde488f Refactor PrismJobServer python class (#35516)
2784cde488f is described below
commit 2784cde488f2308f0553f8c9ec3337df3ed04b96
Author: Shunping Huang <[email protected]>
AuthorDate: Mon Jul 7 09:46:11 2025 -0400
Refactor PrismJobServer python class (#35516)
* Refactor the PrismJobServer class in Python. Fix typos and add comments.
* Add some handling for release candidate versions.
* Fix lints.
---
.../runners/portability/prism_runner.py | 202 +++++++++++++++------
1 file changed, 144 insertions(+), 58 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py
index 654ad8da826..14854ca14a3 100644
--- a/sdks/python/apache_beam/runners/portability/prism_runner.py
+++ b/sdks/python/apache_beam/runners/portability/prism_runner.py
@@ -112,7 +112,6 @@ def _rename_if_different(src, dst):
class PrismJobServer(job_server.SubprocessJobServer):
- PRISM_CACHE = os.path.expanduser("~/.apache_beam/cache/prism")
BIN_CACHE = os.path.expanduser("~/.apache_beam/cache/prism/bin")
def __init__(self, options):
@@ -131,9 +130,32 @@ class PrismJobServer(job_server.SubprocessJobServer):
job_options = options.view_as(pipeline_options.JobServerOptions)
self._job_port = job_options.job_port
+ # Returns an executable prism path for url; kept for tests/back-compat.
@classmethod
- def maybe_unzip_and_make_executable(
- cls, url: str, bin_cache: str, ignore_cache: bool = True) -> str:
+ def local_bin(
+ cls, url: str, bin_cache: str = '', ignore_cache: bool = False) -> str:
+ bin_cache = bin_cache or cls.BIN_CACHE  # don't hand '' to _prepare_executable
+ local_path, ignore_cache = cls._download_to_local_path(
+ url, bin_cache, ignore_cache)
+ return cls._prepare_executable(local_path, bin_cache, ignore_cache)
+
+ # Returns the resolved prism source; kept for tests and backward compat.
+ def path_to_binary(self) -> str:
+ return self._resolve_source_path()
+
+ # Builds the release URL for this server's _version; kept for tests/compat.
+ def construct_download_url(self, root_tag: str, sys: str, mach: str) -> str:
+ return self._construct_download_url(self._version, root_tag, sys, mach)
+
+ @staticmethod
+ def _prepare_executable(
+ url: str, bin_cache: str, ignore_cache: bool = True) -> str:
+ """
+ Given a path to a local artifact (zip or binary), makes it an
+ executable binary file.
+
+ Returns the path to the final, executable binary.
+ """
assert (os.path.isfile(url))
if zipfile.is_zipfile(url):
@@ -163,14 +185,19 @@ class PrismJobServer(job_server.SubprocessJobServer):
os.chmod(target_url, st.st_mode | stat.S_IEXEC)
return target_url
- # Finds the bin or zip in the local cache, and if not, fetches it.
- @classmethod
- def local_bin(
- cls, url: str, bin_cache: str = '', ignore_cache: bool = False) -> str:
+ @staticmethod
+ def _download_to_local_path(
+ url: str,
+ bin_cache: str = '',
+ ignore_cache: bool = False) -> tuple[str, bool]:
+ """
+ Ensures the artifact is on local disk, downloading it if necessary.
+ Returns the path to the local (potentially cached) artifact.
+ """
# ignore_cache sets whether we should always be downloading and unzipping
# the file or not, to avoid staleness issues.
if bin_cache == '':
- bin_cache = cls.BIN_CACHE
+ bin_cache = PrismJobServer.BIN_CACHE
if os.path.exists(url):
_LOGGER.info('Using local prism binary/zip from %s' % url)
cached_file = url
@@ -198,10 +225,11 @@ class PrismJobServer(job_server.SubprocessJobServer):
# If we download a new prism, then we should always use it but not
# the cached one.
ignore_cache = True
- return cls.maybe_unzip_and_make_executable(
- cached_file, bin_cache=bin_cache, ignore_cache=ignore_cache)
+ return cached_file, ignore_cache
- def construct_download_url(self, root_tag: str, sys: str, mach: str) -> str:
+ @staticmethod
+ def _construct_download_url(
+ version: str, root_tag: str, sys: str, mach: str) -> str:
"""Construct the prism download URL with the appropriate release tag.
This maps operating systems and machine architectures to the compatible
and canonical names used by the Go build targets.
@@ -223,56 +251,75 @@ class PrismJobServer(job_server.SubprocessJobServer):
if arch not in ['amd64', 'arm64']:
raise ValueError(
- 'Machine archictecture "%s" unsupported for constructing a Prism '
+ 'Machine architecture "%s" unsupported for constructing a Prism '
'release binary URL.' % (opsys))
- return (
- GITHUB_DOWNLOAD_PREFIX +
- f"{root_tag}/apache_beam-{self._version}-prism-{opsys}-{arch}.zip")
- def path_to_binary(self) -> str:
- if self._path is not None:
- # The path is overidden, check various cases.
- if os.path.exists(self._path):
- # The path is local and exists, use directly.
- return self._path
+ # Some special handling is needed when creating url for release candidates.
+ # For example, v2.66.0rc2 should have the following url
+ # https://github.com/apache/beam/releases/download/v2.66.0-RC2/apache_beam-v2.66.0-prism-xxx-yyy.zip
+ if 'rc' in version:
+ version = version.split('rc')[0]
- if FileSystems.exists(self._path):
- # The path is in one of the supported filesystems.
- return self._path
-
- # Check if the path is a URL.
- url = urllib.parse.urlparse(self._path)
- if not url.scheme:
- raise ValueError(
- 'Unable to parse binary URL "%s". If using a full URL, make '
- 'sure the scheme is specified. If using a local file xpath, '
- 'make sure the file exists; you may have to first build prism '
- 'using `go build `.' % (self._path))
-
- # We have a URL, see if we need to construct a valid file name.
- if self._path.startswith(GITHUB_DOWNLOAD_PREFIX):
- # If this URL starts with the download prefix, let it through.
- return self._path
- # The only other valid option is a github release page.
- if not self._path.startswith(GITHUB_TAG_PREFIX):
- raise ValueError(
- 'Provided --prism_location URL is not an Apache Beam Github '
- 'Release page URL or download URL: %s' % (self._path))
- # Get the root tag for this URL
- root_tag = os.path.basename(os.path.normpath(self._path))
- return self.construct_download_url(
- root_tag, platform.system(), platform.machine())
-
- if '.dev' not in self._version:
- # Not a development version, so construct the production download URL
- return self.construct_download_url(
- self._version, platform.system(), platform.machine())
+ if 'rc' in root_tag:
+ root_tag = '-RC'.join(root_tag.split('rc'))
+ return (
+ GITHUB_DOWNLOAD_PREFIX +
+ f"{root_tag}/apache_beam-{version}-prism-{opsys}-{arch}.zip")
+
+ @staticmethod
+ def _resolve_from_location_override(path: str, version: str) -> str:
+ """Resolves an explicit --prism_location to a path or download URL."""
+ # The path is overridden, check various cases.
+ if os.path.exists(path):
+ # The path is local and exists, use directly.
+ return path
+
+ try:
+ if FileSystems.exists(path):
+ # The path is in one of the supported filesystems.
+ return path
+ except ValueError:
+ # FileSystems may raise ValueError for paths it cannot handle; fall
+ # through and try to interpret the path as a URL below.
+ pass
+
+ # Check if the path is a URL.
+ url = urllib.parse.urlparse(path)
+ if not url.scheme:
+ raise ValueError(
+ 'Unable to parse binary URL "%s". If using a full URL, make '
+ 'sure the scheme is specified. If using a local file path, '
+ 'make sure the file exists; you may have to first build prism '
+ 'using `go build `.' % (path))
+
+ # We have a URL, see if we need to construct a valid file name.
+ if path.startswith(GITHUB_DOWNLOAD_PREFIX):
+ # If this URL starts with the download prefix, let it through.
+ return path
+ # The only other valid option is a github release page.
+ if not path.startswith(GITHUB_TAG_PREFIX):
+ raise ValueError(
+ 'Provided --prism_location URL is not an Apache Beam Github '
+ 'Release page URL or download URL: %s' % (path))
+ # The release tag is the last path component of the release page URL.
+ root_tag = os.path.basename(os.path.normpath(path))
+ return PrismJobServer._construct_download_url(
+ version, root_tag, platform.system(), platform.machine())
+
+ @staticmethod
+ def _install_from_source(version):
+ """Builds and installs Prism from a Go source package.
+ It first tries the local module, then falls back to @latest.
+ """
# This is a development version! Assume Go is installed.
# Set the install directory to the cache location.
- envdict = {**os.environ, "GOBIN": self.BIN_CACHE}
+ envdict = {**os.environ, "GOBIN": PrismJobServer.BIN_CACHE}
PRISMPKG = "github.com/apache/beam/sdks/v2/go/cmd/prism"
+ _LOGGER.info(
+ 'Installing prism from local source into "%s".',
+ PrismJobServer.BIN_CACHE)
process = subprocess.run(["go", "install", PRISMPKG],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
@@ -280,7 +327,7 @@ class PrismJobServer(job_server.SubprocessJobServer):
check=False)
if process.returncode == 0:
# Successfully installed
- return '%s/prism' % (self.BIN_CACHE)
+ return '%s/prism' % (PrismJobServer.BIN_CACHE)
# We failed to build for some reason.
output = process.stdout.decode("utf-8")
@@ -300,12 +347,16 @@ class PrismJobServer(job_server.SubprocessJobServer):
'compile.\nPlease install Go (see https://go.dev/doc/install) to '
'enable automatic local builds.\n'
'Alternatively provide a binary with the --prism_location flag.'
- '\nCaptured output:\n %s' % (self._version, output))
+ '\nCaptured output:\n %s' % (version, output))
# Go is installed and claims we're not in a Go module that has access to
# the Prism package.
# Fallback to using the @latest version of prism, which works everywhere.
+ _LOGGER.info(
+ 'Installing prism from "%s@latest" into "%s".',
+ PRISMPKG,
+ PrismJobServer.BIN_CACHE)
process = subprocess.run(["go", "install", PRISMPKG + "@latest"],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
@@ -313,7 +364,7 @@ class PrismJobServer(job_server.SubprocessJobServer):
check=False)
if process.returncode == 0:
- return '%s/prism' % (self.BIN_CACHE)
+ return '%s/prism' % (PrismJobServer.BIN_CACHE)
output = process.stdout.decode("utf-8")
raise ValueError(
@@ -322,10 +373,45 @@ class PrismJobServer(job_server.SubprocessJobServer):
'--prism_location flag.'
'\nCaptured output:\n %s' % (process.args, output))
+ def _resolve_source_path(self) -> str:
+ """Returns where the Prism binary comes from (a local path or a URL).
+
+ The resolution follows this order:
+
+ 1. A non-empty --prism_location (local path, FileSystems path, or URL).
+ 2. A GitHub release download URL when the version has no '.dev'.
+ 3. Otherwise `go install` from source into BIN_CACHE (dev versions).
+ """
+ if self._path:
+ return self._resolve_from_location_override(self._path, self._version)
+
+ if '.dev' not in self._version:
+ # Release version: it serves as both the tag and the artifact version.
+ return self._construct_download_url(
+ self._version, self._version, platform.system(), platform.machine())
+
+ return self._install_from_source(self._version)
+
+ def _get_executable_path(self) -> str:
+ """Resolves, fetches if needed, and prepares an executable Prism binary."""
+ source = self._resolve_source_path()
+ if source == "%s/prism" % (self.BIN_CACHE):
+ # source is from go installation, so it is already a local binary
+ return self._prepare_executable(source, self.BIN_CACHE, True)
+
+ # Re-download/extract whenever --prism_location was actually used, with
+ # the same truthiness test as _resolve_source_path ('' means unset).
+ ignore_cache = bool(self._path)
+
+ local_path, ignore_cache = self._download_to_local_path(source,
+ self.BIN_CACHE,
+ ignore_cache)
+
+ return self._prepare_executable(local_path, self.BIN_CACHE, ignore_cache)
+
def subprocess_cmd_and_endpoint(
self) -> typing.Tuple[typing.List[typing.Any], str]:
- bin_path = self.local_bin(
- self.path_to_binary(), ignore_cache=(self._path is not None))
+ bin_path = self._get_executable_path()  # resolve, download/build, unzip, chmod +x
job_port, = subprocess_server.pick_port(self._job_port)
subprocess_cmd = [bin_path] + self.prism_arguments(job_port)
return (subprocess_cmd, f"localhost:{job_port}")