This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0d7226545f3 Only stage required wheel packages from requirements cache, not entire cache directory (#37360)
0d7226545f3 is described below
commit 0d7226545f329621ea13c5e5af84ebbb6b40b11d
Author: Nayan Mathur <[email protected]>
AuthorDate: Thu Jan 29 22:48:11 2026 +0530
Only stage required wheel packages from requirements cache, not entire cache directory (#37360)
* Fix reuse of cached wheels in future runs
* address review comments
* run post tests
* add .github/trigger_files/beam_PostCommit_Python_Examples_Dataflow.json
---
.github/trigger_files/beam_PostCommit_Python.json | 4 +-
... beam_PostCommit_Python_Examples_Dataflow.json} | 4 +-
.../apache_beam/runners/portability/stager.py | 60 +++++++++++----
.../apache_beam/runners/portability/stager_test.py | 89 ++++++++++++++++++++++
4 files changed, 138 insertions(+), 19 deletions(-)
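Before this change, create_job_resources staged every file found under the requirements cache directory (a glob over '*'), so wheels left behind by earlier runs were uploaded with every job. The patch below instead has _populate_requirements_cache return the set of package files actually downloaded for the current requirements, and only those files are staged. A minimal sketch of the new selection step (illustrative only; select_packages_to_stage is a hypothetical helper, not part of the patch):

    import os

    def select_packages_to_stage(cache_dir, packages_to_stage):
      # Stage only the package files downloaded for this run, not every
      # file that happens to sit in the shared cache directory.
      selected = []
      for pkg in sorted(packages_to_stage):
        pkg_path = os.path.join(cache_dir, pkg)
        if os.path.exists(pkg_path):
          selected.append((pkg_path, pkg))  # (local path, artifact name)
      return selected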
diff --git a/.github/trigger_files/beam_PostCommit_Python.json
b/.github/trigger_files/beam_PostCommit_Python.json
index e43868bf4f2..d36d0db940c 100644
--- a/.github/trigger_files/beam_PostCommit_Python.json
+++ b/.github/trigger_files/beam_PostCommit_Python.json
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run.",
- "pr": "36271",
- "modification": 37
+ "pr": "37360",
+ "modification": 38
}
\ No newline at end of file
diff --git a/.github/trigger_files/beam_PostCommit_Python.json
b/.github/trigger_files/beam_PostCommit_Python_Examples_Dataflow.json
similarity index 68%
copy from .github/trigger_files/beam_PostCommit_Python.json
copy to .github/trigger_files/beam_PostCommit_Python_Examples_Dataflow.json
index e43868bf4f2..b8513bdfc7b 100644
--- a/.github/trigger_files/beam_PostCommit_Python.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Examples_Dataflow.json
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run.",
- "pr": "36271",
- "modification": 37
+ "pr": "37360",
+ "modification": 1
}
\ No newline at end of file
diff --git a/sdks/python/apache_beam/runners/portability/stager.py
b/sdks/python/apache_beam/runners/portability/stager.py
index 17cf6514cac..668477ce146 100644
--- a/sdks/python/apache_beam/runners/portability/stager.py
+++ b/sdks/python/apache_beam/runners/portability/stager.py
@@ -213,13 +213,15 @@ class Stager(object):
# if we know we are using a dependency pre-installed sdk container image.
if not skip_prestaged_dependencies:
requirements_cache_path = (
- os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache') if
+ os.path.join(tempfile.gettempdir(), 'beam-requirements-cache') if
(setup_options.requirements_cache
is None) else setup_options.requirements_cache)
if (setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE and
not os.path.exists(requirements_cache_path)):
os.makedirs(requirements_cache_path, exist_ok=True)
+ # Track packages to stage for this specific run.
+ packages_to_stage = set()
# Stage a requirements file if present.
if setup_options.requirements_file is not None:
if not os.path.isfile(setup_options.requirements_file):
@@ -245,12 +247,16 @@ class Stager(object):
'such as --requirements_file. ')
if setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE:
- (
+ populate_cache_callable = (
populate_requirements_cache if populate_requirements_cache else
- Stager._populate_requirements_cache)(
- setup_options.requirements_file,
- requirements_cache_path,
- setup_options.requirements_cache_only_sources)
+ Stager._populate_requirements_cache)
+
+ downloaded_packages = populate_cache_callable(
+ setup_options.requirements_file,
+ requirements_cache_path,
+ setup_options.requirements_cache_only_sources)
+ if downloaded_packages:
+ packages_to_stage.update(downloaded_packages)
if pypi_requirements:
tf = tempfile.NamedTemporaryFile(mode='w', delete=False)
@@ -260,18 +266,23 @@ class Stager(object):
# Populate cache with packages from PyPI requirements and stage
# the files in the cache.
if setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE:
- (
+ populate_cache_callable = (
populate_requirements_cache if populate_requirements_cache else
- Stager._populate_requirements_cache)(
- tf.name,
- requirements_cache_path,
- setup_options.requirements_cache_only_sources)
+ Stager._populate_requirements_cache)
+ downloaded_packages = populate_cache_callable(
+ tf.name,
+ requirements_cache_path,
+ setup_options.requirements_cache_only_sources)
+ if downloaded_packages:
+ packages_to_stage.update(downloaded_packages)
if (setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE) and (
setup_options.requirements_file is not None or pypi_requirements):
- for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
- resources.append(
-            Stager._create_file_stage_to_artifact(pkg, os.path.basename(pkg)))
+ for pkg in packages_to_stage:
+ pkg_path = os.path.join(requirements_cache_path, pkg)
+ if os.path.exists(pkg_path):
+ resources.append(
+ Stager._create_file_stage_to_artifact(pkg_path, pkg))
# Handle a setup file if present.
# We will build the setup package locally and then copy it to the staging
@@ -741,19 +752,26 @@ class Stager(object):
# the requirements file and will download package dependencies.
# The apache-beam dependency is excluded from requirements cache population
- # because we stage the SDK separately.
+ # because we stage the SDK separately.
with tempfile.TemporaryDirectory() as temp_directory:
tmp_requirements_filepath = Stager._remove_dependency_from_requirements(
requirements_file=requirements_file,
dependency_to_remove='apache-beam',
temp_directory_path=temp_directory)
+ # Download to a temporary directory first, then copy to cache.
+ # This allows us to track exactly which packages are needed for this
+ # requirements file.
+ download_dir = tempfile.mkdtemp(dir=temp_directory)
+
cmd_args = [
Stager._get_python_executable(),
'-m',
'pip',
'download',
'--dest',
+ download_dir,
+ '--find-links',
cache_dir,
'-r',
tmp_requirements_filepath,
@@ -781,6 +799,18 @@ class Stager(object):
_LOGGER.info('Executing command: %s', cmd_args)
processes.check_output(cmd_args, stderr=processes.STDOUT)
+ # Get list of downloaded packages and copy them to the cache
+ downloaded_packages = set()
+ for pkg_file in os.listdir(download_dir):
+ downloaded_packages.add(pkg_file)
+ src_path = os.path.join(download_dir, pkg_file)
+ dest_path = os.path.join(cache_dir, pkg_file)
+ # Only copy if not already in cache
+ if not os.path.exists(dest_path):
+ shutil.copy2(src_path, dest_path)
+
+ return downloaded_packages
+
@staticmethod
def _build_setup_package(
setup_file: str,
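The cache-population flow above downloads into a fresh temporary directory while pointing pip's --find-links at the shared cache, so previously cached packages are reused rather than re-fetched from PyPI, and the contents of the temporary directory are exactly the packages this requirements file needs. A standalone sketch of that flow (hedged; populate_cache is a hypothetical name, and error handling is omitted):

    import os
    import shutil
    import subprocess
    import sys
    import tempfile

    def populate_cache(requirements_file, cache_dir):
      with tempfile.TemporaryDirectory() as temp_directory:
        download_dir = tempfile.mkdtemp(dir=temp_directory)
        # pip consults --find-links first and only falls back to PyPI for
        # packages not already present in the cache.
        subprocess.check_call([
            sys.executable, '-m', 'pip', 'download',
            '--dest', download_dir,
            '--find-links', cache_dir,
            '-r', requirements_file,
        ])
        downloaded = set(os.listdir(download_dir))
        for pkg_file in downloaded:
          dest_path = os.path.join(cache_dir, pkg_file)
          if not os.path.exists(dest_path):  # keep the cache append-only
            shutil.copy2(os.path.join(download_dir, pkg_file), dest_path)
        return downloaded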
diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py
b/sdks/python/apache_beam/runners/portability/stager_test.py
index 233e0c3dcea..4ec1c697fbf 100644
--- a/sdks/python/apache_beam/runners/portability/stager_test.py
+++ b/sdks/python/apache_beam/runners/portability/stager_test.py
@@ -100,6 +100,8 @@ class StagerTest(unittest.TestCase):
_ = requirements_file
self.create_temp_file(os.path.join(cache_dir, 'abc.txt'), 'nothing')
self.create_temp_file(os.path.join(cache_dir, 'def.txt'), 'nothing')
+    # Return the set of packages that were "downloaded" for this
+    # requirements file.
+    return {'abc.txt', 'def.txt'}
@mock.patch('apache_beam.runners.portability.stager.open')
@mock.patch('apache_beam.runners.portability.stager.get_new_http')
@@ -810,10 +812,14 @@ class StagerTest(unittest.TestCase):
def _populate_requitements_cache_fake(
self, requirements_file, temp_dir, populate_cache_with_sdists):
+ packages = set()
if not populate_cache_with_sdists:
self.create_temp_file(os.path.join(temp_dir, 'nothing.whl'), 'Fake whl')
+ packages.add('nothing.whl')
self.create_temp_file(
os.path.join(temp_dir, 'nothing.tar.gz'), 'Fake tarball')
+ packages.add('nothing.tar.gz')
+ return packages
# requirements cache will be populated with bdist/whl if present,
# else the source would be downloaded.
@@ -913,6 +919,89 @@ class StagerTest(unittest.TestCase):
self.assertNotIn('fake_pypi', extra_packages_contents)
self.assertIn('local_package', extra_packages_contents)
+ def test_only_required_packages_staged_from_cache(self):
+ """Test that only packages needed for current requirements are staged.
+
+ This test verifies the fix for the issue where the entire cache directory
+ was being staged, even when some packages weren't needed for the current
+ workflow.
+ """
+ staging_dir = self.make_temp_dir()
+ requirements_cache_dir = self.make_temp_dir()
+ source_dir = self.make_temp_dir()
+
+ # Pre-populate cache with packages from a "previous run"
+ self.create_temp_file(
+ os.path.join(requirements_cache_dir, 'old_package.whl'), 'old package')
+ self.create_temp_file(
+ os.path.join(requirements_cache_dir, 'another_old.tar.gz'), 'another')
+
+ options = PipelineOptions()
+ self.update_options(options)
+ options.view_as(SetupOptions).requirements_cache = requirements_cache_dir
+ options.view_as(SetupOptions).requirements_file = os.path.join(
+ source_dir, stager.REQUIREMENTS_FILE)
+ self.create_temp_file(
+ os.path.join(source_dir, stager.REQUIREMENTS_FILE), 'new_package')
+
+ def populate_cache_for_new_requirements(
+ requirements_file, cache_dir, populate_cache_with_sdists=False):
+ # Simulate downloading only the packages needed for new requirements
+ self.create_temp_file(
+ os.path.join(cache_dir, 'new_package.whl'), 'new package content')
+ return {'new_package.whl'}
+
+ resources = self.stager.create_and_stage_job_resources(
+ options,
+ populate_requirements_cache=populate_cache_for_new_requirements,
+ staging_location=staging_dir)[1]
+
+ # Verify only new_package.whl is staged, not old packages
+ self.assertIn('new_package.whl', resources)
+ self.assertNotIn('old_package.whl', resources)
+ self.assertNotIn('another_old.tar.gz', resources)
+
+ # Verify the file exists in staging
+ self.assertTrue(
+ os.path.isfile(os.path.join(staging_dir, 'new_package.whl')))
+ # Verify old packages are NOT in staging
+ self.assertFalse(
+ os.path.isfile(os.path.join(staging_dir, 'old_package.whl')))
+ self.assertFalse(
+ os.path.isfile(os.path.join(staging_dir, 'another_old.tar.gz')))
+
+ def test_populate_requirements_cache_uses_find_links(self):
+ """Test that _populate_requirements_cache uses --find-links to reuse cache.
+
+ This test verifies that pip download is called with --find-links pointing
+ to the cache directory, so packages already in cache are reused instead
+ of being re-downloaded from PyPI.
+ """
+ requirements_cache_dir = self.make_temp_dir()
+ source_dir = self.make_temp_dir()
+
+ # Create a requirements file
+ requirements_file = os.path.join(source_dir, 'requirements.txt')
+ self.create_temp_file(requirements_file, 'some_package==1.0.0')
+
+ captured_cmd_args = []
+
+ def mock_check_output(cmd_args, **kwargs):
+ captured_cmd_args.extend(cmd_args)
+ return b''
+
+ with mock.patch(
+ 'apache_beam.runners.portability.stager.processes.check_output',
+ side_effect=mock_check_output):
+ stager.Stager._populate_requirements_cache(
+ requirements_file, requirements_cache_dir)
+
+ # Verify --find-links is in the command with the cache directory
+ self.assertIn('--find-links', captured_cmd_args)
+ find_links_index = captured_cmd_args.index('--find-links')
+ self.assertEqual(
+ requirements_cache_dir, captured_cmd_args[find_links_index + 1])
+
class TestStager(stager.Stager):
def stage_artifact(self, local_path_to_artifact, artifact_name, sha256):
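The two new tests can be run on their own; a hedged sketch of the invocation, assuming pytest is installed and the working directory is sdks/python:

    # Select just the tests added in this commit by keyword.
    import pytest

    pytest.main([
        'apache_beam/runners/portability/stager_test.py',
        '-k', 'only_required_packages or uses_find_links',
    ])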