This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d7226545f3 Only stage required wheel packages from requirements 
cache, not entire cache directory (#37360)
0d7226545f3 is described below

commit 0d7226545f329621ea13c5e5af84ebbb6b40b11d
Author: Nayan Mathur <[email protected]>
AuthorDate: Thu Jan 29 22:48:11 2026 +0530

    Only stage required wheel packages from requirements cache, not entire 
cache directory (#37360)
    
    * Fix cached wheels used in future runs
    
    * address review comments
    
    * run post tests
    
    * add .github/trigger_files/beam_PostCommit_Python_Examples_Dataflow.json
---
 .github/trigger_files/beam_PostCommit_Python.json  |  4 +-
 ... beam_PostCommit_Python_Examples_Dataflow.json} |  4 +-
 .../apache_beam/runners/portability/stager.py      | 60 +++++++++++----
 .../apache_beam/runners/portability/stager_test.py | 89 ++++++++++++++++++++++
 4 files changed, 138 insertions(+), 19 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python.json 
b/.github/trigger_files/beam_PostCommit_Python.json
index e43868bf4f2..d36d0db940c 100644
--- a/.github/trigger_files/beam_PostCommit_Python.json
+++ b/.github/trigger_files/beam_PostCommit_Python.json
@@ -1,5 +1,5 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run.",
-  "pr": "36271",
-  "modification": 37
+  "pr": "37360",
+  "modification": 38
 }
\ No newline at end of file
diff --git a/.github/trigger_files/beam_PostCommit_Python.json 
b/.github/trigger_files/beam_PostCommit_Python_Examples_Dataflow.json
similarity index 68%
copy from .github/trigger_files/beam_PostCommit_Python.json
copy to .github/trigger_files/beam_PostCommit_Python_Examples_Dataflow.json
index e43868bf4f2..b8513bdfc7b 100644
--- a/.github/trigger_files/beam_PostCommit_Python.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Examples_Dataflow.json
@@ -1,5 +1,5 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run.",
-  "pr": "36271",
-  "modification": 37
+  "pr": "37360",
+  "modification": 1
 }
\ No newline at end of file
diff --git a/sdks/python/apache_beam/runners/portability/stager.py 
b/sdks/python/apache_beam/runners/portability/stager.py
index 17cf6514cac..668477ce146 100644
--- a/sdks/python/apache_beam/runners/portability/stager.py
+++ b/sdks/python/apache_beam/runners/portability/stager.py
@@ -213,13 +213,15 @@ class Stager(object):
     # if we know we are using a dependency pre-installed sdk container image.
     if not skip_prestaged_dependencies:
       requirements_cache_path = (
-          os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache') if
+          os.path.join(tempfile.gettempdir(), 'beam-requirements-cache') if
           (setup_options.requirements_cache
            is None) else setup_options.requirements_cache)
       if (setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE and
           not os.path.exists(requirements_cache_path)):
         os.makedirs(requirements_cache_path, exist_ok=True)
 
+      # Track packages to stage for this specific run.
+      packages_to_stage = set()
       # Stage a requirements file if present.
       if setup_options.requirements_file is not None:
         if not os.path.isfile(setup_options.requirements_file):
@@ -245,12 +247,16 @@ class Stager(object):
               'such as --requirements_file. ')
 
         if setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE:
-          (
+          populate_cache_callable = (
               populate_requirements_cache if populate_requirements_cache else
-              Stager._populate_requirements_cache)(
-                  setup_options.requirements_file,
-                  requirements_cache_path,
-                  setup_options.requirements_cache_only_sources)
+              Stager._populate_requirements_cache)
+
+          downloaded_packages = populate_cache_callable(
+              setup_options.requirements_file,
+              requirements_cache_path,
+              setup_options.requirements_cache_only_sources)
+          if downloaded_packages:
+            packages_to_stage.update(downloaded_packages)
 
       if pypi_requirements:
         tf = tempfile.NamedTemporaryFile(mode='w', delete=False)
@@ -260,18 +266,23 @@ class Stager(object):
         # Populate cache with packages from PyPI requirements and stage
         # the files in the cache.
         if setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE:
-          (
+          populate_cache_callable = (
               populate_requirements_cache if populate_requirements_cache else
-              Stager._populate_requirements_cache)(
-                  tf.name,
-                  requirements_cache_path,
-                  setup_options.requirements_cache_only_sources)
+              Stager._populate_requirements_cache)
+          downloaded_packages = populate_cache_callable(
+              tf.name,
+              requirements_cache_path,
+              setup_options.requirements_cache_only_sources)
+          if downloaded_packages:
+            packages_to_stage.update(downloaded_packages)
 
       if (setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE) and (
           setup_options.requirements_file is not None or pypi_requirements):
-        for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
-          resources.append(
-              Stager._create_file_stage_to_artifact(pkg, 
os.path.basename(pkg)))
+        for pkg in packages_to_stage:
+          pkg_path = os.path.join(requirements_cache_path, pkg)
+          if os.path.exists(pkg_path):
+            resources.append(
+                Stager._create_file_stage_to_artifact(pkg_path, pkg))
 
       # Handle a setup file if present.
       # We will build the setup package locally and then copy it to the staging
@@ -741,19 +752,26 @@ class Stager(object):
     # the requirements file and will download package dependencies.
 
     # The apache-beam dependency  is excluded from requirements cache 
population
-    # because we  stage the SDK separately.
+    # because we stage the SDK separately.
     with tempfile.TemporaryDirectory() as temp_directory:
       tmp_requirements_filepath = Stager._remove_dependency_from_requirements(
           requirements_file=requirements_file,
           dependency_to_remove='apache-beam',
           temp_directory_path=temp_directory)
 
+      # Download to a temporary directory first, then copy to cache.
+      # This allows us to track exactly which packages are needed for this
+      # requirements file.
+      download_dir = tempfile.mkdtemp(dir=temp_directory)
+
       cmd_args = [
           Stager._get_python_executable(),
           '-m',
           'pip',
           'download',
           '--dest',
+          download_dir,
+          '--find-links',
           cache_dir,
           '-r',
           tmp_requirements_filepath,
@@ -781,6 +799,18 @@ class Stager(object):
       _LOGGER.info('Executing command: %s', cmd_args)
       processes.check_output(cmd_args, stderr=processes.STDOUT)
 
+      # Get list of downloaded packages and copy them to the cache
+      downloaded_packages = set()
+      for pkg_file in os.listdir(download_dir):
+        downloaded_packages.add(pkg_file)
+        src_path = os.path.join(download_dir, pkg_file)
+        dest_path = os.path.join(cache_dir, pkg_file)
+        # Only copy if not already in cache
+        if not os.path.exists(dest_path):
+          shutil.copy2(src_path, dest_path)
+
+      return downloaded_packages
+
   @staticmethod
   def _build_setup_package(
       setup_file: str,
diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py 
b/sdks/python/apache_beam/runners/portability/stager_test.py
index 233e0c3dcea..4ec1c697fbf 100644
--- a/sdks/python/apache_beam/runners/portability/stager_test.py
+++ b/sdks/python/apache_beam/runners/portability/stager_test.py
@@ -100,6 +100,8 @@ class StagerTest(unittest.TestCase):
     _ = requirements_file
     self.create_temp_file(os.path.join(cache_dir, 'abc.txt'), 'nothing')
     self.create_temp_file(os.path.join(cache_dir, 'def.txt'), 'nothing')
+    # Return the list of packages that were "downloaded" for this requirements
+    return {'abc.txt', 'def.txt'}
 
   @mock.patch('apache_beam.runners.portability.stager.open')
   @mock.patch('apache_beam.runners.portability.stager.get_new_http')
@@ -810,10 +812,14 @@ class StagerTest(unittest.TestCase):
 
   def _populate_requitements_cache_fake(
       self, requirements_file, temp_dir, populate_cache_with_sdists):
+    packages = set()
     if not populate_cache_with_sdists:
       self.create_temp_file(os.path.join(temp_dir, 'nothing.whl'), 'Fake whl')
+      packages.add('nothing.whl')
     self.create_temp_file(
         os.path.join(temp_dir, 'nothing.tar.gz'), 'Fake tarball')
+    packages.add('nothing.tar.gz')
+    return packages
 
  # requirements cache will be populated with bdist/whl if present
  # else source would be downloaded.
@@ -913,6 +919,89 @@ class StagerTest(unittest.TestCase):
       self.assertNotIn('fake_pypi', extra_packages_contents)
       self.assertIn('local_package', extra_packages_contents)
 
+  def test_only_required_packages_staged_from_cache(self):
+    """Test that only packages needed for current requirements are staged.
+
+    This test verifies the fix for the issue where the entire cache directory
+    was being staged, even when some packages weren't needed for the current
+    workflow.
+    """
+    staging_dir = self.make_temp_dir()
+    requirements_cache_dir = self.make_temp_dir()
+    source_dir = self.make_temp_dir()
+
+    # Pre-populate cache with packages from a "previous run"
+    self.create_temp_file(
+        os.path.join(requirements_cache_dir, 'old_package.whl'), 'old package')
+    self.create_temp_file(
+        os.path.join(requirements_cache_dir, 'another_old.tar.gz'), 'another')
+
+    options = PipelineOptions()
+    self.update_options(options)
+    options.view_as(SetupOptions).requirements_cache = requirements_cache_dir
+    options.view_as(SetupOptions).requirements_file = os.path.join(
+        source_dir, stager.REQUIREMENTS_FILE)
+    self.create_temp_file(
+        os.path.join(source_dir, stager.REQUIREMENTS_FILE), 'new_package')
+
+    def populate_cache_for_new_requirements(
+        requirements_file, cache_dir, populate_cache_with_sdists=False):
+      # Simulate downloading only the packages needed for new requirements
+      self.create_temp_file(
+          os.path.join(cache_dir, 'new_package.whl'), 'new package content')
+      return {'new_package.whl'}
+
+    resources = self.stager.create_and_stage_job_resources(
+        options,
+        populate_requirements_cache=populate_cache_for_new_requirements,
+        staging_location=staging_dir)[1]
+
+    # Verify only new_package.whl is staged, not old packages
+    self.assertIn('new_package.whl', resources)
+    self.assertNotIn('old_package.whl', resources)
+    self.assertNotIn('another_old.tar.gz', resources)
+
+    # Verify the file exists in staging
+    self.assertTrue(
+        os.path.isfile(os.path.join(staging_dir, 'new_package.whl')))
+    # Verify old packages are NOT in staging
+    self.assertFalse(
+        os.path.isfile(os.path.join(staging_dir, 'old_package.whl')))
+    self.assertFalse(
+        os.path.isfile(os.path.join(staging_dir, 'another_old.tar.gz')))
+
+  def test_populate_requirements_cache_uses_find_links(self):
+    """Test that _populate_requirements_cache uses --find-links to reuse cache.
+
+    This test verifies that pip download is called with --find-links pointing
+    to the cache directory, so packages already in cache are reused instead
+    of being re-downloaded from PyPI.
+    """
+    requirements_cache_dir = self.make_temp_dir()
+    source_dir = self.make_temp_dir()
+
+    # Create a requirements file
+    requirements_file = os.path.join(source_dir, 'requirements.txt')
+    self.create_temp_file(requirements_file, 'some_package==1.0.0')
+
+    captured_cmd_args = []
+
+    def mock_check_output(cmd_args, **kwargs):
+      captured_cmd_args.extend(cmd_args)
+      return b''
+
+    with mock.patch(
+        'apache_beam.runners.portability.stager.processes.check_output',
+        side_effect=mock_check_output):
+      stager.Stager._populate_requirements_cache(
+          requirements_file, requirements_cache_dir)
+
+    # Verify --find-links is in the command with the cache directory
+    self.assertIn('--find-links', captured_cmd_args)
+    find_links_index = captured_cmd_args.index('--find-links')
+    self.assertEqual(
+        requirements_cache_dir, captured_cmd_args[find_links_index + 1])
+
 
 class TestStager(stager.Stager):
   def stage_artifact(self, local_path_to_artifact, artifact_name, sha256):

Reply via email to the mailing list.