robertwb commented on a change in pull request #15484:
URL: https://github.com/apache/beam/pull/15484#discussion_r705811530



##########
File path: sdks/python/apache_beam/portability/common_urns.py
##########
@@ -66,3 +67,5 @@
 requirements = StandardRequirements.Enum
 
 displayData = StandardDisplayData.DisplayData
+
+java_class_lookup = ExpansionMethods.JAVA_CLASS_LOOKUP

Review comment:
       newline

##########
File path: sdks/python/apache_beam/transforms/external.py
##########
@@ -144,6 +147,101 @@ def _get_named_tuple_instance(self):
     return self._tuple_instance
 
 
+class JavaClassLookupPayloadBuilder(PayloadBuilder):
+  """
+  Builds a payload for directly instantiating a Java transform using a
+  constructor and builder methods.
+  """
+  def __init__(self, class_name):
+    """
+    :param class_name: fully qualified name of the transform class.
+    """
+    if not class_name:
+      raise ValueError('Class name must not be empty')
+
+    self._class_name = class_name
+    self._constructor_method = None
+    self._constructor_params = None
+    self._builder_methods_and_params = OrderedDict()
+
+  def _get_schema_proto_and_payload(self, **kwargs):
+    named_fields = []
+    for key, value in kwargs.items():
+      if not key:
+        raise ValueError('Parameter name cannot be empty')
+      if value is None:
+        raise ValueError(
+            'Received value None for key %s. None values are currently not '
+            'supported' % key)
+      named_fields.append(
+          (key, convert_to_typing_type(instance_to_type(value))))
+
+    schema_proto = named_fields_to_schema(named_fields)
+    row = named_tuple_from_schema(schema_proto)(**kwargs)
+    schema = named_tuple_to_schema(type(row))
+
+    payload = RowCoder(schema).encode(row)
+    return (schema_proto, payload)
+
+  def build(self):
+    constructor_params = self._constructor_params or {}
+    constructor_schema, constructor_payload = (
+        self._get_schema_proto_and_payload(**constructor_params))
+    payload = JavaClassLookupPayload(
+        class_name=self._class_name,
+        constructor_schema=constructor_schema,
+        constructor_payload=constructor_payload)
+    if self._constructor_method:
+      payload.constructor_method = self._constructor_method
+
+    for builder_method_name, params in self._builder_methods_and_params.items():
+      builder_method_schema, builder_method_payload = (
+          self._get_schema_proto_and_payload(**params))
+      builder_method = BuilderMethod(
+          name=builder_method_name,
+          schema=builder_method_schema,
+          payload=builder_method_payload)
+      builder_method.name = builder_method_name
+      payload.builder_methods.append(builder_method)
+    return payload
+
+  def add_constructor(self, **kwargs):

Review comment:
       Perhaps this should be 'with' rather than 'add', as there can only be one?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to