This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b7262c32098 Fix a logical type issue about JdbcDateType and JdbcTimeType (#35243)
b7262c32098 is described below
commit b7262c3209890f5051993b05479e06f8e326d255
Author: Shunping Huang <[email protected]>
AuthorDate: Thu Jun 12 10:10:20 2025 -0400
Fix a logical type issue about JdbcDateType and JdbcTimeType (#35243)
* Fix a logical type issue about JdbcDateType
* Fix typo and also fix the logical class for java time.
* Get rid of the workaround on logical type registration. Trigger tests.
* Fix lints.
---
.github/trigger_files/beam_PostCommit_Python.json | 2 +-
.../io/external/xlang_jdbcio_it_test.py | 10 ------
sdks/python/apache_beam/io/jdbc.py | 6 ++--
sdks/python/apache_beam/typehints/schemas.py | 12 +++++--
sdks/python/apache_beam/yaml/main.py | 39 ++++++----------------
5 files changed, 24 insertions(+), 45 deletions(-)
diff --git a/.github/trigger_files/beam_PostCommit_Python.json
b/.github/trigger_files/beam_PostCommit_Python.json
index 02369776898..62905b12a70 100644
--- a/.github/trigger_files/beam_PostCommit_Python.json
+++ b/.github/trigger_files/beam_PostCommit_Python.json
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run.",
- "modification": 12
+ "modification": 13
}
diff --git a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
index 9aed0d5f11d..9f90a44d9a0 100644
--- a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
@@ -36,8 +36,6 @@ from apache_beam.options.pipeline_options import
StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
-from apache_beam.typehints.schemas import LogicalType
-from apache_beam.typehints.schemas import MillisInstant
from apache_beam.utils.timestamp import Timestamp
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
@@ -255,10 +253,6 @@ class CrossLanguageJdbcIOTest(unittest.TestCase):
classpath=config['classpath'],
))
- # Register MillisInstant logical type to override the mapping from Timestamp
- # originally handled by MicrosInstant.
- LogicalType.register_logical_type(MillisInstant)
-
with TestPipeline() as p:
p.not_use_test_runner_api = True
result = (
@@ -355,10 +349,6 @@ class CrossLanguageJdbcIOTest(unittest.TestCase):
classpath=config['classpath'],
))
- # Register MillisInstant logical type to override the mapping from Timestamp
- # originally handled by MicrosInstant.
- LogicalType.register_logical_type(MillisInstant)
-
# Run read pipeline with custom schema
with TestPipeline() as p:
p.not_use_test_runner_api = True
diff --git a/sdks/python/apache_beam/io/jdbc.py
b/sdks/python/apache_beam/io/jdbc.py
index 32ce16b358f..604b95f6eeb 100644
--- a/sdks/python/apache_beam/io/jdbc.py
+++ b/sdks/python/apache_beam/io/jdbc.py
@@ -401,7 +401,7 @@ class JdbcDateType(LogicalType[datetime.date,
MillisInstant, str]):
@classmethod
def representation_type(cls) -> type:
- return Timestamp
+ return MillisInstant
@classmethod
def urn(cls):
@@ -417,7 +417,6 @@ class JdbcDateType(LogicalType[datetime.date,
MillisInstant, str]):
value, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc))
def to_language_type(self, value: Timestamp) -> datetime.date:
-
return value.to_utc_datetime().date()
@classmethod
@@ -445,7 +444,7 @@ class JdbcTimeType(LogicalType[datetime.time,
MillisInstant, str]):
@classmethod
def representation_type(cls) -> type:
- return Timestamp
+ return MillisInstant
@classmethod
def urn(cls):
@@ -463,7 +462,6 @@ class JdbcTimeType(LogicalType[datetime.time,
MillisInstant, str]):
tzinfo=datetime.timezone.utc))
def to_language_type(self, value: Timestamp) -> datetime.date:
-
return value.to_utc_datetime().time()
@classmethod
diff --git a/sdks/python/apache_beam/typehints/schemas.py
b/sdks/python/apache_beam/typehints/schemas.py
index de4cdb9fdf7..90a692e2112 100644
--- a/sdks/python/apache_beam/typehints/schemas.py
+++ b/sdks/python/apache_beam/typehints/schemas.py
@@ -335,7 +335,10 @@ class SchemaTranslation(object):
array_type=schema_pb2.ArrayType(element_type=element_type))
try:
- logical_type = LogicalType.from_typing(type_)
+ if LogicalType.is_known_logical_type(type_):
+ logical_type = type_
+ else:
+ logical_type = LogicalType.from_typing(type_)
except ValueError:
# Unknown type, just treat it like Any
return schema_pb2.FieldType(
@@ -669,7 +672,7 @@ class LogicalTypeRegistry(object):
def get_logical_type_by_urn(self, urn):
return self.by_urn.get(urn, None)
- def get_urn_by_logial_type(self, logical_type):
+ def get_urn_by_logical_type(self, logical_type):
return self.by_logical_type.get(logical_type, None)
def get_logical_type_by_language_type(self, representation_type):
@@ -808,6 +811,11 @@ class LogicalType(Generic[LanguageT, RepresentationT,
ArgT]):
return logical_type()
return logical_type(argument)
+ @classmethod
+ def is_known_logical_type(cls, logical_type):
+ return cls._known_logical_types.get_urn_by_logical_type(
+ logical_type) is not None
+
class NoArgumentLogicalType(LogicalType[LanguageT, RepresentationT, None]):
@classmethod
diff --git a/sdks/python/apache_beam/yaml/main.py
b/sdks/python/apache_beam/yaml/main.py
index 35879514ce1..fbebaea4346 100644
--- a/sdks/python/apache_beam/yaml/main.py
+++ b/sdks/python/apache_beam/yaml/main.py
@@ -16,7 +16,6 @@
#
import argparse
-import contextlib
import json
import os
import sys
@@ -27,8 +26,6 @@ import yaml
import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.transforms import resources
-from apache_beam.typehints.schemas import LogicalType
-from apache_beam.typehints.schemas import MillisInstant
from apache_beam.yaml import yaml_testing
from apache_beam.yaml import yaml_transform
from apache_beam.yaml import yaml_utils
@@ -136,25 +133,12 @@ def _pipeline_spec_from_args(known_args):
return pipeline_yaml
-@contextlib.contextmanager
-def _fix_xlang_instant_coding():
- # Scoped workaround for https://github.com/apache/beam/issues/28151.
- old_registry = LogicalType._known_logical_types
- LogicalType._known_logical_types = old_registry.copy()
- try:
- LogicalType.register_logical_type(MillisInstant)
- yield
- finally:
- LogicalType._known_logical_types = old_registry
-
-
def run(argv=None):
options, constructor, display_data = build_pipeline_components_from_argv(argv)
- with _fix_xlang_instant_coding():
- with beam.Pipeline(options=options, display_data=display_data) as p:
- print('Building pipeline...')
- constructor(p)
- print('Running pipeline...')
+ with beam.Pipeline(options=options, display_data=display_data) as p:
+ print('Building pipeline...')
+ constructor(p)
+ print('Running pipeline...')
def run_tests(argv=None, exit=True):
@@ -185,14 +169,13 @@ def run_tests(argv=None, exit=True):
"If you haven't added a set of tests yet, you can get started by "
'running your pipeline with the --create_test flag enabled.')
- with _fix_xlang_instant_coding():
- tests = [
- yaml_testing.YamlTestCase(
- pipeline_spec, test_spec, options, known_args.fix_tests)
- for test_spec in test_specs
- ]
- suite = unittest.TestSuite(tests)
- result = unittest.TextTestRunner().run(suite)
+ tests = [
+ yaml_testing.YamlTestCase(
+ pipeline_spec, test_spec, options, known_args.fix_tests)
+ for test_spec in test_specs
+ ]
+ suite = unittest.TestSuite(tests)
+ result = unittest.TextTestRunner().run(suite)
if known_args.fix_tests or known_args.create_test:
update_tests(known_args, pipeline_yaml, pipeline_spec, options, tests)