This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 87c50c67e37 [Python] Support large pipeline options via file (#37379)
87c50c67e37 is described below

commit 87c50c67e37cb48bade24c475debbfeca2e08982
Author: Mathijs Deelen <[email protected]>
AuthorDate: Thu Feb 19 20:49:50 2026 -0500

    [Python] Support large pipeline options via file (#37379)
    
    * Support large pipeline options in Python SDK #37370
    
    * reformatted code to match the project's standards to pass checks
    
    * Fixed PythonFormatterPreCommit script error
    
    * Fix import order: move google.protobuf above apache_beam
    
    * Removed blank line between import and google import
    
    * Addressed the review comments: cleaned up comments, aligned error message 
with Java and Go SDKs
    
    * Fixed Formatting Error
    
    * Fixed line length that caused lint error
    
    * rerun tests
    
    * Rerun tests
    
    * Fix comment formatting in sdk_worker_main.py
    
    * Fix: Update exception handling after bot review
    
    * Set bootstrap log level to INFO in create_harness
    
    * Fix formatting in sdk_worker_main.py
    
    * Documents: Updated CHANGES.md for file-based pipeline options
    
    * Fix CHANGES.md formatting issues
    
    * Fix CHANGES.md, correct issue link
    
    * Fix CHANGES.md, correct issue link
    
    * Updated issue number in brackets to [#37370] in CHANGES.md
    
    * Updated issue number in brackets to [#37370] in CHANGES.md
    
    ---------
    
    Co-authored-by: Shunping Huang <[email protected]>
---
 CHANGES.md                                         |  4 ++--
 .../apache_beam/runners/worker/sdk_worker_main.py  | 27 ++++++++++++++++++++--
 sdks/python/container/boot.go                      |  6 ++++-
 3 files changed, 32 insertions(+), 5 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 1f92a23282b..5499cb06647 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -68,11 +68,11 @@
 
 ## New Features / Improvements
 
-* X feature added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
+* Added support for large pipeline options via a file (Python) 
([#37370](https://github.com/apache/beam/issues/37370)).
 
 ## Breaking Changes
 
-* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
+* The Python SDK container's `boot.go` now passes pipeline options through a 
file instead of the `PIPELINE_OPTIONS` environment variable. If a user pairs a 
new Python SDK container with an older SDK version (which does not support the 
file-based approach), the pipeline options will not be recognized and the 
pipeline will fail. Users must ensure their SDK and container versions are 
synchronized ([#37370](https://github.com/apache/beam/issues/37370)).
 
 ## Deprecations
 
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py 
b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index e4dd6cc2121..754a631eaf3 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -73,6 +73,10 @@ def _import_beam_plugins(plugins):
 def create_harness(environment, dry_run=False):
   """Creates SDK Fn Harness."""
 
+  # Bootstrap log level to capture startup events until pipeline options are
+  # parsed and the actual log level is set.
+  logging.getLogger().setLevel(logging.INFO)
+
   deferred_exception = None
   if 'LOGGING_API_SERVICE_DESCRIPTOR' in environment:
     try:
@@ -93,8 +97,24 @@ def create_harness(environment, dry_run=False):
   else:
     fn_log_handler = None
 
-  pipeline_options_dict = _load_pipeline_options(
-      environment.get('PIPELINE_OPTIONS'))
+  options_json = environment.get('PIPELINE_OPTIONS')
+
+  # We check if options are stored in the file.
+  if 'PIPELINE_OPTIONS_FILE' in environment:
+    options_file = environment['PIPELINE_OPTIONS_FILE']
+    try:
+      with open(options_file, 'r') as f:
+        options_json = f.read()
+        _LOGGER.info('Load pipeline options from file: %s', options_file)
+    except Exception:
+      _LOGGER.error(
+          'Failed to load pipeline options from file: %s',
+          options_file,
+          exc_info=True)
+      raise
+
+  pipeline_options_dict = _load_pipeline_options(options_json)
+
   default_log_level = _get_log_level_from_options_dict(pipeline_options_dict)
   logging.getLogger().setLevel(default_log_level)
   _set_log_level_overrides(pipeline_options_dict)
@@ -239,6 +259,7 @@ def terminate_sdk_harness():
 
 
 def _load_pipeline_options(options_json):
+  """Deserialize the pipeline options from a JSON string into a dictionary."""
   if options_json is None:
     return {}
   options = json.loads(options_json)
@@ -256,6 +277,8 @@ def _load_pipeline_options(options_json):
 
 
 def _parse_pipeline_options(options_json):
+  """Parses the pipeline options from a JSON string into a PipelineOptions 
+  object."""
   return PipelineOptions.from_dictionary(_load_pipeline_options(options_json))
 
 
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index 572dbf01113..85e5b07a121 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -259,7 +259,11 @@ func launchSDKProcess() error {
 
        // (3) Invoke python
 
-       os.Setenv("PIPELINE_OPTIONS", options)
+       // Write the JSON string of pipeline options into a file to prevent 
"argument list too long" error.
+       if err := tools.MakePipelineOptionsFileAndEnvVar(options); err != nil {
+               logger.Fatalf(ctx, "Failed to load pipeline options to worker: 
%v", err)
+       }
+
        os.Setenv("SEMI_PERSISTENT_DIRECTORY", *semiPersistDir)
        os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", 
(&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}).String())
        os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", 
(&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}).String())

Reply via email to