This is an automated email from the ASF dual-hosted git repository. yhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new a0e9775ac25 Support legacy DATE and TIME logical types in xlang JdbcIO (#28382) a0e9775ac25 is described below commit a0e9775ac25a5676714ba743cbd7db02b0512af4 Author: Yi Hu <ya...@google.com> AuthorDate: Mon Sep 18 10:58:30 2023 -0400 Support legacy DATE and TIME logical types in xlang JdbcIO (#28382) * Support legacy DATE and TIME logical types in xlang JdbcIO * Add identifier to URN for legacy Java logical types * Implement JdbcIO logical type javasdk_date and javasdk_time in Python --- .../apache/beam/sdk/schemas/SchemaTranslation.java | 74 +++++++++------ .../beam/sdk/schemas/SchemaTranslationTest.java | 69 ++++++++++++-- .../org/apache/beam/sdk/io/jdbc/LogicalTypes.java | 70 +-------------- .../io/external/xlang_jdbcio_it_test.py | 16 +++- sdks/python/apache_beam/io/jdbc.py | 100 +++++++++++++++++++++ 5 files changed, 223 insertions(+), 106 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java index fb6746b9cdd..c0683ef4461 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java @@ -56,6 +56,7 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; @@ -74,7 +75,23 @@ 
public class SchemaTranslation { private static final Logger LOG = LoggerFactory.getLogger(SchemaTranslation.class); private static final String URN_BEAM_LOGICAL_DECIMAL = FixedPrecisionNumeric.BASE_IDENTIFIER; - private static final String URN_BEAM_LOGICAL_JAVASDK = "beam:logical_type:javasdk:v1"; + + private static String getLogicalTypeUrn(String identifier) { + if (identifier.startsWith("beam:logical_type:")) { + return identifier; + } else { + String filtered = identifier.replaceAll("[^0-9A-Za-z_]", "").toLowerCase(); + if (!Strings.isNullOrEmpty(filtered)) { + // urn for non-standard Java SDK logical types are assigned with javasdk_<identifier> + return String.format("beam:logical_type:javasdk_%s:v1", filtered); + } else { + // raw "javasdk" name should only be a last resort. Types defined in Beam should have their + // own URN. + return "beam:logical_type:javasdk:v1"; + } + } + } + private static final String URN_BEAM_LOGICAL_MILLIS_INSTANT = SchemaApi.LogicalTypes.Enum.MILLIS_INSTANT .getValueDescriptor() @@ -84,18 +101,18 @@ public class SchemaTranslation { // TODO(https://github.com/apache/beam/issues/19715): Populate this with a LogicalTypeRegistrar, // which includes a way to construct // the LogicalType given an argument. - private static final ImmutableMap<String, Class<? extends LogicalType<?, ?>>> - STANDARD_LOGICAL_TYPES = - ImmutableMap.<String, Class<? extends LogicalType<?, ?>>>builder() - .put(FixedPrecisionNumeric.IDENTIFIER, FixedPrecisionNumeric.class) - .put(MicrosInstant.IDENTIFIER, MicrosInstant.class) - .put(SchemaLogicalType.IDENTIFIER, SchemaLogicalType.class) - .put(PythonCallable.IDENTIFIER, PythonCallable.class) - .put(FixedBytes.IDENTIFIER, FixedBytes.class) - .put(VariableBytes.IDENTIFIER, VariableBytes.class) - .put(FixedString.IDENTIFIER, FixedString.class) - .put(VariableString.IDENTIFIER, VariableString.class) - .build(); + @VisibleForTesting + static final ImmutableMap<String, Class<? 
extends LogicalType<?, ?>>> STANDARD_LOGICAL_TYPES = + ImmutableMap.<String, Class<? extends LogicalType<?, ?>>>builder() + .put(FixedPrecisionNumeric.IDENTIFIER, FixedPrecisionNumeric.class) + .put(MicrosInstant.IDENTIFIER, MicrosInstant.class) + .put(SchemaLogicalType.IDENTIFIER, SchemaLogicalType.class) + .put(PythonCallable.IDENTIFIER, PythonCallable.class) + .put(FixedBytes.IDENTIFIER, FixedBytes.class) + .put(VariableBytes.IDENTIFIER, VariableBytes.class) + .put(FixedString.IDENTIFIER, FixedString.class) + .put(VariableString.IDENTIFIER, VariableString.class) + .build(); public static SchemaApi.Schema schemaToProto(Schema schema, boolean serializeLogicalType) { String uuid = schema.getUUID() != null ? schema.getUUID().toString() : ""; @@ -179,11 +196,7 @@ public class SchemaTranslation { fieldValueToProto(logicalType.getArgumentType(), logicalType.getArgument())); } } else { - // TODO(https://github.com/apache/beam/issues/19715): "javasdk" types should only - // be a last resort. Types defined in Beam should have their own URN, and there - // should be a mechanism for users to register their own types by URN. - String urn = - identifier.startsWith("beam:logical_type:") ? identifier : URN_BEAM_LOGICAL_JAVASDK; + String urn = getLogicalTypeUrn(identifier); logicalTypeBuilder = SchemaApi.LogicalType.newBuilder() .setRepresentation( @@ -429,15 +442,22 @@ public class SchemaTranslation { } else if (urn.equals(URN_BEAM_LOGICAL_DECIMAL)) { return FieldType.DECIMAL; } else if (urn.startsWith("beam:logical_type:")) { - try { - return FieldType.logicalType( - (LogicalType) - SerializableUtils.deserializeFromByteArray( - logicalType.getPayload().toByteArray(), "logicalType")); - } catch (IllegalArgumentException e) { - LOG.warn( - "Unable to deserialize the logical type {} from proto. 
Mark as UnknownLogicalType.", - urn); + if (!logicalType.getPayload().isEmpty()) { + // logical type has a payload, try to recover the instance by deserialization + try { + return FieldType.logicalType( + (LogicalType) + SerializableUtils.deserializeFromByteArray( + logicalType.getPayload().toByteArray(), "logicalType")); + } catch (IllegalArgumentException e) { + LOG.warn( + "Unable to deserialize the logical type {} from proto. Mark as UnknownLogicalType.", + urn); + } + } else { + // logical type does not have a payload. This happens when it is passed xlang. + // TODO(yathu) it appears this path is called heavily, consider cache the instance + LOG.debug("Constructing non-standard logical type {} as UnknownLogicalType", urn); } } // assemble an UnknownLogicalType diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java index bdce452192a..3020d7e42d0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.schemas; +import static org.apache.beam.sdk.schemas.SchemaTranslation.STANDARD_LOGICAL_TYPES; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -48,6 +49,7 @@ import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; import org.apache.beam.sdk.schemas.logicaltypes.PythonCallable; import org.apache.beam.sdk.schemas.logicaltypes.SchemaLogicalType; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; +import org.apache.beam.sdk.schemas.logicaltypes.UnknownLogicalType; import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes; import org.apache.beam.sdk.schemas.logicaltypes.VariableString; import org.apache.beam.sdk.values.Row; @@ -186,7 +188,8 @@ 
public class SchemaTranslationTest { .withOptions(optionsBuilder)) .add( Schema.of( - Field.of("null_argument", FieldType.logicalType(new NullArgumentLogicalType())))) + Field.of( + "null_argument", FieldType.logicalType(new PortableNullArgLogicalType())))) .add(Schema.of(Field.of("logical_argument", FieldType.logicalType(new DateTime())))) .add( Schema.of(Field.of("single_arg_argument", FieldType.logicalType(FixedBytes.of(100))))) @@ -348,14 +351,14 @@ public class SchemaTranslationTest { .add(simpleRow(FieldType.row(row.getSchema()), row)) .add(simpleRow(FieldType.DATETIME, new Instant(23L))) .add(simpleRow(FieldType.DECIMAL, BigDecimal.valueOf(100000))) - .add(simpleRow(FieldType.logicalType(new NullArgumentLogicalType()), "str")) + .add(simpleRow(FieldType.logicalType(new PortableNullArgLogicalType()), "str")) .add(simpleRow(FieldType.logicalType(new DateTime()), LocalDateTime.of(2000, 1, 3, 3, 1))) .add(simpleNullRow(FieldType.STRING)) .add(simpleNullRow(FieldType.INT32)) .add(simpleNullRow(FieldType.map(FieldType.STRING, FieldType.INT32))) .add(simpleNullRow(FieldType.array(FieldType.STRING))) .add(simpleNullRow(FieldType.row(row.getSchema()))) - .add(simpleNullRow(FieldType.logicalType(new NullArgumentLogicalType()))) + .add(simpleNullRow(FieldType.logicalType(new PortableNullArgLogicalType()))) .add(simpleNullRow(FieldType.logicalType(new DateTime()))) .add(simpleNullRow(FieldType.DECIMAL)) .add(simpleNullRow(FieldType.DATETIME)) @@ -419,6 +422,8 @@ public class SchemaTranslationTest { .add(FieldType.logicalType(FixedString.of(10))) .add(FieldType.logicalType(VariableString.of(10))) .add(FieldType.logicalType(FixedPrecisionNumeric.of(10))) + .add(FieldType.logicalType(new PortableNullArgLogicalType())) + .add(FieldType.logicalType(new NullArgumentLogicalType())) .build(); } @@ -426,7 +431,7 @@ public class SchemaTranslationTest { public Schema.FieldType fieldType; @Test - public void testPortableLogicalTypeSerializeDeserilizeCorrectly() { + public void 
testLogicalTypeSerializeDeserilizeCorrectly() { SchemaApi.FieldType proto = SchemaTranslation.fieldTypeToProto(fieldType, true); Schema.FieldType translated = SchemaTranslation.fieldTypeFromProto(proto); @@ -438,14 +443,64 @@ public class SchemaTranslationTest { assertThat( translated.getLogicalType().getArgument(), equalTo(fieldType.getLogicalType().getArgument())); + assertThat( + translated.getLogicalType().getIdentifier(), + equalTo(fieldType.getLogicalType().getIdentifier())); + } + + @Test + public void testLogicalTypeFromToProtoCorrectly() { + SchemaApi.FieldType proto = SchemaTranslation.fieldTypeToProto(fieldType, false); + Schema.FieldType translated = SchemaTranslation.fieldTypeFromProto(proto); + + if (STANDARD_LOGICAL_TYPES.containsKey(translated.getLogicalType().getIdentifier())) { + // standard logical type should be able to fully recover the original type + assertThat( + translated.getLogicalType().getClass(), equalTo(fieldType.getLogicalType().getClass())); + } else { + // non-standard type will get assembled to UnknownLogicalType + assertThat(translated.getLogicalType().getClass(), equalTo(UnknownLogicalType.class)); + } + assertThat( + translated.getLogicalType().getArgumentType(), + equalTo(fieldType.getLogicalType().getArgumentType())); + assertThat( + translated.getLogicalType().getArgument(), + equalTo(fieldType.getLogicalType().getArgument())); + if (fieldType.getLogicalType().getIdentifier().startsWith("beam:logical_type:")) { + // portable logical type should fully recover the urn + assertThat( + translated.getLogicalType().getIdentifier(), + equalTo(fieldType.getLogicalType().getIdentifier())); + } else { + // non-portable logical type would have "javasdk_<IDENTIFIER>" urn + assertThat( + translated.getLogicalType().getIdentifier(), + equalTo( + String.format( + "beam:logical_type:javasdk_%s:v1", + fieldType + .getLogicalType() + .getIdentifier() + .toLowerCase() + .replaceAll("[^0-9A-Za-z_]", "")))); + } } } - /** A simple logical type 
that has no argument. */ - private static class NullArgumentLogicalType implements Schema.LogicalType<String, String> { + /** A portable logical type that has no argument. */ + private static class PortableNullArgLogicalType extends NullArgumentLogicalType { public static final String IDENTIFIER = "beam:logical_type:null_argument:v1"; - public NullArgumentLogicalType() {} + @Override + public String getIdentifier() { + return IDENTIFIER; + } + } + + /** A non-portable (Java SDK) logical type that has no argument. */ + private static class NullArgumentLogicalType implements Schema.LogicalType<String, String> { + public static final String IDENTIFIER = "NULL_ARGUMENT"; @Override public String toBaseType(String input) { diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/LogicalTypes.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/LogicalTypes.java index 674b274f907..6e8e46b7afa 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/LogicalTypes.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/LogicalTypes.java @@ -19,7 +19,6 @@ package org.apache.beam.sdk.io.jdbc; import java.sql.JDBCType; import java.time.Instant; -import java.util.Objects; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; @@ -30,11 +29,11 @@ import org.apache.beam.sdk.schemas.logicaltypes.UuidLogicalType; import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes; import org.apache.beam.sdk.schemas.logicaltypes.VariableString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.checkerframework.checker.nullness.qual.NonNull; -import org.checkerframework.checker.nullness.qual.Nullable; /** Beam {@link org.apache.beam.sdk.schemas.Schema.LogicalType} implementations of JDBC types. 
*/ class LogicalTypes { + // Logical types of the following static members are not portable and are preserved for + // compatibility reason. Consider using portable logical types when adding new ones. static final Schema.FieldType JDBC_BIT_TYPE = Schema.FieldType.logicalType( new PassThroughLogicalType<Boolean>( @@ -110,69 +109,4 @@ class LogicalTypes { return FixedBytes.of(name, length); } } - - /** Base class for JDBC logical types. */ - abstract static class JdbcLogicalType<T extends @NonNull Object> - implements Schema.LogicalType<T, T> { - protected final String identifier; - protected final Schema.FieldType argumentType; - protected final Schema.FieldType baseType; - protected final Object argument; - - protected JdbcLogicalType( - String identifier, - Schema.FieldType argumentType, - Schema.FieldType baseType, - Object argument) { - this.identifier = identifier; - this.argumentType = argumentType; - this.baseType = baseType; - this.argument = argument; - } - - @Override - public String getIdentifier() { - return identifier; - } - - @Override - public FieldType getArgumentType() { - return argumentType; - } - - @Override - @SuppressWarnings("TypeParameterUnusedInFormals") - public <ArgumentT> ArgumentT getArgument() { - return (ArgumentT) argument; - } - - @Override - public Schema.FieldType getBaseType() { - return baseType; - } - - @Override - public T toBaseType(T input) { - return input; - } - - @Override - public boolean equals(@Nullable Object o) { - if (this == o) { - return true; - } - if (!(o instanceof JdbcLogicalType)) { - return false; - } - JdbcLogicalType<?> that = (JdbcLogicalType<?>) o; - return Objects.equals(identifier, that.identifier) - && Objects.equals(baseType, that.baseType) - && Objects.equals(argument, that.argument); - } - - @Override - public int hashCode() { - return Objects.hash(identifier, baseType, argument); - } - } } diff --git a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py 
b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py index ed8745ec2ac..54a473d1b52 100644 --- a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py @@ -17,6 +17,7 @@ # pytype: skip-file +import datetime import logging import time import typing @@ -60,7 +61,8 @@ JdbcTestRow = typing.NamedTuple( "JdbcTestRow", [("f_id", int), ("f_float", float), ("f_char", str), ("f_varchar", str), ("f_bytes", bytes), ("f_varbytes", bytes), ("f_timestamp", Timestamp), - ("f_decimal", Decimal)], + ("f_decimal", Decimal), ("f_date", datetime.date), + ("f_time", datetime.time)], ) coders.registry.register_coder(JdbcTestRow, coders.RowCoder) @@ -132,7 +134,7 @@ class CrossLanguageJdbcIOTest(unittest.TestCase): "f_float DOUBLE PRECISION, " + "f_char CHAR(10), " + "f_varchar VARCHAR(10), " + f"f_bytes {binary_type[0]}, " + f"f_varbytes {binary_type[1]}, " + "f_timestamp TIMESTAMP(3), " + - "f_decimal DECIMAL(10, 2))") + "f_decimal DECIMAL(10, 2), " + "f_date DATE, " + "f_time TIME(3))") inserted_rows = [ JdbcTestRow( i, @@ -144,7 +146,11 @@ class CrossLanguageJdbcIOTest(unittest.TestCase): # In alignment with Java Instant which supports milli precision. Timestamp.of(seconds=round(time.time(), 3)), # Test both positive and negative numbers. 
- Decimal(f'{i-1}.23')) for i in range(ROW_COUNT) + Decimal(f'{i-1}.23'), + # Test both date before or after EPOCH + datetime.date(1969 + i, i % 12 + 1, i % 31 + 1), + datetime.time(i % 24, i % 60, i % 60, (i * 1000) % 1_000_000)) + for i in range(ROW_COUNT) ] expected_row = [] for row in inserted_rows: @@ -163,7 +169,9 @@ class CrossLanguageJdbcIOTest(unittest.TestCase): f_bytes, row.f_bytes, row.f_timestamp, - row.f_decimal)) + row.f_decimal, + row.f_date, + row.f_time)) with TestPipeline() as p: p.not_use_test_runner_api = True diff --git a/sdks/python/apache_beam/io/jdbc.py b/sdks/python/apache_beam/io/jdbc.py index f8f24ddeb8d..903b0d1b0fe 100644 --- a/sdks/python/apache_beam/io/jdbc.py +++ b/sdks/python/apache_beam/io/jdbc.py @@ -86,6 +86,7 @@ # pytype: skip-file +import datetime import typing import numpy as np @@ -94,7 +95,10 @@ from apache_beam.coders import RowCoder from apache_beam.transforms.external import BeamJarExpansionService from apache_beam.transforms.external import ExternalTransform from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder +from apache_beam.typehints.schemas import LogicalType +from apache_beam.typehints.schemas import MillisInstant from apache_beam.typehints.schemas import typing_to_runner_api +from apache_beam.utils.timestamp import Timestamp __all__ = [ 'WriteToJdbc', @@ -355,3 +359,99 @@ class ReadFromJdbc(ExternalTransform): ), expansion_service or default_io_expansion_service(classpath), ) + + +@LogicalType.register_logical_type +class JdbcDateType(LogicalType[datetime.date, MillisInstant, str]): + """ + For internal use only; no backwards-compatibility guarantees. + + Support of Legacy JdbcIO DATE logical type. Deemed to change when Java JDBCIO + has been migrated to Beam portable logical types. 
+ """ + def __init__(self, argument=""): + pass + + @classmethod + def representation_type(cls): + # type: () -> type + return Timestamp + + @classmethod + def urn(cls): + return "beam:logical_type:javasdk_date:v1" + + @classmethod + def language_type(cls): + return datetime.date + + def to_representation_type(self, value): + # type: (datetime.date) -> Timestamp + return Timestamp.from_utc_datetime( + datetime.datetime.combine( + value, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc)) + + def to_language_type(self, value): + # type: (Timestamp) -> datetime.date + + return value.to_utc_datetime().date() + + @classmethod + def argument_type(cls): + return str + + def argument(self): + return "" + + @classmethod + def _from_typing(cls, typ): + return cls() + + +@LogicalType.register_logical_type +class JdbcTimeType(LogicalType[datetime.time, MillisInstant, str]): + """ + For internal use only; no backwards-compatibility guarantees. + + Support of Legacy JdbcIO TIME logical type. Deemed to change when Java + JDBCIO has been migrated to Beam portable logical types. + """ + def __init__(self, argument=""): + pass + + @classmethod + def representation_type(cls): + # type: () -> type + return Timestamp + + @classmethod + def urn(cls): + return "beam:logical_type:javasdk_time:v1" + + @classmethod + def language_type(cls): + return datetime.time + + def to_representation_type(self, value): + # type: (datetime.date) -> Timestamp + return Timestamp.from_utc_datetime( + datetime.datetime.combine( + datetime.datetime.utcfromtimestamp(0), + value, + tzinfo=datetime.timezone.utc)) + + def to_language_type(self, value): + # type: (Timestamp) -> datetime.date + + return value.to_utc_datetime().time() + + @classmethod + def argument_type(cls): + return str + + def argument(self): + return "" + + @classmethod + def _from_typing(cls, typ): + return cls()