robertwb commented on code in PR #29169:
URL: https://github.com/apache/beam/pull/29169#discussion_r1374875434


##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -27,8 +27,31 @@
 from typing import Union
 
 import js2py
+from js2py.base import JsObjectWrapper
+from js2py.base import PyJsArray
+from js2py.base import PyJsArrayBuffer
+from js2py.base import PyJsBoolean
+from js2py.base import PyJsError
+from js2py.base import PyJsFloat32Array
+from js2py.base import PyJsFloat64Array
+from js2py.base import PyJsInt16Array
+from js2py.base import PyJsInt32Array
+from js2py.base import PyJsInt8Array
+from js2py.base import PyJsNull
+from js2py.base import PyJsNumber
+from js2py.base import PyJsObject
+from js2py.base import PyJsString
+from js2py.base import PyJsUint16Array
+from js2py.base import PyJsUint32Array
+from js2py.base import PyJsUint8Array
+from js2py.base import PyJsUint8ClampedArray
+from js2py.base import PyJsUndefined
+from js2py.base import to_python
+from js2py.constructors.jsdate import PyJsDate
+from js2py.internals.simplex import JsException
 
 import apache_beam as beam
+from apache_beam import Row

Review Comment:
   Same. beam.Row is sufficient.



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -27,8 +27,31 @@
 from typing import Union
 
 import js2py
+from js2py.base import JsObjectWrapper

Review Comment:
   We generally try to follow 
https://google.github.io/styleguide/pyguide.html#22-imports (import the module, 
not each individual thing). You might want to import base as a more descriptive 
name though. 



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -75,28 +99,89 @@ def __setstate__(self, state):
     self.__dict__.update(state)
 
 
+def row_to_dict(row):
+  if (str(type(row).__name__).startswith('BeamSchema_') or
+      isinstance(row, Row)):
+    row = row._asdict()
+  if isinstance(row, dict):
+    for key, value in row.items():
+      row[key] = row_to_dict(value)
+  elif not isinstance(row, str) and isinstance(row, Iterable):
+    row_list = list(row)
+    for idx in range(len(row_list)):
+      row_list[idx] = row_to_dict(row_list[idx])
+  return row

Review Comment:
   This is a case where it's probably clearer to put the return in each clause 
than re-assign row and return the mutated value at the end. An else clause can 
return the input itself. (Technically, it might not be a row, so value might be 
a better name.)



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -75,28 +99,89 @@ def __setstate__(self, state):
     self.__dict__.update(state)
 
 
+def row_to_dict(row):
+  if (str(type(row).__name__).startswith('BeamSchema_') or
+      isinstance(row, Row)):
+    row = row._asdict()
+  if isinstance(row, dict):
+    for key, value in row.items():
+      row[key] = row_to_dict(value)
+  elif not isinstance(row, str) and isinstance(row, Iterable):
+    row_list = list(row)
+    for idx in range(len(row_list)):
+      row_list[idx] = row_to_dict(row_list[idx])
+  return row
+
+
 # TODO(yaml) Consider adding optional language version parameter to support
 #  ECMAScript 5 and 6
 def _expand_javascript_mapping_func(
     original_fields, expression=None, callable=None, path=None, name=None):
+
+  js_array_type = (
+      PyJsArray,
+      PyJsArrayBuffer,
+      PyJsInt8Array,
+      PyJsUint8Array,
+      PyJsUint8ClampedArray,
+      PyJsInt16Array,
+      PyJsUint16Array,
+      PyJsInt32Array,
+      PyJsUint32Array,
+      PyJsFloat32Array,
+      PyJsFloat64Array)
+
+  def _js_object_to_py_object(obj):
+    if isinstance(obj, (PyJsNumber, PyJsString, PyJsBoolean)):
+      obj = to_python(obj)
+    elif isinstance(obj, js_array_type):
+      return [_js_object_to_py_object(value) for value in obj.to_list()]
+    elif isinstance(obj, PyJsDate):
+      obj = obj.to_utc_dt()
+    elif isinstance(obj, (PyJsNull, PyJsUndefined)):
+      return None
+    elif isinstance(obj, PyJsError):
+      raise RuntimeError(obj['message'])
+    elif isinstance(obj, PyJsObject):

Review Comment:
   On this note, it's probably ideal if we can get an expected schema ahead of 
time, and then use that to convert. (See, e.g. how PubSub or Avro write works). 



##########
sdks/python/apache_beam/yaml/yaml_udf_test.py:
##########
@@ -60,16 +66,37 @@ def test_map_to_fields_filter_inline_js(self):
         language: javascript
         fields:
           label:
-            callable: "function label_map(x) {return x.label + 'x'}"
+            callable: |
+              function label_map(x) {

Review Comment:
   These'd be more natural as expressions, right? Or maybe that's not what 
you're trying to test here. 



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -75,28 +99,89 @@ def __setstate__(self, state):
     self.__dict__.update(state)
 
 
+def row_to_dict(row):
+  if (str(type(row).__name__).startswith('BeamSchema_') or
+      isinstance(row, Row)):
+    row = row._asdict()
+  if isinstance(row, dict):
+    for key, value in row.items():
+      row[key] = row_to_dict(value)
+  elif not isinstance(row, str) and isinstance(row, Iterable):
+    row_list = list(row)
+    for idx in range(len(row_list)):
+      row_list[idx] = row_to_dict(row_list[idx])
+  return row
+
+
 # TODO(yaml) Consider adding optional language version parameter to support
 #  ECMAScript 5 and 6
 def _expand_javascript_mapping_func(
     original_fields, expression=None, callable=None, path=None, name=None):
+
+  js_array_type = (
+      PyJsArray,
+      PyJsArrayBuffer,
+      PyJsInt8Array,
+      PyJsUint8Array,
+      PyJsUint8ClampedArray,
+      PyJsInt16Array,
+      PyJsUint16Array,
+      PyJsInt32Array,
+      PyJsUint32Array,
+      PyJsFloat32Array,
+      PyJsFloat64Array)
+
+  def _js_object_to_py_object(obj):
+    if isinstance(obj, (PyJsNumber, PyJsString, PyJsBoolean)):
+      obj = to_python(obj)
+    elif isinstance(obj, js_array_type):
+      return [_js_object_to_py_object(value) for value in obj.to_list()]
+    elif isinstance(obj, PyJsDate):
+      obj = obj.to_utc_dt()
+    elif isinstance(obj, (PyJsNull, PyJsUndefined)):
+      return None
+    elif isinstance(obj, PyJsError):
+      raise RuntimeError(obj['message'])
+    elif isinstance(obj, PyJsObject):

Review Comment:
   So here's where some discretion is needed. Should this be a Row or a 
Mapping? E.g. a literal return of {a: 1, b: "foo"} is probably intended to be a 
Row. 



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -75,28 +99,89 @@ def __setstate__(self, state):
     self.__dict__.update(state)
 
 
+def row_to_dict(row):
+  if (str(type(row).__name__).startswith('BeamSchema_') or

Review Comment:
   Not all schemas start with BeamSchema (e.g. user defined named tuples). The 
test here is whether it's a named tuple. You could do `isinstance(row, tuple)` 
and `hasattr(row, '_asdict')`. 



##########
sdks/python/apache_beam/yaml/yaml_udf_test.py:
##########
@@ -25,20 +25,26 @@
 from apache_beam.options import pipeline_options
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
+from apache_beam.yaml.yaml_mapping import row_to_dict
+from apache_beam.yaml.yaml_provider import dicts_to_rows
 from apache_beam.yaml.yaml_transform import YamlTransform
 
 
 def AsRows():
-  return beam.Map(lambda named_tuple: beam.Row(**named_tuple._asdict()))
+  return beam.Map(lambda named_tuple: dicts_to_rows(row_to_dict(named_tuple)))
 
 
 class YamlUDFMappingTest(unittest.TestCase):
   def __init__(self, method_name='runYamlMappingTest'):
     super().__init__(method_name)
     self.data = [
-        beam.Row(label='11a', conductor=11, rank=0),
-        beam.Row(label='37a', conductor=37, rank=1),
-        beam.Row(label='389a', conductor=389, rank=2),
+        beam.Row(

Review Comment:
   Might be a bit of an aside, but if we're going to be adding attributes here 
we should either make them meaningful or arbitrary. (These are actually 
references to elliptic curves. If we wanted a repeated, nested value, we could 
provide generators as 2-tuple points [(x=0, y=0)], [(x=0, y=0)], and [(x=0, 
y=0), (x=1, y=0)] for 11a, 37a, and 389a respectively in the most standard 
form, but maybe it's easier to just use something else, e.g. col1, if we're 
trying to explore arbitrary schema types (though this'd be a lot of 
re-writing).)



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -75,28 +99,89 @@ def __setstate__(self, state):
     self.__dict__.update(state)
 
 
+def row_to_dict(row):
+  if (str(type(row).__name__).startswith('BeamSchema_') or
+      isinstance(row, Row)):
+    row = row._asdict()
+  if isinstance(row, dict):
+    for key, value in row.items():
+      row[key] = row_to_dict(value)

Review Comment:
   Don't mutate the input. You can use dict comprehension here. 



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -75,28 +99,89 @@ def __setstate__(self, state):
     self.__dict__.update(state)
 
 
+def row_to_dict(row):
+  if (str(type(row).__name__).startswith('BeamSchema_') or
+      isinstance(row, Row)):
+    row = row._asdict()
+  if isinstance(row, dict):
+    for key, value in row.items():
+      row[key] = row_to_dict(value)
+  elif not isinstance(row, str) and isinstance(row, Iterable):
+    row_list = list(row)
+    for idx in range(len(row_list)):
+      row_list[idx] = row_to_dict(row_list[idx])
+  return row
+
+
 # TODO(yaml) Consider adding optional language version parameter to support
 #  ECMAScript 5 and 6
 def _expand_javascript_mapping_func(
     original_fields, expression=None, callable=None, path=None, name=None):
+
+  js_array_type = (
+      PyJsArray,
+      PyJsArrayBuffer,
+      PyJsInt8Array,
+      PyJsUint8Array,
+      PyJsUint8ClampedArray,
+      PyJsInt16Array,
+      PyJsUint16Array,
+      PyJsInt32Array,
+      PyJsUint32Array,
+      PyJsFloat32Array,
+      PyJsFloat64Array)
+
+  def _js_object_to_py_object(obj):
+    if isinstance(obj, (PyJsNumber, PyJsString, PyJsBoolean)):
+      obj = to_python(obj)
+    elif isinstance(obj, js_array_type):
+      return [_js_object_to_py_object(value) for value in obj.to_list()]
+    elif isinstance(obj, PyJsDate):
+      obj = obj.to_utc_dt()
+    elif isinstance(obj, (PyJsNull, PyJsUndefined)):
+      return None
+    elif isinstance(obj, PyJsError):
+      raise RuntimeError(obj['message'])
+    elif isinstance(obj, PyJsObject):
+      return {
+          key: _js_object_to_py_object(value['value'])
+          for key,

Review Comment:
   You can put ()'s around `key, value` to make yapf do the right thing.



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -75,28 +99,89 @@ def __setstate__(self, state):
     self.__dict__.update(state)
 
 
+def row_to_dict(row):
+  if (str(type(row).__name__).startswith('BeamSchema_') or
+      isinstance(row, Row)):
+    row = row._asdict()
+  if isinstance(row, dict):
+    for key, value in row.items():
+      row[key] = row_to_dict(value)
+  elif not isinstance(row, str) and isinstance(row, Iterable):
+    row_list = list(row)
+    for idx in range(len(row_list)):
+      row_list[idx] = row_to_dict(row_list[idx])
+  return row
+
+
 # TODO(yaml) Consider adding optional language version parameter to support
 #  ECMAScript 5 and 6
 def _expand_javascript_mapping_func(
     original_fields, expression=None, callable=None, path=None, name=None):
+
+  js_array_type = (
+      PyJsArray,
+      PyJsArrayBuffer,
+      PyJsInt8Array,
+      PyJsUint8Array,
+      PyJsUint8ClampedArray,
+      PyJsInt16Array,
+      PyJsUint16Array,
+      PyJsInt32Array,
+      PyJsUint32Array,
+      PyJsFloat32Array,
+      PyJsFloat64Array)
+
+  def _js_object_to_py_object(obj):
+    if isinstance(obj, (PyJsNumber, PyJsString, PyJsBoolean)):
+      obj = to_python(obj)

Review Comment:
   Same as above. Let's not mix returning directly and mutating obj and 
returning that. 



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -75,28 +99,89 @@ def __setstate__(self, state):
     self.__dict__.update(state)
 
 
+def row_to_dict(row):
+  if (str(type(row).__name__).startswith('BeamSchema_') or
+      isinstance(row, Row)):
+    row = row._asdict()
+  if isinstance(row, dict):
+    for key, value in row.items():
+      row[key] = row_to_dict(value)
+  elif not isinstance(row, str) and isinstance(row, Iterable):
+    row_list = list(row)
+    for idx in range(len(row_list)):

Review Comment:
   Better to use list comprehension `[row_to_dict(x) for x in row]`.



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -75,28 +99,89 @@ def __setstate__(self, state):
     self.__dict__.update(state)
 
 
+def row_to_dict(row):
+  if (str(type(row).__name__).startswith('BeamSchema_') or
+      isinstance(row, Row)):
+    row = row._asdict()
+  if isinstance(row, dict):
+    for key, value in row.items():
+      row[key] = row_to_dict(value)
+  elif not isinstance(row, str) and isinstance(row, Iterable):
+    row_list = list(row)
+    for idx in range(len(row_list)):
+      row_list[idx] = row_to_dict(row_list[idx])
+  return row
+
+
 # TODO(yaml) Consider adding optional language version parameter to support
 #  ECMAScript 5 and 6
 def _expand_javascript_mapping_func(
     original_fields, expression=None, callable=None, path=None, name=None):
+
+  js_array_type = (
+      PyJsArray,
+      PyJsArrayBuffer,
+      PyJsInt8Array,
+      PyJsUint8Array,
+      PyJsUint8ClampedArray,
+      PyJsInt16Array,
+      PyJsUint16Array,
+      PyJsInt32Array,
+      PyJsUint32Array,
+      PyJsFloat32Array,
+      PyJsFloat64Array)
+
+  def _js_object_to_py_object(obj):
+    if isinstance(obj, (PyJsNumber, PyJsString, PyJsBoolean)):
+      obj = to_python(obj)
+    elif isinstance(obj, js_array_type):
+      return [_js_object_to_py_object(value) for value in obj.to_list()]
+    elif isinstance(obj, PyJsDate):
+      obj = obj.to_utc_dt()
+    elif isinstance(obj, (PyJsNull, PyJsUndefined)):
+      return None
+    elif isinstance(obj, PyJsError):
+      raise RuntimeError(obj['message'])
+    elif isinstance(obj, PyJsObject):
+      return {
+          key: _js_object_to_py_object(value['value'])
+          for key,
+          value in obj.own.items()
+      }
+    elif isinstance(obj, JsObjectWrapper):
+      return _js_object_to_py_object(obj._obj)
+
+    return obj
+
+  def _catch_js_errors(func):
+    try:
+      result = func()
+    except JsException as e:
+      result = getattr(e, 'mes')
+    return result
+
   if expression:
     args = ', '.join(original_fields)
     js_func = f'function fn({args}) {{return ({expression})}}'
-    js_callable = _CustomJsObjectWrapper(js2py.eval_js(js_func))
-    return lambda __row__: js_callable(*__row__._asdict().values())
+    js_expr_callable = _CustomJsObjectWrapper(js2py.eval_js(js_func))
+    fn = lambda __row__: lambda: js_expr_callable(
+        *row_to_dict(__row__).values())
 
   elif callable:
     js_callable = _CustomJsObjectWrapper(js2py.eval_js(callable))
-    return lambda __row__: js_callable(__row__._asdict())
+    fn = lambda __row__: lambda: js_callable(row_to_dict(__row__))
 
   else:
     if not path.endswith('.js'):
       raise ValueError(f'File "{path}" is not a valid .js file.')
     udf_code = FileSystems.open(path).read().decode()
     js = js2py.EvalJs()
     js.eval(udf_code)
-    js_callable = _CustomJsObjectWrapper(getattr(js, name))
-    return lambda __row__: js_callable(__row__._asdict())
+    js_file_callable = _CustomJsObjectWrapper(getattr(js, name))
+    fn = lambda __row__: lambda: js_file_callable(row_to_dict(__row__))
+
+  return lambda __row__: dicts_to_rows(
+      _js_object_to_py_object(_catch_js_errors(fn(__row__))))

Review Comment:
   I don't think we want to be catching errors and silently returning them as 
strings. We should let them propagate (and get processed by the error handling; 
I think that should just work). 



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -75,28 +99,89 @@ def __setstate__(self, state):
     self.__dict__.update(state)
 
 
+def row_to_dict(row):
+  if (str(type(row).__name__).startswith('BeamSchema_') or
+      isinstance(row, Row)):
+    row = row._asdict()
+  if isinstance(row, dict):
+    for key, value in row.items():
+      row[key] = row_to_dict(value)
+  elif not isinstance(row, str) and isinstance(row, Iterable):
+    row_list = list(row)

Review Comment:
   You're never returning row_list. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to