TheNeuralBit commented on code in PR #23548:
URL: https://github.com/apache/beam/pull/23548#discussion_r1006032273


##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -643,8 +685,20 @@ def from_runner_api(cls, logical_type_proto):
     if logical_type is None:
       raise ValueError(
           "No logical type registered for URN '%s'" % logical_type_proto.urn)
-    # TODO(bhulette): Use argument
-    return logical_type()
+    if not logical_type_proto.HasField(
+        "argument_type") or not logical_type_proto.HasField("argument"):
+      # logical type_proto without argument
+      return logical_type()
+    else:
+      try:
+        argument = value_from_runner_api(
+            logical_type_proto.argument_type, logical_type_proto.argument)
+      except ValueError:
+        # TODO(https://github.com/apache/beam/issues/23373): Complete support
+        # for logical types that require arguments beyond atomic type.
+        # For now, skip arguments.
+        return logical_type()

Review Comment:
   I think it would be better to fail loudly here. It's dangerous to move 
forward with a type we don't understand.
   
   Are there cases where we need to let this through?



##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -666,6 +720,28 @@ def _from_typing(cls, typ):
     return cls()
 
 
+class PassThroughLogicalType(LogicalType[LanguageT, LanguageT, ArgT]):
+  """A base class for LogicalTypes that use the same type as the underlying
+  representation type.
+  """
+  def to_language_type(self, value):
+    return value
+
+  @classmethod
+  def representation_type(cls):
+    # type: () -> type
+    return cls.language_type()
+
+  def to_representation_type(self, value):
+    return value
+
+  @classmethod
+  def _from_typing(cls, typ):
+    # type: (type) -> LogicalType
+    # TODO: enable argument

Review Comment:
   Can we link an issue here? (It could be the same one used for following 
up on non-atomic types.)



##########
sdks/python/apache_beam/typehints/schemas.py:
##########
@@ -851,3 +927,129 @@ def _from_typing(cls, typ):
 # TODO(yathu,BEAM-10722): Investigate and resolve conflicts in logical type
 # registration when more than one logical types sharing the same language type
 LogicalType.register_logical_type(DecimalLogicalType)
+
+
+@LogicalType.register_logical_type
+class FixedBytes(PassThroughLogicalType[bytes, np.int32]):
+  """A logical type for fixed-length bytes."""
+  @classmethod
+  def urn(cls):
+    return common_urns.fixed_bytes.urn
+
+  def __init__(self, length: np.int32):
+    self.length = length
+
+  @classmethod
+  def language_type(cls) -> type:
+    return bytes
+
+  def to_language_type(self, value: bytes):
+    length = len(value)
+    if length > self.length:
+      raise ValueError(
+          "value length {} > allowed length {}".format(length, self.length))
+    elif length < self.length:
+      # padding at the end
+      value = value + b'\0' * (self.length - length)
+
+    return value
+
+  @classmethod
+  def argument_type(cls):
+    return np.int32
+
+  def argument(self):
+    return self.length
+
+
+@LogicalType.register_logical_type
+class VariableBytes(PassThroughLogicalType[bytes, np.int32]):
+  """A logical type for variable-length bytes with specified maximum length."""
+  @classmethod
+  def urn(cls):
+    return common_urns.var_bytes.urn
+
+  def __init__(self, max_length: np.int32 = np.iinfo(np.int32).max):
+    self.max_length = max_length
+
+  @classmethod
+  def language_type(cls) -> type:
+    return bytes
+
+  def to_language_type(self, value: bytes):
+    length = len(value)
+    if length > self.max_length:
+      raise ValueError(
+          "value length {} > allowed length {}".format(length, self.max_length))
+
+    return value
+
+  @classmethod
+  def argument_type(cls):
+    return np.int32
+
+  def argument(self):
+    return self.max_length
+
+
+@LogicalType.register_logical_type
+class FixedString(PassThroughLogicalType[str, np.int32]):
+  """A logical type for fixed-length string."""
+  @classmethod
+  def urn(cls):
+    return common_urns.fixed_char.urn
+
+  def __init__(self, length: np.int32):
+    self.length = length
+
+  @classmethod
+  def language_type(cls) -> type:
+    return str
+
+  def to_language_type(self, value: str):
+    length = len(value)
+    if length > self.length:
+      raise ValueError(
+          "value length {} > allowed length {}".format(length, self.length))
+    elif length < self.length:
+      # padding at the end
+      value = value + ' ' * (self.length - length)
+
+    return value
+
+  @classmethod
+  def argument_type(cls):
+    return np.int32
+
+  def argument(self):
+    return self.length
+
+
+@LogicalType.register_logical_type
+class VariableString(PassThroughLogicalType[str, np.int32]):
+  """A logical type for variable-length string with specified maximum length."""
+  @classmethod
+  def urn(cls):
+    return common_urns.var_char.urn
+
+  def __init__(self, max_length: np.int32 = np.iinfo(np.int32).max):
+    self.max_length = max_length
+
+  @classmethod
+  def language_type(cls) -> type:
+    return str
+
+  def to_language_type(self, value: str):
+    length = len(value)
+    if length > self.max_length:
+      raise ValueError(
+          "value length {} > allowed length {}".format(length, self.max_length))
+
+    return value
+
+  @classmethod
+  def argument_type(cls):
+    return np.int32
+
+  def argument(self):
+    return self.max_length

Review Comment:
   Could you document the type mappings for these in the docstring for this 
module?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to