This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a0e9775ac25 Support legacy DATE and TIME logical types in xlang JdbcIO 
(#28382)
a0e9775ac25 is described below

commit a0e9775ac25a5676714ba743cbd7db02b0512af4
Author: Yi Hu <ya...@google.com>
AuthorDate: Mon Sep 18 10:58:30 2023 -0400

    Support legacy DATE and TIME logical types in xlang JdbcIO (#28382)
    
    * Support legacy DATE and TIME logical types in xlang JdbcIO
    
    * Add identifier to  URN for legacy Java logical types
    
    * Implement JdbcIO logical type javasdk_date and javasdk_time in Python
---
 .../apache/beam/sdk/schemas/SchemaTranslation.java |  74 +++++++++------
 .../beam/sdk/schemas/SchemaTranslationTest.java    |  69 ++++++++++++--
 .../org/apache/beam/sdk/io/jdbc/LogicalTypes.java  |  70 +--------------
 .../io/external/xlang_jdbcio_it_test.py            |  16 +++-
 sdks/python/apache_beam/io/jdbc.py                 | 100 +++++++++++++++++++++
 5 files changed, 223 insertions(+), 106 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java
index fb6746b9cdd..c0683ef4461 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java
@@ -56,6 +56,7 @@ import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
@@ -74,7 +75,23 @@ public class SchemaTranslation {
   private static final Logger LOG = 
LoggerFactory.getLogger(SchemaTranslation.class);
 
   private static final String URN_BEAM_LOGICAL_DECIMAL = 
FixedPrecisionNumeric.BASE_IDENTIFIER;
-  private static final String URN_BEAM_LOGICAL_JAVASDK = 
"beam:logical_type:javasdk:v1";
+
+  private static String getLogicalTypeUrn(String identifier) {
+    if (identifier.startsWith("beam:logical_type:")) {
+      return identifier;
+    } else {
+      String filtered = identifier.replaceAll("[^0-9A-Za-z_]", 
"").toLowerCase();
+      if (!Strings.isNullOrEmpty(filtered)) {
+        // urn for non-standard Java SDK logical types are assigned with 
javasdk_<identifier>
+        return String.format("beam:logical_type:javasdk_%s:v1", filtered);
+      } else {
+        // raw "javasdk" name should only be a last resort. Types defined in 
Beam should have their
+        // own URN.
+        return "beam:logical_type:javasdk:v1";
+      }
+    }
+  }
+
   private static final String URN_BEAM_LOGICAL_MILLIS_INSTANT =
       SchemaApi.LogicalTypes.Enum.MILLIS_INSTANT
           .getValueDescriptor()
@@ -84,18 +101,18 @@ public class SchemaTranslation {
   // TODO(https://github.com/apache/beam/issues/19715): Populate this with a 
LogicalTypeRegistrar,
   // which includes a way to construct
   // the LogicalType given an argument.
-  private static final ImmutableMap<String, Class<? extends LogicalType<?, ?>>>
-      STANDARD_LOGICAL_TYPES =
-          ImmutableMap.<String, Class<? extends LogicalType<?, ?>>>builder()
-              .put(FixedPrecisionNumeric.IDENTIFIER, 
FixedPrecisionNumeric.class)
-              .put(MicrosInstant.IDENTIFIER, MicrosInstant.class)
-              .put(SchemaLogicalType.IDENTIFIER, SchemaLogicalType.class)
-              .put(PythonCallable.IDENTIFIER, PythonCallable.class)
-              .put(FixedBytes.IDENTIFIER, FixedBytes.class)
-              .put(VariableBytes.IDENTIFIER, VariableBytes.class)
-              .put(FixedString.IDENTIFIER, FixedString.class)
-              .put(VariableString.IDENTIFIER, VariableString.class)
-              .build();
+  @VisibleForTesting
+  static final ImmutableMap<String, Class<? extends LogicalType<?, ?>>> 
STANDARD_LOGICAL_TYPES =
+      ImmutableMap.<String, Class<? extends LogicalType<?, ?>>>builder()
+          .put(FixedPrecisionNumeric.IDENTIFIER, FixedPrecisionNumeric.class)
+          .put(MicrosInstant.IDENTIFIER, MicrosInstant.class)
+          .put(SchemaLogicalType.IDENTIFIER, SchemaLogicalType.class)
+          .put(PythonCallable.IDENTIFIER, PythonCallable.class)
+          .put(FixedBytes.IDENTIFIER, FixedBytes.class)
+          .put(VariableBytes.IDENTIFIER, VariableBytes.class)
+          .put(FixedString.IDENTIFIER, FixedString.class)
+          .put(VariableString.IDENTIFIER, VariableString.class)
+          .build();
 
   public static SchemaApi.Schema schemaToProto(Schema schema, boolean 
serializeLogicalType) {
     String uuid = schema.getUUID() != null ? schema.getUUID().toString() : "";
@@ -179,11 +196,7 @@ public class SchemaTranslation {
                     fieldValueToProto(logicalType.getArgumentType(), 
logicalType.getArgument()));
           }
         } else {
-          // TODO(https://github.com/apache/beam/issues/19715): "javasdk" 
types should only
-          // be a last resort. Types defined in Beam should have their own 
URN, and there
-          // should be a mechanism for users to register their own types by 
URN.
-          String urn =
-              identifier.startsWith("beam:logical_type:") ? identifier : 
URN_BEAM_LOGICAL_JAVASDK;
+          String urn = getLogicalTypeUrn(identifier);
           logicalTypeBuilder =
               SchemaApi.LogicalType.newBuilder()
                   .setRepresentation(
@@ -429,15 +442,22 @@ public class SchemaTranslation {
         } else if (urn.equals(URN_BEAM_LOGICAL_DECIMAL)) {
           return FieldType.DECIMAL;
         } else if (urn.startsWith("beam:logical_type:")) {
-          try {
-            return FieldType.logicalType(
-                (LogicalType)
-                    SerializableUtils.deserializeFromByteArray(
-                        logicalType.getPayload().toByteArray(), 
"logicalType"));
-          } catch (IllegalArgumentException e) {
-            LOG.warn(
-                "Unable to deserialize the logical type {} from proto. Mark as 
UnknownLogicalType.",
-                urn);
+          if (!logicalType.getPayload().isEmpty()) {
+            // logical type has a payload, try to recover the instance by 
deserialization
+            try {
+              return FieldType.logicalType(
+                  (LogicalType)
+                      SerializableUtils.deserializeFromByteArray(
+                          logicalType.getPayload().toByteArray(), 
"logicalType"));
+            } catch (IllegalArgumentException e) {
+              LOG.warn(
+                  "Unable to deserialize the logical type {} from proto. Mark 
as UnknownLogicalType.",
+                  urn);
+            }
+          } else {
+            // logical type does not have a payload. This happens when it is 
passed xlang.
+            // TODO(yathu) it appears this path is called heavily, consider 
cache the instance
+            LOG.debug("Constructing non-standard logical type {} as 
UnknownLogicalType", urn);
           }
         }
         // assemble an UnknownLogicalType
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java
index bdce452192a..3020d7e42d0 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.schemas;
 
+import static 
org.apache.beam.sdk.schemas.SchemaTranslation.STANDARD_LOGICAL_TYPES;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -48,6 +49,7 @@ import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
 import org.apache.beam.sdk.schemas.logicaltypes.PythonCallable;
 import org.apache.beam.sdk.schemas.logicaltypes.SchemaLogicalType;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.schemas.logicaltypes.UnknownLogicalType;
 import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
 import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
 import org.apache.beam.sdk.values.Row;
@@ -186,7 +188,8 @@ public class SchemaTranslationTest {
                   .withOptions(optionsBuilder))
           .add(
               Schema.of(
-                  Field.of("null_argument", FieldType.logicalType(new 
NullArgumentLogicalType()))))
+                  Field.of(
+                      "null_argument", FieldType.logicalType(new 
PortableNullArgLogicalType()))))
           .add(Schema.of(Field.of("logical_argument", 
FieldType.logicalType(new DateTime()))))
           .add(
               Schema.of(Field.of("single_arg_argument", 
FieldType.logicalType(FixedBytes.of(100)))))
@@ -348,14 +351,14 @@ public class SchemaTranslationTest {
           .add(simpleRow(FieldType.row(row.getSchema()), row))
           .add(simpleRow(FieldType.DATETIME, new Instant(23L)))
           .add(simpleRow(FieldType.DECIMAL, BigDecimal.valueOf(100000)))
-          .add(simpleRow(FieldType.logicalType(new NullArgumentLogicalType()), 
"str"))
+          .add(simpleRow(FieldType.logicalType(new 
PortableNullArgLogicalType()), "str"))
           .add(simpleRow(FieldType.logicalType(new DateTime()), 
LocalDateTime.of(2000, 1, 3, 3, 1)))
           .add(simpleNullRow(FieldType.STRING))
           .add(simpleNullRow(FieldType.INT32))
           .add(simpleNullRow(FieldType.map(FieldType.STRING, FieldType.INT32)))
           .add(simpleNullRow(FieldType.array(FieldType.STRING)))
           .add(simpleNullRow(FieldType.row(row.getSchema())))
-          .add(simpleNullRow(FieldType.logicalType(new 
NullArgumentLogicalType())))
+          .add(simpleNullRow(FieldType.logicalType(new 
PortableNullArgLogicalType())))
           .add(simpleNullRow(FieldType.logicalType(new DateTime())))
           .add(simpleNullRow(FieldType.DECIMAL))
           .add(simpleNullRow(FieldType.DATETIME))
@@ -419,6 +422,8 @@ public class SchemaTranslationTest {
           .add(FieldType.logicalType(FixedString.of(10)))
           .add(FieldType.logicalType(VariableString.of(10)))
           .add(FieldType.logicalType(FixedPrecisionNumeric.of(10)))
+          .add(FieldType.logicalType(new PortableNullArgLogicalType()))
+          .add(FieldType.logicalType(new NullArgumentLogicalType()))
           .build();
     }
 
@@ -426,7 +431,7 @@ public class SchemaTranslationTest {
     public Schema.FieldType fieldType;
 
     @Test
-    public void testPortableLogicalTypeSerializeDeserilizeCorrectly() {
+    public void testLogicalTypeSerializeDeserilizeCorrectly() {
       SchemaApi.FieldType proto = 
SchemaTranslation.fieldTypeToProto(fieldType, true);
       Schema.FieldType translated = 
SchemaTranslation.fieldTypeFromProto(proto);
 
@@ -438,14 +443,64 @@ public class SchemaTranslationTest {
       assertThat(
           translated.getLogicalType().getArgument(),
           equalTo(fieldType.getLogicalType().getArgument()));
+      assertThat(
+          translated.getLogicalType().getIdentifier(),
+          equalTo(fieldType.getLogicalType().getIdentifier()));
+    }
+
+    @Test
+    public void testLogicalTypeFromToProtoCorrectly() {
+      SchemaApi.FieldType proto = 
SchemaTranslation.fieldTypeToProto(fieldType, false);
+      Schema.FieldType translated = 
SchemaTranslation.fieldTypeFromProto(proto);
+
+      if 
(STANDARD_LOGICAL_TYPES.containsKey(translated.getLogicalType().getIdentifier()))
 {
+        // standard logical type should be able to fully recover the original 
type
+        assertThat(
+            translated.getLogicalType().getClass(), 
equalTo(fieldType.getLogicalType().getClass()));
+      } else {
+        // non-standard type will get assembled to UnknownLogicalType
+        assertThat(translated.getLogicalType().getClass(), 
equalTo(UnknownLogicalType.class));
+      }
+      assertThat(
+          translated.getLogicalType().getArgumentType(),
+          equalTo(fieldType.getLogicalType().getArgumentType()));
+      assertThat(
+          translated.getLogicalType().getArgument(),
+          equalTo(fieldType.getLogicalType().getArgument()));
+      if 
(fieldType.getLogicalType().getIdentifier().startsWith("beam:logical_type:")) {
+        // portable logical type should fully recover the urn
+        assertThat(
+            translated.getLogicalType().getIdentifier(),
+            equalTo(fieldType.getLogicalType().getIdentifier()));
+      } else {
+        // non-portable logical type would have "javasdk_<IDENTIFIER>" urn
+        assertThat(
+            translated.getLogicalType().getIdentifier(),
+            equalTo(
+                String.format(
+                    "beam:logical_type:javasdk_%s:v1",
+                    fieldType
+                        .getLogicalType()
+                        .getIdentifier()
+                        .toLowerCase()
+                        .replaceAll("[^0-9A-Za-z_]", ""))));
+      }
     }
   }
 
-  /** A simple logical type that has no argument. */
-  private static class NullArgumentLogicalType implements 
Schema.LogicalType<String, String> {
+  /** A portable logical type that has no argument. */
+  private static class PortableNullArgLogicalType extends 
NullArgumentLogicalType {
     public static final String IDENTIFIER = 
"beam:logical_type:null_argument:v1";
 
-    public NullArgumentLogicalType() {}
+    @Override
+    public String getIdentifier() {
+      return IDENTIFIER;
+    }
+  }
+
+  /** A non-portable (Java SDK) logical type that has no argument. */
+  private static class NullArgumentLogicalType implements 
Schema.LogicalType<String, String> {
+    public static final String IDENTIFIER = "NULL_ARGUMENT";
 
     @Override
     public String toBaseType(String input) {
diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/LogicalTypes.java 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/LogicalTypes.java
index 674b274f907..6e8e46b7afa 100644
--- 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/LogicalTypes.java
+++ 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/LogicalTypes.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.io.jdbc;
 
 import java.sql.JDBCType;
 import java.time.Instant;
-import java.util.Objects;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
@@ -30,11 +29,11 @@ import 
org.apache.beam.sdk.schemas.logicaltypes.UuidLogicalType;
 import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
 import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-import org.checkerframework.checker.nullness.qual.NonNull;
-import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Beam {@link org.apache.beam.sdk.schemas.Schema.LogicalType} 
implementations of JDBC types. */
 class LogicalTypes {
+  // Logical types of the following static members are not portable and are 
preserved for
+  // compatibility reason. Consider using portable logical types when adding 
new ones.
   static final Schema.FieldType JDBC_BIT_TYPE =
       Schema.FieldType.logicalType(
           new PassThroughLogicalType<Boolean>(
@@ -110,69 +109,4 @@ class LogicalTypes {
       return FixedBytes.of(name, length);
     }
   }
-
-  /** Base class for JDBC logical types. */
-  abstract static class JdbcLogicalType<T extends @NonNull Object>
-      implements Schema.LogicalType<T, T> {
-    protected final String identifier;
-    protected final Schema.FieldType argumentType;
-    protected final Schema.FieldType baseType;
-    protected final Object argument;
-
-    protected JdbcLogicalType(
-        String identifier,
-        Schema.FieldType argumentType,
-        Schema.FieldType baseType,
-        Object argument) {
-      this.identifier = identifier;
-      this.argumentType = argumentType;
-      this.baseType = baseType;
-      this.argument = argument;
-    }
-
-    @Override
-    public String getIdentifier() {
-      return identifier;
-    }
-
-    @Override
-    public FieldType getArgumentType() {
-      return argumentType;
-    }
-
-    @Override
-    @SuppressWarnings("TypeParameterUnusedInFormals")
-    public <ArgumentT> ArgumentT getArgument() {
-      return (ArgumentT) argument;
-    }
-
-    @Override
-    public Schema.FieldType getBaseType() {
-      return baseType;
-    }
-
-    @Override
-    public T toBaseType(T input) {
-      return input;
-    }
-
-    @Override
-    public boolean equals(@Nullable Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (!(o instanceof JdbcLogicalType)) {
-        return false;
-      }
-      JdbcLogicalType<?> that = (JdbcLogicalType<?>) o;
-      return Objects.equals(identifier, that.identifier)
-          && Objects.equals(baseType, that.baseType)
-          && Objects.equals(argument, that.argument);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(identifier, baseType, argument);
-    }
-  }
 }
diff --git a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py 
b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
index ed8745ec2ac..54a473d1b52 100644
--- a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
@@ -17,6 +17,7 @@
 
 # pytype: skip-file
 
+import datetime
 import logging
 import time
 import typing
@@ -60,7 +61,8 @@ JdbcTestRow = typing.NamedTuple(
     "JdbcTestRow",
     [("f_id", int), ("f_float", float), ("f_char", str), ("f_varchar", str),
      ("f_bytes", bytes), ("f_varbytes", bytes), ("f_timestamp", Timestamp),
-     ("f_decimal", Decimal)],
+     ("f_decimal", Decimal), ("f_date", datetime.date),
+     ("f_time", datetime.time)],
 )
 coders.registry.register_coder(JdbcTestRow, coders.RowCoder)
 
@@ -132,7 +134,7 @@ class CrossLanguageJdbcIOTest(unittest.TestCase):
         "f_float DOUBLE PRECISION, " + "f_char CHAR(10), " +
         "f_varchar VARCHAR(10), " + f"f_bytes {binary_type[0]}, " +
         f"f_varbytes {binary_type[1]}, " + "f_timestamp TIMESTAMP(3), " +
-        "f_decimal DECIMAL(10, 2))")
+        "f_decimal DECIMAL(10, 2), " + "f_date DATE, " + "f_time TIME(3))")
     inserted_rows = [
         JdbcTestRow(
             i,
@@ -144,7 +146,11 @@ class CrossLanguageJdbcIOTest(unittest.TestCase):
             # In alignment with Java Instant which supports milli precision.
             Timestamp.of(seconds=round(time.time(), 3)),
             # Test both positive and negative numbers.
-            Decimal(f'{i-1}.23')) for i in range(ROW_COUNT)
+            Decimal(f'{i-1}.23'),
+            # Test both date before or after EPOCH
+            datetime.date(1969 + i, i % 12 + 1, i % 31 + 1),
+            datetime.time(i % 24, i % 60, i % 60, (i * 1000) % 1_000_000))
+        for i in range(ROW_COUNT)
     ]
     expected_row = []
     for row in inserted_rows:
@@ -163,7 +169,9 @@ class CrossLanguageJdbcIOTest(unittest.TestCase):
               f_bytes,
               row.f_bytes,
               row.f_timestamp,
-              row.f_decimal))
+              row.f_decimal,
+              row.f_date,
+              row.f_time))
 
     with TestPipeline() as p:
       p.not_use_test_runner_api = True
diff --git a/sdks/python/apache_beam/io/jdbc.py 
b/sdks/python/apache_beam/io/jdbc.py
index f8f24ddeb8d..903b0d1b0fe 100644
--- a/sdks/python/apache_beam/io/jdbc.py
+++ b/sdks/python/apache_beam/io/jdbc.py
@@ -86,6 +86,7 @@
 
 # pytype: skip-file
 
+import datetime
 import typing
 
 import numpy as np
@@ -94,7 +95,10 @@ from apache_beam.coders import RowCoder
 from apache_beam.transforms.external import BeamJarExpansionService
 from apache_beam.transforms.external import ExternalTransform
 from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+from apache_beam.typehints.schemas import LogicalType
+from apache_beam.typehints.schemas import MillisInstant
 from apache_beam.typehints.schemas import typing_to_runner_api
+from apache_beam.utils.timestamp import Timestamp
 
 __all__ = [
     'WriteToJdbc',
@@ -355,3 +359,99 @@ class ReadFromJdbc(ExternalTransform):
         ),
         expansion_service or default_io_expansion_service(classpath),
     )
+
+
+@LogicalType.register_logical_type
+class JdbcDateType(LogicalType[datetime.date, MillisInstant, str]):
+  """
+  For internal use only; no backwards-compatibility guarantees.
+
+  Support of Legacy JdbcIO DATE logical type. Deemed to change when Java JDBCIO
+  has been migrated to Beam portable logical types.
+  """
+  def __init__(self, argument=""):
+    pass
+
+  @classmethod
+  def representation_type(cls):
+    # type: () -> type
+    return Timestamp
+
+  @classmethod
+  def urn(cls):
+    return "beam:logical_type:javasdk_date:v1"
+
+  @classmethod
+  def language_type(cls):
+    return datetime.date
+
+  def to_representation_type(self, value):
+    # type: (datetime.date) -> Timestamp
+    return Timestamp.from_utc_datetime(
+        datetime.datetime.combine(
+            value, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc))
+
+  def to_language_type(self, value):
+    # type: (Timestamp) -> datetime.date
+
+    return value.to_utc_datetime().date()
+
+  @classmethod
+  def argument_type(cls):
+    return str
+
+  def argument(self):
+    return ""
+
+  @classmethod
+  def _from_typing(cls, typ):
+    return cls()
+
+
+@LogicalType.register_logical_type
+class JdbcTimeType(LogicalType[datetime.time, MillisInstant, str]):
+  """
+  For internal use only; no backwards-compatibility guarantees.
+
+  Support of Legacy JdbcIO TIME logical type. . Deemed to change when Java
+  JDBCIO has been migrated to Beam portable logical types.
+  """
+  def __init__(self, argument=""):
+    pass
+
+  @classmethod
+  def representation_type(cls):
+    # type: () -> type
+    return Timestamp
+
+  @classmethod
+  def urn(cls):
+    return "beam:logical_type:javasdk_time:v1"
+
+  @classmethod
+  def language_type(cls):
+    return datetime.time
+
+  def to_representation_type(self, value):
+    # type: (datetime.date) -> Timestamp
+    return Timestamp.from_utc_datetime(
+        datetime.datetime.combine(
+            datetime.datetime.utcfromtimestamp(0),
+            value,
+            tzinfo=datetime.timezone.utc))
+
+  def to_language_type(self, value):
+    # type: (Timestamp) -> datetime.date
+
+    return value.to_utc_datetime().time()
+
+  @classmethod
+  def argument_type(cls):
+    return str
+
+  def argument(self):
+    return ""
+
+  @classmethod
+  def _from_typing(cls, typ):
+    return cls()

Reply via email to