This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2784cde488f Refactor PrismJobServer python class (#35516)
2784cde488f is described below

commit 2784cde488f2308f0553f8c9ec3337df3ed04b96
Author: Shunping Huang <[email protected]>
AuthorDate: Mon Jul 7 09:46:11 2025 -0400

    Refactor PrismJobServer python class (#35516)
    
    * Refactor PrismJobServer class in Python. Fix typos and add comments.
    
    * Add some handling for release candidate versions.
    
    * Fix lints.
---
 .../runners/portability/prism_runner.py            | 202 +++++++++++++++------
 1 file changed, 144 insertions(+), 58 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py
index 654ad8da826..14854ca14a3 100644
--- a/sdks/python/apache_beam/runners/portability/prism_runner.py
+++ b/sdks/python/apache_beam/runners/portability/prism_runner.py
@@ -112,7 +112,6 @@ def _rename_if_different(src, dst):
 
 
 class PrismJobServer(job_server.SubprocessJobServer):
-  PRISM_CACHE = os.path.expanduser("~/.apache_beam/cache/prism")
   BIN_CACHE = os.path.expanduser("~/.apache_beam/cache/prism/bin")
 
   def __init__(self, options):
@@ -131,9 +130,32 @@ class PrismJobServer(job_server.SubprocessJobServer):
     job_options = options.view_as(pipeline_options.JobServerOptions)
     self._job_port = job_options.job_port
 
+  # the method is only kept for testing and backward compatibility
   @classmethod
-  def maybe_unzip_and_make_executable(
-      cls, url: str, bin_cache: str, ignore_cache: bool = True) -> str:
+  def local_bin(
+      cls, url: str, bin_cache: str = '', ignore_cache: bool = False) -> str:
+    url, ignore_cache = cls._download_to_local_path(url,
+                                                    bin_cache,
+                                                    ignore_cache)
+    return cls._prepare_executable(url, bin_cache, ignore_cache)
+
+  # the method is only kept for testing and backward compatibility
+  def path_to_binary(self) -> str:
+    return self._resolve_source_path()
+
+  # the method is only kept for testing and backward compatibility
+  def construct_download_url(self, root_tag: str, sys: str, mach: str) -> str:
+    return self._construct_download_url(self._version, root_tag, sys, mach)
+
+  @staticmethod
+  def _prepare_executable(
+      url: str, bin_cache: str, ignore_cache: bool = True) -> str:
+    """
+    Given a path to a local artifact (zip or binary), makes it an
+    executable binary file.
+
+    Returns the path to the final, executable binary.
+    """
     assert (os.path.isfile(url))
 
     if zipfile.is_zipfile(url):
@@ -163,14 +185,19 @@ class PrismJobServer(job_server.SubprocessJobServer):
     os.chmod(target_url, st.st_mode | stat.S_IEXEC)
     return target_url
 
-  # Finds the bin or zip in the local cache, and if not, fetches it.
-  @classmethod
-  def local_bin(
-      cls, url: str, bin_cache: str = '', ignore_cache: bool = False) -> str:
+  @staticmethod
+  def _download_to_local_path(
+      url: str,
+      bin_cache: str = '',
+      ignore_cache: bool = False) -> tuple[str, bool]:
+    """
+    Ensures the artifact is on local disk, downloading it if necessary.
+    Returns the path to the local (potentially cached) artifact.
+    """
     # ignore_cache sets whether we should always be downloading and unzipping
     # the file or not, to avoid staleness issues.
     if bin_cache == '':
-      bin_cache = cls.BIN_CACHE
+      bin_cache = PrismJobServer.BIN_CACHE
     if os.path.exists(url):
       _LOGGER.info('Using local prism binary/zip from %s' % url)
       cached_file = url
@@ -198,10 +225,11 @@ class PrismJobServer(job_server.SubprocessJobServer):
         # If we download a new prism, then we should always use it but not
         # the cached one.
         ignore_cache = True
-    return cls.maybe_unzip_and_make_executable(
-        cached_file, bin_cache=bin_cache, ignore_cache=ignore_cache)
+    return cached_file, ignore_cache
 
-  def construct_download_url(self, root_tag: str, sys: str, mach: str) -> str:
+  @staticmethod
+  def _construct_download_url(
+      version: str, root_tag: str, sys: str, mach: str) -> str:
     """Construct the prism download URL with the appropriate release tag.
     This maps operating systems and machine architectures to the compatible
     and canonical names used by the Go build targets.
@@ -223,56 +251,75 @@ class PrismJobServer(job_server.SubprocessJobServer):
 
     if arch not in ['amd64', 'arm64']:
       raise ValueError(
-          'Machine archictecture "%s" unsupported for constructing a Prism '
+          'Machine architecture "%s" unsupported for constructing a Prism '
           'release binary URL.' % (opsys))
-    return (
-        GITHUB_DOWNLOAD_PREFIX +
-        f"{root_tag}/apache_beam-{self._version}-prism-{opsys}-{arch}.zip")
 
-  def path_to_binary(self) -> str:
-    if self._path is not None:
-      # The path is overidden, check various cases.
-      if os.path.exists(self._path):
-        # The path is local and exists, use directly.
-        return self._path
+    # Some special handling is needed when creating url for release candidates.
+    # For example, v2.66.0rc2 should have the following url
+    # https://github.com/apache/beam/releases/download/v2.66.0-RC2/apache_beam-v2.66.0-prism-xxx-yyy.zip
+    if 'rc' in version:
+      version = version.split('rc')[0]
 
-      if FileSystems.exists(self._path):
-        # The path is in one of the supported filesystems.
-        return self._path
-
-      # Check if the path is a URL.
-      url = urllib.parse.urlparse(self._path)
-      if not url.scheme:
-        raise ValueError(
-            'Unable to parse binary URL "%s". If using a full URL, make '
-            'sure the scheme is specified. If using a local file xpath, '
-            'make sure the file exists; you may have to first build prism '
-            'using `go build `.' % (self._path))
-
-      # We have a URL, see if we need to construct a valid file name.
-      if self._path.startswith(GITHUB_DOWNLOAD_PREFIX):
-        # If this URL starts with the download prefix, let it through.
-        return self._path
-      # The only other valid option is a github release page.
-      if not self._path.startswith(GITHUB_TAG_PREFIX):
-        raise ValueError(
-            'Provided --prism_location URL is not an Apache Beam Github '
-            'Release page URL or download URL: %s' % (self._path))
-      # Get the root tag for this URL
-      root_tag = os.path.basename(os.path.normpath(self._path))
-      return self.construct_download_url(
-          root_tag, platform.system(), platform.machine())
-
-    if '.dev' not in self._version:
-      # Not a development version, so construct the production download URL
-      return self.construct_download_url(
-          self._version, platform.system(), platform.machine())
+    if 'rc' in root_tag:
+      root_tag = '-RC'.join(root_tag.split('rc'))
 
+    return (
+        GITHUB_DOWNLOAD_PREFIX +
+        f"{root_tag}/apache_beam-{version}-prism-{opsys}-{arch}.zip")
+
+  @staticmethod
+  def _resolve_from_location_override(path, version) -> str:
+    """Handles the case where --prism_location is explicitly set."""
+    # The path is overridden, check various cases.
+    if os.path.exists(path):
+      # The path is local and exists, use directly.
+      return path
+
+    try:
+      if FileSystems.exists(path):
+        # The path is in one of the supported filesystems.
+        return path
+    except ValueError:
+      # If there is a value error raised by Filesystems, try to resolve
+      # the path with the following steps.
+      pass
+
+    # Check if the path is a URL.
+    url = urllib.parse.urlparse(path)
+    if not url.scheme:
+      raise ValueError(
+          'Unable to parse binary URL "%s". If using a full URL, make '
+          'sure the scheme is specified. If using a local file xpath, '
+          'make sure the file exists; you may have to first build prism '
+          'using `go build `.' % (path))
+
+    # We have a URL, see if we need to construct a valid file name.
+    if path.startswith(GITHUB_DOWNLOAD_PREFIX):
+      # If this URL starts with the download prefix, let it through.
+      return path
+    # The only other valid option is a github release page.
+    if not path.startswith(GITHUB_TAG_PREFIX):
+      raise ValueError(
+          'Provided --prism_location URL is not an Apache Beam Github '
+          'Release page URL or download URL: %s' % (path))
+    # Get the root tag for this URL
+    root_tag = os.path.basename(os.path.normpath(path))
+    return PrismJobServer._construct_download_url(
+        version, root_tag, platform.system(), platform.machine())
+
+  @staticmethod
+  def _install_from_source(version):
+    """Builds and installs Prism from a Go source package.
+    It first tries the local module, then falls back to @latest.
+    """
     # This is a development version! Assume Go is installed.
     # Set the install directory to the cache location.
-    envdict = {**os.environ, "GOBIN": self.BIN_CACHE}
+    envdict = {**os.environ, "GOBIN": PrismJobServer.BIN_CACHE}
     PRISMPKG = "github.com/apache/beam/sdks/v2/go/cmd/prism"
 
+    _LOGGER.info(
+        'Installing prism from local source into "%s".',
+        PrismJobServer.BIN_CACHE)
     process = subprocess.run(["go", "install", PRISMPKG],
                              stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT,
@@ -280,7 +327,7 @@ class PrismJobServer(job_server.SubprocessJobServer):
                              check=False)
     if process.returncode == 0:
       # Successfully installed
-      return '%s/prism' % (self.BIN_CACHE)
+      return '%s/prism' % (PrismJobServer.BIN_CACHE)
 
     # We failed to build for some reason.
     output = process.stdout.decode("utf-8")
@@ -300,12 +347,16 @@ class PrismJobServer(job_server.SubprocessJobServer):
           'compile.\nPlease install Go (see https://go.dev/doc/install) to '
           'enable automatic local builds.\n'
           'Alternatively provide a binary with the --prism_location flag.'
-          '\nCaptured output:\n %s' % (self._version, output))
+          '\nCaptured output:\n %s' % (version, output))
 
     # Go is installed and claims we're not in a Go module that has access to
     # the Prism package.
 
     # Fallback to using the @latest version of prism, which works everywhere.
+    _LOGGER.info(
+        'Installing prism from "%s@latest" into "%s".',
+        PRISMPKG,
+        PrismJobServer.BIN_CACHE)
     process = subprocess.run(["go", "install", PRISMPKG + "@latest"],
                              stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT,
@@ -313,7 +364,7 @@ class PrismJobServer(job_server.SubprocessJobServer):
                              check=False)
 
     if process.returncode == 0:
-      return '%s/prism' % (self.BIN_CACHE)
+      return '%s/prism' % (PrismJobServer.BIN_CACHE)
 
     output = process.stdout.decode("utf-8")
     raise ValueError(
@@ -322,10 +373,45 @@ class PrismJobServer(job_server.SubprocessJobServer):
         '--prism_location flag.'
         '\nCaptured output:\n %s' % (process.args, output))
 
+  def _resolve_source_path(self) -> str:
+    """Resolves and returns the source for the Prism binary.
+
+    The resolution follows this order:
+
+    1. A user-provided location (local path, GCS, or URL).
+    2. A pre-built binary from GitHub for a release version.
+    3. Build from local Go source for a development version.
+    """
+    if self._path:
+      return self._resolve_from_location_override(self._path, self._version)
+
+    if '.dev' not in self._version:
+      # Not a development version, so construct the production download URL
+      return self._construct_download_url(
+          self._version, self._version, platform.system(), platform.machine())
+
+    return self._install_from_source(self._version)
+
+  def _get_executable_path(self) -> str:
+    """Orchestrates the process of getting a ready-to-use Prism binary."""
+    source = self._resolve_source_path()
+    if source == "%s/prism" % (self.BIN_CACHE):
+      # source is from go installation, so it is already a local binary
+      return self._prepare_executable(source, self.BIN_CACHE, True)
+
+    # Always re-download/extract if a custom path was provided to avoid
+    # staleness
+    ignore_cache = self._path is not None
+
+    local_path, ignore_cache = self._download_to_local_path(source,
+                                                            self.BIN_CACHE,
+                                                            ignore_cache)
+
+    return self._prepare_executable(local_path, self.BIN_CACHE, ignore_cache)
+
   def subprocess_cmd_and_endpoint(
       self) -> typing.Tuple[typing.List[typing.Any], str]:
-    bin_path = self.local_bin(
-        self.path_to_binary(), ignore_cache=(self._path is not None))
+    bin_path = self._get_executable_path()
     job_port, = subprocess_server.pick_port(self._job_port)
     subprocess_cmd = [bin_path] + self.prism_arguments(job_port)
     return (subprocess_cmd, f"localhost:{job_port}")

Reply via email to