robertwb commented on a change in pull request #15484:
URL: https://github.com/apache/beam/pull/15484#discussion_r713397475



##########
File path: sdks/python/apache_beam/transforms/external.py
##########
@@ -144,6 +147,101 @@ def _get_named_tuple_instance(self):
     return self._tuple_instance
 
 
+class JavaClassLookupPayloadBuilder(PayloadBuilder):
+  """
+  Builds a payload for directly instantiating a Java transform using a
+  constructor and builder methods.
+  """
+  def __init__(self, class_name):
+    """
+    :param class_name: fully qualified name of the transform class.
+    """
+    if not class_name:
+      raise ValueError('Class name must not be empty')
+
+    self._class_name = class_name
+    self._constructor_method = None
+    self._constructor_params = None
+    self._builder_methods_and_params = OrderedDict()
+
+  def _get_schema_proto_and_payload(self, **kwargs):
+    named_fields = []
+    for key, value in kwargs.items():
+      if not key:
+        raise ValueError('Parameter name cannot be empty')
+      if value is None:
+        raise ValueError(
+            'Received value None for key %s. None values are currently not '
+            'supported' % key)
+      named_fields.append(
+          (key, convert_to_typing_type(instance_to_type(value))))
+
+    schema_proto = named_fields_to_schema(named_fields)
+    row = named_tuple_from_schema(schema_proto)(**kwargs)
+    schema = named_tuple_to_schema(type(row))
+
+    payload = RowCoder(schema).encode(row)
+    return (schema_proto, payload)
+
+  def build(self):
+    constructor_params = self._constructor_params or {}
+    constructor_schema, constructor_payload = (
+        self._get_schema_proto_and_payload(**constructor_params))
+    payload = JavaClassLookupPayload(
+        class_name=self._class_name,
+        constructor_schema=constructor_schema,
+        constructor_payload=constructor_payload)
+    if self._constructor_method:
+      payload.constructor_method = self._constructor_method
+
+    for builder_method_name, params in 
self._builder_methods_and_params.items():
+      builder_method_schema, builder_method_payload = (
+          self._get_schema_proto_and_payload(**params))
+      builder_method = BuilderMethod(
+          name=builder_method_name,
+          schema=builder_method_schema,
+          payload=builder_method_payload)
+      builder_method.name = builder_method_name
+      payload.builder_methods.append(builder_method)
+    return payload
+
+  def add_constructor(self, **kwargs):
+    """
+    Specifies the Java constructor to use.
+
+    :param kwargs: parameter names and values of the constructor.
+    """
+    if self._constructor_method or self._constructor_params:
+      raise ValueError(
+          'Constructor or constructor method can only be specified once')
+
+    self._constructor_params = kwargs
+
+  def add_constructor_method(self, method_name, **kwargs):
+    """
+    Specifies the Java constructor method to use.
+
+    :param method_name: name of the constructor method.
+    :param kwargs: parameter names and values of the constructor method.
+    """
+    if self._constructor_method or self._constructor_params:
+      raise ValueError(
+          'Constructor or constructor method can only be specified once')
+
+    self._constructor_method = method_name
+    self._constructor_params = kwargs
+
+  def add_builder_method(self, method_name, **kwargs):

Review comment:
       Thanks. 
   
   Just to confirm, if the Java constructor/method does not have an argument 
named `foo` then `builderMethod(foo=x)` will fail, right? And if it does have 
an argument named `foo` and `bar` then `builderMethod(foo=x, bar=y)` will 
produce the same result as `builderMethod(bar=y, foo=x)`? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to