This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 87c50c67e37 [Python] Support large pipeline options via file (#37379)
87c50c67e37 is described below
commit 87c50c67e37cb48bade24c475debbfeca2e08982
Author: Mathijs Deelen <[email protected]>
AuthorDate: Thu Feb 19 20:49:50 2026 -0500
[Python] Support large pipeline options via file (#37379)
* Support large pipeline options in Python SDK #37370
* reformatted code to match the project's standards to pass checks
* Fixed PythonFormatterPreCommit script error
* Fix import order: move google.protobuf above apache_beam
* Removed blank line between import and google import
* Addressed the review comments: cleaned up comments, aligned error message
with the Java and Go SDKs
* Fixed Formatting Error
* Fixed line length that causes lint error
* rerun tests
* Rerun tests
* Fix comment formatting in sdk_worker_main.py
* Fix: Update exception handling after bot review
* Set bootstrap log level to INFO in create_harness
* Fix formatting in sdk_worker_main.py
* Documents: Updated CHANGES.md for file-based pipeline options
* Fix CHANGES.md formatting issues
* Fix CHANGES.md, correct issue link
* Fix CHANGES.md, correct issue link
* Updated issue number in brackets to [#37370] in CHANGES.md
* Updated issue number in brackets to [#37370] in CHANGES.md
---------
Co-authored-by: Shunping Huang <[email protected]>
---
CHANGES.md | 4 ++--
.../apache_beam/runners/worker/sdk_worker_main.py | 27 ++++++++++++++++++++--
sdks/python/container/boot.go | 6 ++++-
3 files changed, 32 insertions(+), 5 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 1f92a23282b..5499cb06647 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -68,11 +68,11 @@
## New Features / Improvements
-* X feature added (Java/Python)
([#X](https://github.com/apache/beam/issues/X)).
+* Added support for large pipeline options via a file (Python)
([#37370](https://github.com/apache/beam/issues/37370)).
## Breaking Changes
-* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
+* The Python SDK container's `boot.go` now passes pipeline options through a
file instead of the `PIPELINE_OPTIONS` environment variable. If a user pairs a
new Python SDK container with an older SDK version (which does not support the
file-based approach), the pipeline options will not be recognized and the
pipeline will fail. Users must ensure their SDK and container versions are
synchronized ([#37370](https://github.com/apache/beam/issues/37370)).
## Deprecations
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index e4dd6cc2121..754a631eaf3 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -73,6 +73,10 @@ def _import_beam_plugins(plugins):
def create_harness(environment, dry_run=False):
"""Creates SDK Fn Harness."""
+ # Bootstrap log level to capture startup events until pipeline options are
+ # parsed and the actual log level is set.
+ logging.getLogger().setLevel(logging.INFO)
+
deferred_exception = None
if 'LOGGING_API_SERVICE_DESCRIPTOR' in environment:
try:
@@ -93,8 +97,24 @@ def create_harness(environment, dry_run=False):
else:
fn_log_handler = None
- pipeline_options_dict = _load_pipeline_options(
- environment.get('PIPELINE_OPTIONS'))
+ options_json = environment.get('PIPELINE_OPTIONS')
+
+ # We check if options are stored in the file.
+ if 'PIPELINE_OPTIONS_FILE' in environment:
+ options_file = environment['PIPELINE_OPTIONS_FILE']
+ try:
+ with open(options_file, 'r') as f:
+ options_json = f.read()
+ _LOGGER.info('Load pipeline options from file: %s', options_file)
+ except Exception:
+ _LOGGER.error(
+ 'Failed to load pipeline options from file: %s',
+ options_file,
+ exc_info=True)
+ raise
+
+ pipeline_options_dict = _load_pipeline_options(options_json)
+
default_log_level = _get_log_level_from_options_dict(pipeline_options_dict)
logging.getLogger().setLevel(default_log_level)
_set_log_level_overrides(pipeline_options_dict)
@@ -239,6 +259,7 @@ def terminate_sdk_harness():
def _load_pipeline_options(options_json):
+ """Deserialize the pipeline options from a JSON string into a dictionary."""
if options_json is None:
return {}
options = json.loads(options_json)
@@ -256,6 +277,8 @@ def _load_pipeline_options(options_json):
def _parse_pipeline_options(options_json):
+ """Parses the pipeline options from a JSON string into a PipelineOptions
+ object."""
return PipelineOptions.from_dictionary(_load_pipeline_options(options_json))
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index 572dbf01113..85e5b07a121 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -259,7 +259,11 @@ func launchSDKProcess() error {
// (3) Invoke python
- os.Setenv("PIPELINE_OPTIONS", options)
+ // Write the JSON string of pipeline options into a file to prevent
"argument list too long" error.
+ if err := tools.MakePipelineOptionsFileAndEnvVar(options); err != nil {
+ logger.Fatalf(ctx, "Failed to load pipeline options to worker:
%v", err)
+ }
+
os.Setenv("SEMI_PERSISTENT_DIRECTORY", *semiPersistDir)
os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR",
(&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}).String())
os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR",
(&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}).String())