damccorm commented on code in PR #36852:
URL: https://github.com/apache/beam/pull/36852#discussion_r2543471873


##########
sdks/python/apache_beam/typehints/native_type_compatibility.py:
##########
@@ -335,11 +336,13 @@ def convert_to_beam_type(typ):
   # pipe operator as Union and types.UnionType are introduced
   # in Python 3.10.
   # GH issue: https://github.com/apache/beam/issues/21972
-  if isinstance(typ, types.UnionType):
+  if (sys.version_info.major == 3 and
+      sys.version_info.minor >= 10) and (isinstance(typ, types.UnionType)):
     typ = typing.Union[typ]
 
   # Unwrap Python 3.12 `type` aliases (TypeAliasType) to their underlying 
value.
   # This ensures Beam sees the actual aliased type (e.g., tuple[int, ...]).
+  import sys

Review Comment:
   Why do we need an extra import here? This is imported at the top level



##########
sdks/python/conftest.py:
##########
@@ -101,56 +110,86 @@ def configure_beam_rpc_timeouts():
   print("Successfully configured Beam RPC timeouts")
 
 
+def _running_in_ci():
+  """Returns True if running in a CI environment."""
+  return (
+      os.getenv('GITHUB_ACTIONS') == 'true' or
+      os.getenv('CI') == 'true' or
+      os.getenv('CONTINUOUS_INTEGRATION') == 'true'
+  )
+
+
+def _should_enable_test_cleanup(config):
+  """Returns True if expensive cleanup operations should run.
+
+  Result is cached on config object to avoid re-computation per test.
+  """
+  if hasattr(config, '_should_enable_test_cleanup_result'):
+    return config._should_enable_test_cleanup_result
+
+  if config.getoption('--enable-test-cleanup'):
+    result = True
+    reason = "enabled via --enable-test-cleanup"
+  else:
+    if _running_in_ci():
+      result = True
+      reason = "CI detected"
+    else:
+      result = False
+      reason = "local development"
+
+  # Log once per session
+  if not hasattr(config, '_cleanup_decision_logged'):
+    print(f"\n[Test Cleanup] Enabled: {result} ({reason})")
+    config._cleanup_decision_logged = True
+
+  config._should_enable_test_cleanup_result = result
+  return result
+
+
 @pytest.fixture(autouse=True)
-def ensure_clean_state():
+def ensure_clean_state(request):
   """
-  Ensure clean state before each test
-  to prevent cross-test contamination.
+  Ensures clean state between tests to prevent contamination.
+
+  Expensive operations (sleeps, extra GC) only run in CI or when
+  explicitly enabled to keep local tests fast.
   """
-  import gc
-  import threading
-  import time
+  enable_cleanup = _should_enable_test_cleanup(request.config)
 
-  # Force garbage collection to clean up any lingering resources
-  gc.collect()
+  if enable_cleanup:

Review Comment:
   I'm a little confused - how does this speed up our CI tests? Won't the 
behavior in CI be the exact same since enable_cleanup will evaluate to True?



##########
sdks/python/apache_beam/typehints/native_type_compatibility.py:
##########
@@ -335,11 +336,13 @@ def convert_to_beam_type(typ):
   # pipe operator as Union and types.UnionType are introduced
   # in Python 3.10.
   # GH issue: https://github.com/apache/beam/issues/21972
-  if isinstance(typ, types.UnionType):
+  if (sys.version_info.major == 3 and
+      sys.version_info.minor >= 10) and (isinstance(typ, types.UnionType)):
     typ = typing.Union[typ]
 
   # Unwrap Python 3.12 `type` aliases (TypeAliasType) to their underlying 
value.
   # This ensures Beam sees the actual aliased type (e.g., tuple[int, ...]).
+  import sys

Review Comment:
   Why do we need an extra import here? This is imported at the top level



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to