TheNeuralBit commented on code in PR #22679:
URL: https://github.com/apache/beam/pull/22679#discussion_r944628110


##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -112,8 +109,19 @@ def from_user_type(
     return None
 
   @staticmethod
-  def from_fields(fields: Sequence[Tuple[str, type]]) -> RowTypeConstraint:
-    return RowTypeConstraint(fields=fields, user_type=None)
+  def from_fields(
+      fields: Sequence[Tuple[str, type]],
+      schema_id: Optional[str] = None,
+      schema_options: Optional[Sequence[Tuple[str, Any]]] = None,
+      field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] = None,
+      schema_registry: SchemaTypeRegistry = None,

Review Comment:
   Oh I think that error may have showed up on a previous iteration, when I had 
accidentally left out the import.
   
   It's surprising that CI isn't complaining about the lack of Optional here 
though...



##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -160,3 +168,50 @@ def __repr__(self):
 
   def get_type_for(self, name):
     return dict(self._fields)[name]
+
+
+class GeneratedClassRowTypeConstraint(RowTypeConstraint):
+  """Specialization of RowTypeConstraint which relies on a generated user_type.
+
+  Since the generated user_type cannot be pickled, we supply a custom 
__reduce__
+  function that will regenerate the user_type.
+  """
+  def __init__(
+      self,
+      fields,
+      schema_id: Optional[str] = None,
+      schema_options: Optional[Sequence[Tuple[str, Any]]] = None,
+      field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] = None,
+      schema_registry: SchemaTypeRegistry = None,
+  ):
+    from apache_beam.typehints.schemas import named_fields_to_schema
+    from apache_beam.typehints.schemas import named_tuple_from_schema
+
+    if schema_registry is None:
+      kwargs = {}
+    else:
+      kwargs = {'schema_registry': schema_registry}
+
+    schema = named_fields_to_schema(
+        fields,
+        schema_id=schema_id,
+        schema_options=schema_options,
+        field_options=field_options,
+        **kwargs)
+    user_type = named_tuple_from_schema(schema, **kwargs)
+    setattr(user_type, _BEAM_SCHEMA_ID, schema_id)
+
+    super().__init__(
+        fields,
+        user_type,
+        schema_options=schema_options,
+        field_options=field_options)
+
+  def __reduce__(self):
+    return (
+        RowTypeConstraint.from_fields,
+        (
+            self._fields,
+            self._schema_id,
+            self._schema_options,
+            self._field_options))

Review Comment:
   Good idea, thanks



##########
sdks/python/apache_beam/typehints/row_type.py:
##########
@@ -160,3 +168,50 @@ def __repr__(self):
 
   def get_type_for(self, name):
     return dict(self._fields)[name]
+
+
+class GeneratedClassRowTypeConstraint(RowTypeConstraint):
+  """Specialization of RowTypeConstraint which relies on a generated user_type.
+
+  Since the generated user_type cannot be pickled, we supply a custom 
__reduce__

Review Comment:
   I actually don't fully understand it, but it's been a consistent issue with 
the Schema code. Each pickle library (built-in, dill, cloudpickle) fails for a 
different reason. I filed https://github.com/apache/beam/issues/22714 to track 
this, and added a (skipped) test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to