robertwb commented on a change in pull request #12764:
URL: https://github.com/apache/beam/pull/12764#discussion_r483322224



##########
File path: sdks/python/apache_beam/typehints/schemas.py
##########
@@ -279,3 +301,184 @@ def schema_from_element_type(element_type):  # (type) -> schema_pb2.Schema
 def named_fields_from_element_type(
     element_type):  # (type) -> typing.List[typing.Tuple[unicode, type]]
   return named_fields_from_schema(schema_from_element_type(element_type))
+
+
+# Registry of typings for a schema by UUID
+class LogicalTypeRegistry(object):
+  def __init__(self):
+    self.by_urn = {}
+    self.by_logical_type = {}
+    self.by_language_type = {}
+
+  def add(self, urn, logical_type):
+    self.by_urn[urn] = logical_type
+    self.by_logical_type[logical_type] = urn
+    self.by_language_type[logical_type.language_type()] = logical_type
+
+  def get_logical_type_by_urn(self, urn):
+    return self.by_urn.get(urn, None)
+
+  def get_urn_by_logial_type(self, logical_type):
+    return self.by_logical_type.get(logical_type, None)
+
+  def get_logical_type_by_language_type(self, representation_type):
+    return self.by_language_type.get(representation_type, None)
+
+
+LanguageT = TypeVar('LanguageT')
+RepresentationT = TypeVar('RepresentationT')
+ArgT = TypeVar('ArgT')
+
+
+class LogicalType(Generic[LanguageT, RepresentationT, ArgT]):
+  _known_logical_types = LogicalTypeRegistry()
+
+  @classmethod
+  def urn(cls):
+    # type: () -> unicode
+
+    """Return the URN used to identify this logical type"""
+    raise NotImplementedError()
+
+  @classmethod
+  def language_type(cls):
+    # type: () -> type
+
+    """Return the language type this LogicalType encodes.
+
+    The returned type should match LanguageT"""
+    raise NotImplementedError()
+
+  @classmethod
+  def representation_type(cls):
+    # type: () -> type
+
+    """Return the type of the representation this LogicalType uses to encode the
+    language type.
+
+    The returned type should match RepresentationT"""
+    raise NotImplementedError()
+
+  @classmethod
+  def argument_type(cls):
+    # type: () -> type
+
+    """Return the type of the argument used for variations of this LogicalType.
+
+    The returned type should match ArgT"""
+    raise NotImplementedError(cls)
+
+  def argument(self):
+    # type: () -> ArgT
+
+    """Return the argument for this instance of the LogicalType."""
+    raise NotImplementedError()
+
+  def to_representation_type(value):
+    # type: (LanguageT) -> RepresentationT
+
+    """Convert an instance of LanguageT to RepresentationT."""
+    raise NotImplementedError()
+
+  def to_language_type(value):
+    # type: (RepresentationT) -> LanguageT
+
+    """Convert an instance of RepresentationT to LanguageT."""
+    raise NotImplementedError()
+
+  @classmethod
+  def register_logical_type(cls, logical_type_cls):
+    """Register an implementation of LogicalType."""
+    cls._known_logical_types.add(logical_type_cls.urn(), logical_type_cls)
+
+  @classmethod
+  def from_typing(cls, typ):
+    # type: (type) -> LogicalType
+
+    """Construct an instance of a registered LogicalType implementation given a
+    typing.
+
+    Raises ValueError if no registered LogicalType implementation can encode the
+    given typing."""
+
+    logical_type = cls._known_logical_types.get_logical_type_by_language_type(
+        typ)
+    if logical_type is None:
+      raise ValueError("No logical type registered for typing '%s'" % typ)
+
+    return logical_type._from_typing(typ)
+
+  @classmethod
+  def _from_typing(cls, typ):
+    # type: (type) -> LogicalType
+
+    """Construct an instance of this LogicalType implementation given a typing.
+    """
+    raise NotImplementedError()
+
+  @classmethod
+  def from_runner_api(cls, logical_type_proto):
+    # type: (schema_pb2.LogicalType) -> LogicalType
+
+    """Construct an instance of a registered LogicalType implementation given a
+    proto LogicalType.
+
+    Raises ValueError if no LogicalType registered for the given URN.
+    """
+    logical_type = cls._known_logical_types.get_logical_type_by_urn(
+        logical_type_proto.urn)
+    if logical_type is None:
+      raise ValueError(
+          "No logical type registered for URN '%s'" % logical_type_proto.urn)
+    # TODO(bhulette): Use argument
+    return logical_type()
+
+
+class NoArgumentLogicalType(LogicalType[LanguageT, RepresentationT, None]):
+  @classmethod
+  def argument_type(cls):
+    # type: () -> type
+    return None
+
+  def argument(self):
+    # type: () -> ArgT
+    return None
+
+  @classmethod
+  def _from_typing(cls, typ):
+    # type: (type) -> LogicalType
+
+    # Sicne there's no argument, there can be no additional information encoded

Review comment:
       Typo: "Sicne" should be "Since".




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to