tvalentyn commented on code in PR #37522: URL: https://github.com/apache/beam/pull/37522#discussion_r2784234228
########## sdks/python/apache_beam/options/pipeline_construction_options_test.py: ########## @@ -0,0 +1,301 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Tests for pipeline_construction_options module. + +These tests verify that the contextvar-based approach properly isolates +pipeline options across threads and async tasks, preventing race conditions. 
+""" + +import asyncio +import threading +import unittest + +import apache_beam as beam +from apache_beam.options.pipeline_construction_options import get_current_pipeline_options +from apache_beam.options.pipeline_construction_options import scoped_pipeline_options +from apache_beam.options.pipeline_options import PipelineOptions + + +class PipelineConstructionOptionsTest(unittest.TestCase): + def test_nested_scoping(self): + """Test that nested scopes properly restore outer options.""" + outer_options = PipelineOptions(['--runner=DirectRunner']) + inner_options = PipelineOptions(['--runner=DataflowRunner']) + + with scoped_pipeline_options(outer_options): + self.assertIs(get_current_pipeline_options(), outer_options) + + with scoped_pipeline_options(inner_options): + self.assertIs(get_current_pipeline_options(), inner_options) + + self.assertIs(get_current_pipeline_options(), outer_options) + + self.assertIsNone(get_current_pipeline_options()) + + def test_exception_in_scope_restores_options(self): + """Test that options are restored even when an exception is raised.""" + outer_options = PipelineOptions(['--runner=DirectRunner']) + inner_options = PipelineOptions(['--runner=DataflowRunner']) + + with scoped_pipeline_options(outer_options): + try: + with scoped_pipeline_options(inner_options): + self.assertIs(get_current_pipeline_options(), inner_options) + raise ValueError("Test exception") + except ValueError: + pass + + self.assertIs(get_current_pipeline_options(), outer_options) + + def test_thread_isolation(self): Review Comment: How about: `test_different_threads_see_their_own_isolated_options` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
