Polber commented on code in PR #29169:
URL: https://github.com/apache/beam/pull/29169#discussion_r1375070556


##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -75,28 +99,89 @@ def __setstate__(self, state):
     self.__dict__.update(state)
 
 
+def row_to_dict(row):
+  if (str(type(row).__name__).startswith('BeamSchema_') or
+      isinstance(row, Row)):
+    row = row._asdict()
+  if isinstance(row, dict):
+    for key, value in row.items():
+      row[key] = row_to_dict(value)
+  elif not isinstance(row, str) and isinstance(row, Iterable):
+    row_list = list(row)
+    for idx in range(len(row_list)):
+      row_list[idx] = row_to_dict(row_list[idx])
+  return row
+
+
 # TODO(yaml) Consider adding optional language version parameter to support
 #  ECMAScript 5 and 6
 def _expand_javascript_mapping_func(
     original_fields, expression=None, callable=None, path=None, name=None):
+
+  js_array_type = (
+      PyJsArray,
+      PyJsArrayBuffer,
+      PyJsInt8Array,
+      PyJsUint8Array,
+      PyJsUint8ClampedArray,
+      PyJsInt16Array,
+      PyJsUint16Array,
+      PyJsInt32Array,
+      PyJsUint32Array,
+      PyJsFloat32Array,
+      PyJsFloat64Array)
+
+  def _js_object_to_py_object(obj):
+    if isinstance(obj, (PyJsNumber, PyJsString, PyJsBoolean)):
+      obj = to_python(obj)
+    elif isinstance(obj, js_array_type):
+      return [_js_object_to_py_object(value) for value in obj.to_list()]
+    elif isinstance(obj, PyJsDate):
+      obj = obj.to_utc_dt()
+    elif isinstance(obj, (PyJsNull, PyJsUndefined)):
+      return None
+    elif isinstance(obj, PyJsError):
+      raise RuntimeError(obj['message'])
+    elif isinstance(obj, PyJsObject):
+      return {
+          key: _js_object_to_py_object(value['value'])
+          for key,
+          value in obj.own.items()
+      }
+    elif isinstance(obj, JsObjectWrapper):
+      return _js_object_to_py_object(obj._obj)
+
+    return obj
+
+  def _catch_js_errors(func):
+    try:
+      result = func()
+    except JsException as e:
+      result = getattr(e, 'mes')
+    return result
+
   if expression:
     args = ', '.join(original_fields)
     js_func = f'function fn({args}) {{return ({expression})}}'
-    js_callable = _CustomJsObjectWrapper(js2py.eval_js(js_func))
-    return lambda __row__: js_callable(*__row__._asdict().values())
+    js_expr_callable = _CustomJsObjectWrapper(js2py.eval_js(js_func))
+    fn = lambda __row__: lambda: js_expr_callable(
+        *row_to_dict(__row__).values())
 
   elif callable:
     js_callable = _CustomJsObjectWrapper(js2py.eval_js(callable))
-    return lambda __row__: js_callable(__row__._asdict())
+    fn = lambda __row__: lambda: js_callable(row_to_dict(__row__))
 
   else:
     if not path.endswith('.js'):
       raise ValueError(f'File "{path}" is not a valid .js file.')
     udf_code = FileSystems.open(path).read().decode()
     js = js2py.EvalJs()
     js.eval(udf_code)
-    js_callable = _CustomJsObjectWrapper(getattr(js, name))
-    return lambda __row__: js_callable(__row__._asdict())
+    js_file_callable = _CustomJsObjectWrapper(getattr(js, name))
+    fn = lambda __row__: lambda: js_file_callable(row_to_dict(__row__))
+
+  return lambda __row__: dicts_to_rows(
+      _js_object_to_py_object(_catch_js_errors(fn(__row__))))

Review Comment:
   It does propagate. `_catch_js_errors()` extracts the `PyJsError` object 
from the `JsException`, and the error is then raised in 
`_js_object_to_py_object()`, where it is caught by the existing error-handling 
framework. I thought this gave a somewhat cleaner error message: if you have a 
custom error/exception in the JS script, it otherwise gets a bit hidden in the 
stack trace of the `JsException` instead of being the top-level error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to