This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b7262c32098 Fix a logical type issue about JdbcDateType and JdbcTimeType (#35243)
b7262c32098 is described below

commit b7262c3209890f5051993b05479e06f8e326d255
Author: Shunping Huang <[email protected]>
AuthorDate: Thu Jun 12 10:10:20 2025 -0400

    Fix a logical type issue about JdbcDateType and JdbcTimeType (#35243)
    
    * Fix a logical type issue about JdbcDateType
    
    * Fix typo and also fix the logical class for java time.
    
    * Get rid of the workaround on logical type registration. Trigger tests.
    
    * Fix lints.
---
 .github/trigger_files/beam_PostCommit_Python.json  |  2 +-
 .../io/external/xlang_jdbcio_it_test.py            | 10 ------
 sdks/python/apache_beam/io/jdbc.py                 |  6 ++--
 sdks/python/apache_beam/typehints/schemas.py       | 12 +++++--
 sdks/python/apache_beam/yaml/main.py               | 39 ++++++----------------
 5 files changed, 24 insertions(+), 45 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json
index 02369776898..62905b12a70 100644
--- a/.github/trigger_files/beam_PostCommit_Python.json
+++ b/.github/trigger_files/beam_PostCommit_Python.json
@@ -1,5 +1,5 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run.",
-  "modification": 12
+  "modification": 13
 }
 
diff --git a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
index 9aed0d5f11d..9f90a44d9a0 100644
--- a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
@@ -36,8 +36,6 @@ from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
-from apache_beam.typehints.schemas import LogicalType
-from apache_beam.typehints.schemas import MillisInstant
 from apache_beam.utils.timestamp import Timestamp
 
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
@@ -255,10 +253,6 @@ class CrossLanguageJdbcIOTest(unittest.TestCase):
               classpath=config['classpath'],
           ))
 
-    # Register MillisInstant logical type to override the mapping from Timestamp
-    # originally handled by MicrosInstant.
-    LogicalType.register_logical_type(MillisInstant)
-
     with TestPipeline() as p:
       p.not_use_test_runner_api = True
       result = (
@@ -355,10 +349,6 @@ class CrossLanguageJdbcIOTest(unittest.TestCase):
               classpath=config['classpath'],
           ))
 
-    # Register MillisInstant logical type to override the mapping from Timestamp
-    # originally handled by MicrosInstant.
-    LogicalType.register_logical_type(MillisInstant)
-
     # Run read pipeline with custom schema
     with TestPipeline() as p:
       p.not_use_test_runner_api = True
diff --git a/sdks/python/apache_beam/io/jdbc.py b/sdks/python/apache_beam/io/jdbc.py
index 32ce16b358f..604b95f6eeb 100644
--- a/sdks/python/apache_beam/io/jdbc.py
+++ b/sdks/python/apache_beam/io/jdbc.py
@@ -401,7 +401,7 @@ class JdbcDateType(LogicalType[datetime.date, MillisInstant, str]):
 
   @classmethod
   def representation_type(cls) -> type:
-    return Timestamp
+    return MillisInstant
 
   @classmethod
   def urn(cls):
@@ -417,7 +417,6 @@ class JdbcDateType(LogicalType[datetime.date, MillisInstant, str]):
             value, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc))
 
   def to_language_type(self, value: Timestamp) -> datetime.date:
-
     return value.to_utc_datetime().date()
 
   @classmethod
@@ -445,7 +444,7 @@ class JdbcTimeType(LogicalType[datetime.time, MillisInstant, str]):
 
   @classmethod
   def representation_type(cls) -> type:
-    return Timestamp
+    return MillisInstant
 
   @classmethod
   def urn(cls):
@@ -463,7 +462,6 @@ class JdbcTimeType(LogicalType[datetime.time, MillisInstant, str]):
             tzinfo=datetime.timezone.utc))
 
   def to_language_type(self, value: Timestamp) -> datetime.date:
-
     return value.to_utc_datetime().time()
 
   @classmethod
diff --git a/sdks/python/apache_beam/typehints/schemas.py 
b/sdks/python/apache_beam/typehints/schemas.py
index de4cdb9fdf7..90a692e2112 100644
--- a/sdks/python/apache_beam/typehints/schemas.py
+++ b/sdks/python/apache_beam/typehints/schemas.py
@@ -335,7 +335,10 @@ class SchemaTranslation(object):
           array_type=schema_pb2.ArrayType(element_type=element_type))
 
     try:
-      logical_type = LogicalType.from_typing(type_)
+      if LogicalType.is_known_logical_type(type_):
+        logical_type = type_
+      else:
+        logical_type = LogicalType.from_typing(type_)
     except ValueError:
       # Unknown type, just treat it like Any
       return schema_pb2.FieldType(
@@ -669,7 +672,7 @@ class LogicalTypeRegistry(object):
   def get_logical_type_by_urn(self, urn):
     return self.by_urn.get(urn, None)
 
-  def get_urn_by_logial_type(self, logical_type):
+  def get_urn_by_logical_type(self, logical_type):
     return self.by_logical_type.get(logical_type, None)
 
   def get_logical_type_by_language_type(self, representation_type):
@@ -808,6 +811,11 @@ class LogicalType(Generic[LanguageT, RepresentationT, ArgT]):
         return logical_type()
       return logical_type(argument)
 
+  @classmethod
+  def is_known_logical_type(cls, logical_type):
+    return cls._known_logical_types.get_urn_by_logical_type(
+        logical_type) is not None
+
 
 class NoArgumentLogicalType(LogicalType[LanguageT, RepresentationT, None]):
   @classmethod
diff --git a/sdks/python/apache_beam/yaml/main.py 
b/sdks/python/apache_beam/yaml/main.py
index 35879514ce1..fbebaea4346 100644
--- a/sdks/python/apache_beam/yaml/main.py
+++ b/sdks/python/apache_beam/yaml/main.py
@@ -16,7 +16,6 @@
 #
 
 import argparse
-import contextlib
 import json
 import os
 import sys
@@ -27,8 +26,6 @@ import yaml
 import apache_beam as beam
 from apache_beam.io.filesystems import FileSystems
 from apache_beam.transforms import resources
-from apache_beam.typehints.schemas import LogicalType
-from apache_beam.typehints.schemas import MillisInstant
 from apache_beam.yaml import yaml_testing
 from apache_beam.yaml import yaml_transform
 from apache_beam.yaml import yaml_utils
@@ -136,25 +133,12 @@ def _pipeline_spec_from_args(known_args):
   return pipeline_yaml
 
 
-@contextlib.contextmanager
-def _fix_xlang_instant_coding():
-  # Scoped workaround for https://github.com/apache/beam/issues/28151.
-  old_registry = LogicalType._known_logical_types
-  LogicalType._known_logical_types = old_registry.copy()
-  try:
-    LogicalType.register_logical_type(MillisInstant)
-    yield
-  finally:
-    LogicalType._known_logical_types = old_registry
-
-
 def run(argv=None):
   options, constructor, display_data = build_pipeline_components_from_argv(argv)
-  with _fix_xlang_instant_coding():
-    with beam.Pipeline(options=options, display_data=display_data) as p:
-      print('Building pipeline...')
-      constructor(p)
-      print('Running pipeline...')
+  with beam.Pipeline(options=options, display_data=display_data) as p:
+    print('Building pipeline...')
+    constructor(p)
+    print('Running pipeline...')
 
 
 def run_tests(argv=None, exit=True):
@@ -185,14 +169,13 @@ def run_tests(argv=None, exit=True):
           "If you haven't added a set of tests yet, you can get started by "
           'running your pipeline with the --create_test flag enabled.')
 
-    with _fix_xlang_instant_coding():
-      tests = [
-          yaml_testing.YamlTestCase(
-              pipeline_spec, test_spec, options, known_args.fix_tests)
-          for test_spec in test_specs
-      ]
-      suite = unittest.TestSuite(tests)
-      result = unittest.TextTestRunner().run(suite)
+    tests = [
+        yaml_testing.YamlTestCase(
+            pipeline_spec, test_spec, options, known_args.fix_tests)
+        for test_spec in test_specs
+    ]
+    suite = unittest.TestSuite(tests)
+    result = unittest.TextTestRunner().run(suite)
 
   if known_args.fix_tests or known_args.create_test:
     update_tests(known_args, pipeline_yaml, pipeline_spec, options, tests)

Reply via email to