This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a0e9775ac25 Support legacy DATE and TIME logical types in xlang JdbcIO (#28382)
a0e9775ac25 is described below
commit a0e9775ac25a5676714ba743cbd7db02b0512af4
Author: Yi Hu <[email protected]>
AuthorDate: Mon Sep 18 10:58:30 2023 -0400
Support legacy DATE and TIME logical types in xlang JdbcIO (#28382)
* Support legacy DATE and TIME logical types in xlang JdbcIO
* Add identifier to URN for legacy Java logical types
* Implement JdbcIO logical type javasdk_date and javasdk_time in Python
---
.../apache/beam/sdk/schemas/SchemaTranslation.java | 74 +++++++++------
.../beam/sdk/schemas/SchemaTranslationTest.java | 69 ++++++++++++--
.../org/apache/beam/sdk/io/jdbc/LogicalTypes.java | 70 +--------------
.../io/external/xlang_jdbcio_it_test.py | 16 +++-
sdks/python/apache_beam/io/jdbc.py | 100 +++++++++++++++++++++
5 files changed, 223 insertions(+), 106 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java
index fb6746b9cdd..c0683ef4461 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java
@@ -56,6 +56,7 @@ import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
@@ -74,7 +75,23 @@ public class SchemaTranslation {
private static final Logger LOG =
LoggerFactory.getLogger(SchemaTranslation.class);
private static final String URN_BEAM_LOGICAL_DECIMAL =
FixedPrecisionNumeric.BASE_IDENTIFIER;
- private static final String URN_BEAM_LOGICAL_JAVASDK =
"beam:logical_type:javasdk:v1";
+
+ private static String getLogicalTypeUrn(String identifier) {
+ if (identifier.startsWith("beam:logical_type:")) {
+ return identifier;
+ } else {
+ String filtered = identifier.replaceAll("[^0-9A-Za-z_]",
"").toLowerCase();
+ if (!Strings.isNullOrEmpty(filtered)) {
+ // urn for non-standard Java SDK logical types are assigned with
javasdk_<identifier>
+ return String.format("beam:logical_type:javasdk_%s:v1", filtered);
+ } else {
+ // raw "javasdk" name should only be a last resort. Types defined in
Beam should have their
+ // own URN.
+ return "beam:logical_type:javasdk:v1";
+ }
+ }
+ }
+
private static final String URN_BEAM_LOGICAL_MILLIS_INSTANT =
SchemaApi.LogicalTypes.Enum.MILLIS_INSTANT
.getValueDescriptor()
@@ -84,18 +101,18 @@ public class SchemaTranslation {
// TODO(https://github.com/apache/beam/issues/19715): Populate this with a
LogicalTypeRegistrar,
// which includes a way to construct
// the LogicalType given an argument.
- private static final ImmutableMap<String, Class<? extends LogicalType<?, ?>>>
- STANDARD_LOGICAL_TYPES =
- ImmutableMap.<String, Class<? extends LogicalType<?, ?>>>builder()
- .put(FixedPrecisionNumeric.IDENTIFIER,
FixedPrecisionNumeric.class)
- .put(MicrosInstant.IDENTIFIER, MicrosInstant.class)
- .put(SchemaLogicalType.IDENTIFIER, SchemaLogicalType.class)
- .put(PythonCallable.IDENTIFIER, PythonCallable.class)
- .put(FixedBytes.IDENTIFIER, FixedBytes.class)
- .put(VariableBytes.IDENTIFIER, VariableBytes.class)
- .put(FixedString.IDENTIFIER, FixedString.class)
- .put(VariableString.IDENTIFIER, VariableString.class)
- .build();
+ @VisibleForTesting
+ static final ImmutableMap<String, Class<? extends LogicalType<?, ?>>>
STANDARD_LOGICAL_TYPES =
+ ImmutableMap.<String, Class<? extends LogicalType<?, ?>>>builder()
+ .put(FixedPrecisionNumeric.IDENTIFIER, FixedPrecisionNumeric.class)
+ .put(MicrosInstant.IDENTIFIER, MicrosInstant.class)
+ .put(SchemaLogicalType.IDENTIFIER, SchemaLogicalType.class)
+ .put(PythonCallable.IDENTIFIER, PythonCallable.class)
+ .put(FixedBytes.IDENTIFIER, FixedBytes.class)
+ .put(VariableBytes.IDENTIFIER, VariableBytes.class)
+ .put(FixedString.IDENTIFIER, FixedString.class)
+ .put(VariableString.IDENTIFIER, VariableString.class)
+ .build();
public static SchemaApi.Schema schemaToProto(Schema schema, boolean
serializeLogicalType) {
String uuid = schema.getUUID() != null ? schema.getUUID().toString() : "";
@@ -179,11 +196,7 @@ public class SchemaTranslation {
fieldValueToProto(logicalType.getArgumentType(),
logicalType.getArgument()));
}
} else {
- // TODO(https://github.com/apache/beam/issues/19715): "javasdk"
types should only
- // be a last resort. Types defined in Beam should have their own
URN, and there
- // should be a mechanism for users to register their own types by
URN.
- String urn =
- identifier.startsWith("beam:logical_type:") ? identifier :
URN_BEAM_LOGICAL_JAVASDK;
+ String urn = getLogicalTypeUrn(identifier);
logicalTypeBuilder =
SchemaApi.LogicalType.newBuilder()
.setRepresentation(
@@ -429,15 +442,22 @@ public class SchemaTranslation {
} else if (urn.equals(URN_BEAM_LOGICAL_DECIMAL)) {
return FieldType.DECIMAL;
} else if (urn.startsWith("beam:logical_type:")) {
- try {
- return FieldType.logicalType(
- (LogicalType)
- SerializableUtils.deserializeFromByteArray(
- logicalType.getPayload().toByteArray(),
"logicalType"));
- } catch (IllegalArgumentException e) {
- LOG.warn(
- "Unable to deserialize the logical type {} from proto. Mark as
UnknownLogicalType.",
- urn);
+ if (!logicalType.getPayload().isEmpty()) {
+ // logical type has a payload, try to recover the instance by
deserialization
+ try {
+ return FieldType.logicalType(
+ (LogicalType)
+ SerializableUtils.deserializeFromByteArray(
+ logicalType.getPayload().toByteArray(),
"logicalType"));
+ } catch (IllegalArgumentException e) {
+ LOG.warn(
+ "Unable to deserialize the logical type {} from proto. Mark
as UnknownLogicalType.",
+ urn);
+ }
+ } else {
+ // logical type does not have a payload. This happens when it is
passed xlang.
+ // TODO(yathu) it appears this path is called heavily, consider
cache the instance
+ LOG.debug("Constructing non-standard logical type {} as
UnknownLogicalType", urn);
}
}
// assemble an UnknownLogicalType
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java
index bdce452192a..3020d7e42d0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.schemas;
+import static
org.apache.beam.sdk.schemas.SchemaTranslation.STANDARD_LOGICAL_TYPES;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@@ -48,6 +49,7 @@ import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
import org.apache.beam.sdk.schemas.logicaltypes.PythonCallable;
import org.apache.beam.sdk.schemas.logicaltypes.SchemaLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.schemas.logicaltypes.UnknownLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
import org.apache.beam.sdk.values.Row;
@@ -186,7 +188,8 @@ public class SchemaTranslationTest {
.withOptions(optionsBuilder))
.add(
Schema.of(
- Field.of("null_argument", FieldType.logicalType(new
NullArgumentLogicalType()))))
+ Field.of(
+ "null_argument", FieldType.logicalType(new
PortableNullArgLogicalType()))))
.add(Schema.of(Field.of("logical_argument",
FieldType.logicalType(new DateTime()))))
.add(
Schema.of(Field.of("single_arg_argument",
FieldType.logicalType(FixedBytes.of(100)))))
@@ -348,14 +351,14 @@ public class SchemaTranslationTest {
.add(simpleRow(FieldType.row(row.getSchema()), row))
.add(simpleRow(FieldType.DATETIME, new Instant(23L)))
.add(simpleRow(FieldType.DECIMAL, BigDecimal.valueOf(100000)))
- .add(simpleRow(FieldType.logicalType(new NullArgumentLogicalType()),
"str"))
+ .add(simpleRow(FieldType.logicalType(new
PortableNullArgLogicalType()), "str"))
.add(simpleRow(FieldType.logicalType(new DateTime()),
LocalDateTime.of(2000, 1, 3, 3, 1)))
.add(simpleNullRow(FieldType.STRING))
.add(simpleNullRow(FieldType.INT32))
.add(simpleNullRow(FieldType.map(FieldType.STRING, FieldType.INT32)))
.add(simpleNullRow(FieldType.array(FieldType.STRING)))
.add(simpleNullRow(FieldType.row(row.getSchema())))
- .add(simpleNullRow(FieldType.logicalType(new
NullArgumentLogicalType())))
+ .add(simpleNullRow(FieldType.logicalType(new
PortableNullArgLogicalType())))
.add(simpleNullRow(FieldType.logicalType(new DateTime())))
.add(simpleNullRow(FieldType.DECIMAL))
.add(simpleNullRow(FieldType.DATETIME))
@@ -419,6 +422,8 @@ public class SchemaTranslationTest {
.add(FieldType.logicalType(FixedString.of(10)))
.add(FieldType.logicalType(VariableString.of(10)))
.add(FieldType.logicalType(FixedPrecisionNumeric.of(10)))
+ .add(FieldType.logicalType(new PortableNullArgLogicalType()))
+ .add(FieldType.logicalType(new NullArgumentLogicalType()))
.build();
}
@@ -426,7 +431,7 @@ public class SchemaTranslationTest {
public Schema.FieldType fieldType;
@Test
- public void testPortableLogicalTypeSerializeDeserilizeCorrectly() {
+ public void testLogicalTypeSerializeDeserilizeCorrectly() {
SchemaApi.FieldType proto =
SchemaTranslation.fieldTypeToProto(fieldType, true);
Schema.FieldType translated =
SchemaTranslation.fieldTypeFromProto(proto);
@@ -438,14 +443,64 @@ public class SchemaTranslationTest {
assertThat(
translated.getLogicalType().getArgument(),
equalTo(fieldType.getLogicalType().getArgument()));
+ assertThat(
+ translated.getLogicalType().getIdentifier(),
+ equalTo(fieldType.getLogicalType().getIdentifier()));
+ }
+
+ @Test
+ public void testLogicalTypeFromToProtoCorrectly() {
+ SchemaApi.FieldType proto =
SchemaTranslation.fieldTypeToProto(fieldType, false);
+ Schema.FieldType translated =
SchemaTranslation.fieldTypeFromProto(proto);
+
+ if
(STANDARD_LOGICAL_TYPES.containsKey(translated.getLogicalType().getIdentifier()))
{
+ // standard logical type should be able to fully recover the original
type
+ assertThat(
+ translated.getLogicalType().getClass(),
equalTo(fieldType.getLogicalType().getClass()));
+ } else {
+ // non-standard type will get assembled to UnknownLogicalType
+ assertThat(translated.getLogicalType().getClass(),
equalTo(UnknownLogicalType.class));
+ }
+ assertThat(
+ translated.getLogicalType().getArgumentType(),
+ equalTo(fieldType.getLogicalType().getArgumentType()));
+ assertThat(
+ translated.getLogicalType().getArgument(),
+ equalTo(fieldType.getLogicalType().getArgument()));
+ if
(fieldType.getLogicalType().getIdentifier().startsWith("beam:logical_type:")) {
+ // portable logical type should fully recover the urn
+ assertThat(
+ translated.getLogicalType().getIdentifier(),
+ equalTo(fieldType.getLogicalType().getIdentifier()));
+ } else {
+ // non-portable logical type would have "javasdk_<IDENTIFIER>" urn
+ assertThat(
+ translated.getLogicalType().getIdentifier(),
+ equalTo(
+ String.format(
+ "beam:logical_type:javasdk_%s:v1",
+ fieldType
+ .getLogicalType()
+ .getIdentifier()
+ .toLowerCase()
+ .replaceAll("[^0-9A-Za-z_]", ""))));
+ }
}
}
- /** A simple logical type that has no argument. */
- private static class NullArgumentLogicalType implements
Schema.LogicalType<String, String> {
+ /** A portable logical type that has no argument. */
+ private static class PortableNullArgLogicalType extends
NullArgumentLogicalType {
public static final String IDENTIFIER =
"beam:logical_type:null_argument:v1";
- public NullArgumentLogicalType() {}
+ @Override
+ public String getIdentifier() {
+ return IDENTIFIER;
+ }
+ }
+
+ /** A non-portable (Java SDK) logical type that has no argument. */
+ private static class NullArgumentLogicalType implements
Schema.LogicalType<String, String> {
+ public static final String IDENTIFIER = "NULL_ARGUMENT";
@Override
public String toBaseType(String input) {
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/LogicalTypes.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/LogicalTypes.java
index 674b274f907..6e8e46b7afa 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/LogicalTypes.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/LogicalTypes.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.io.jdbc;
import java.sql.JDBCType;
import java.time.Instant;
-import java.util.Objects;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
@@ -30,11 +29,11 @@ import
org.apache.beam.sdk.schemas.logicaltypes.UuidLogicalType;
import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-import org.checkerframework.checker.nullness.qual.NonNull;
-import org.checkerframework.checker.nullness.qual.Nullable;
/** Beam {@link org.apache.beam.sdk.schemas.Schema.LogicalType}
implementations of JDBC types. */
class LogicalTypes {
+ // Logical types of the following static members are not portable and are
preserved for
+ // compatibility reason. Consider using portable logical types when adding
new ones.
static final Schema.FieldType JDBC_BIT_TYPE =
Schema.FieldType.logicalType(
new PassThroughLogicalType<Boolean>(
@@ -110,69 +109,4 @@ class LogicalTypes {
return FixedBytes.of(name, length);
}
}
-
- /** Base class for JDBC logical types. */
- abstract static class JdbcLogicalType<T extends @NonNull Object>
- implements Schema.LogicalType<T, T> {
- protected final String identifier;
- protected final Schema.FieldType argumentType;
- protected final Schema.FieldType baseType;
- protected final Object argument;
-
- protected JdbcLogicalType(
- String identifier,
- Schema.FieldType argumentType,
- Schema.FieldType baseType,
- Object argument) {
- this.identifier = identifier;
- this.argumentType = argumentType;
- this.baseType = baseType;
- this.argument = argument;
- }
-
- @Override
- public String getIdentifier() {
- return identifier;
- }
-
- @Override
- public FieldType getArgumentType() {
- return argumentType;
- }
-
- @Override
- @SuppressWarnings("TypeParameterUnusedInFormals")
- public <ArgumentT> ArgumentT getArgument() {
- return (ArgumentT) argument;
- }
-
- @Override
- public Schema.FieldType getBaseType() {
- return baseType;
- }
-
- @Override
- public T toBaseType(T input) {
- return input;
- }
-
- @Override
- public boolean equals(@Nullable Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof JdbcLogicalType)) {
- return false;
- }
- JdbcLogicalType<?> that = (JdbcLogicalType<?>) o;
- return Objects.equals(identifier, that.identifier)
- && Objects.equals(baseType, that.baseType)
- && Objects.equals(argument, that.argument);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(identifier, baseType, argument);
- }
- }
}
diff --git a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
index ed8745ec2ac..54a473d1b52 100644
--- a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
@@ -17,6 +17,7 @@
# pytype: skip-file
+import datetime
import logging
import time
import typing
@@ -60,7 +61,8 @@ JdbcTestRow = typing.NamedTuple(
"JdbcTestRow",
[("f_id", int), ("f_float", float), ("f_char", str), ("f_varchar", str),
("f_bytes", bytes), ("f_varbytes", bytes), ("f_timestamp", Timestamp),
- ("f_decimal", Decimal)],
+ ("f_decimal", Decimal), ("f_date", datetime.date),
+ ("f_time", datetime.time)],
)
coders.registry.register_coder(JdbcTestRow, coders.RowCoder)
@@ -132,7 +134,7 @@ class CrossLanguageJdbcIOTest(unittest.TestCase):
"f_float DOUBLE PRECISION, " + "f_char CHAR(10), " +
"f_varchar VARCHAR(10), " + f"f_bytes {binary_type[0]}, " +
f"f_varbytes {binary_type[1]}, " + "f_timestamp TIMESTAMP(3), " +
- "f_decimal DECIMAL(10, 2))")
+ "f_decimal DECIMAL(10, 2), " + "f_date DATE, " + "f_time TIME(3))")
inserted_rows = [
JdbcTestRow(
i,
@@ -144,7 +146,11 @@ class CrossLanguageJdbcIOTest(unittest.TestCase):
# In alignment with Java Instant which supports milli precision.
Timestamp.of(seconds=round(time.time(), 3)),
# Test both positive and negative numbers.
- Decimal(f'{i-1}.23')) for i in range(ROW_COUNT)
+ Decimal(f'{i-1}.23'),
+ # Test both date before or after EPOCH
+ datetime.date(1969 + i, i % 12 + 1, i % 31 + 1),
+ datetime.time(i % 24, i % 60, i % 60, (i * 1000) % 1_000_000))
+ for i in range(ROW_COUNT)
]
expected_row = []
for row in inserted_rows:
@@ -163,7 +169,9 @@ class CrossLanguageJdbcIOTest(unittest.TestCase):
f_bytes,
row.f_bytes,
row.f_timestamp,
- row.f_decimal))
+ row.f_decimal,
+ row.f_date,
+ row.f_time))
with TestPipeline() as p:
p.not_use_test_runner_api = True
diff --git a/sdks/python/apache_beam/io/jdbc.py b/sdks/python/apache_beam/io/jdbc.py
index f8f24ddeb8d..903b0d1b0fe 100644
--- a/sdks/python/apache_beam/io/jdbc.py
+++ b/sdks/python/apache_beam/io/jdbc.py
@@ -86,6 +86,7 @@
# pytype: skip-file
+import datetime
import typing
import numpy as np
@@ -94,7 +95,10 @@ from apache_beam.coders import RowCoder
from apache_beam.transforms.external import BeamJarExpansionService
from apache_beam.transforms.external import ExternalTransform
from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+from apache_beam.typehints.schemas import LogicalType
+from apache_beam.typehints.schemas import MillisInstant
from apache_beam.typehints.schemas import typing_to_runner_api
+from apache_beam.utils.timestamp import Timestamp
__all__ = [
'WriteToJdbc',
@@ -355,3 +359,99 @@ class ReadFromJdbc(ExternalTransform):
),
expansion_service or default_io_expansion_service(classpath),
)
+
+
+@LogicalType.register_logical_type
+class JdbcDateType(LogicalType[datetime.date, MillisInstant, str]):
+ """
+ For internal use only; no backwards-compatibility guarantees.
+
+ Support of Legacy JdbcIO DATE logical type. Deemed to change when Java JDBCIO
+ has been migrated to Beam portable logical types.
+ """
+ def __init__(self, argument=""):
+ pass
+
+ @classmethod
+ def representation_type(cls):
+ # type: () -> type
+ return Timestamp
+
+ @classmethod
+ def urn(cls):
+ return "beam:logical_type:javasdk_date:v1"
+
+ @classmethod
+ def language_type(cls):
+ return datetime.date
+
+ def to_representation_type(self, value):
+ # type: (datetime.date) -> Timestamp
+ return Timestamp.from_utc_datetime(
+ datetime.datetime.combine(
+ value, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc))
+
+ def to_language_type(self, value):
+ # type: (Timestamp) -> datetime.date
+
+ return value.to_utc_datetime().date()
+
+ @classmethod
+ def argument_type(cls):
+ return str
+
+ def argument(self):
+ return ""
+
+ @classmethod
+ def _from_typing(cls, typ):
+ return cls()
+
+
+@LogicalType.register_logical_type
+class JdbcTimeType(LogicalType[datetime.time, MillisInstant, str]):
+ """
+ For internal use only; no backwards-compatibility guarantees.
+
+ Support of Legacy JdbcIO TIME logical type. . Deemed to change when Java
+ JDBCIO has been migrated to Beam portable logical types.
+ """
+ def __init__(self, argument=""):
+ pass
+
+ @classmethod
+ def representation_type(cls):
+ # type: () -> type
+ return Timestamp
+
+ @classmethod
+ def urn(cls):
+ return "beam:logical_type:javasdk_time:v1"
+
+ @classmethod
+ def language_type(cls):
+ return datetime.time
+
+ def to_representation_type(self, value):
+ # type: (datetime.date) -> Timestamp
+ return Timestamp.from_utc_datetime(
+ datetime.datetime.combine(
+ datetime.datetime.utcfromtimestamp(0),
+ value,
+ tzinfo=datetime.timezone.utc))
+
+ def to_language_type(self, value):
+ # type: (Timestamp) -> datetime.date
+
+ return value.to_utc_datetime().time()
+
+ @classmethod
+ def argument_type(cls):
+ return str
+
+ def argument(self):
+ return ""
+
+ @classmethod
+ def _from_typing(cls, typ):
+ return cls()