robertwb commented on code in PR #29169:
URL: https://github.com/apache/beam/pull/29169#discussion_r1375109787
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -75,28 +79,85 @@ def __setstate__(self, state):
self.__dict__.update(state)
+def row_to_dict(row):
+ if ((isinstance(row, tuple) and hasattr(row, '_asdict')) or
+ isinstance(row, beam.Row)):
+ row = row._asdict()
+ if isinstance(row, dict):
+ return {key: row_to_dict(value) for key, value in row.items()}
+ elif not isinstance(row, str) and isinstance(row, Iterable):
+ return [row_to_dict(value) for value in list(row)]
+ return row
+
+
# TODO(yaml) Consider adding optional language version parameter to support
# ECMAScript 5 and 6
def _expand_javascript_mapping_func(
original_fields, expression=None, callable=None, path=None, name=None):
+
+ js_array_type = (
+ base.PyJsArray,
+ base.PyJsArrayBuffer,
+ base.PyJsInt8Array,
+ base.PyJsUint8Array,
+ base.PyJsUint8ClampedArray,
+ base.PyJsInt16Array,
+ base.PyJsUint16Array,
+ base.PyJsInt32Array,
+ base.PyJsUint32Array,
+ base.PyJsFloat32Array,
+ base.PyJsFloat64Array)
+
+ def _js_object_to_py_object(obj):
+ if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)):
+ return base.to_python(obj)
+ elif isinstance(obj, js_array_type):
+ return [_js_object_to_py_object(value) for value in obj.to_list()]
+ elif isinstance(obj, jsdate.PyJsDate):
+ return obj.to_utc_dt()
+ elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)):
+ return None
+ elif isinstance(obj, base.PyJsError):
+ raise RuntimeError(obj['message'])
+ elif isinstance(obj, base.PyJsObject):
+ return {
+ key: _js_object_to_py_object(value['value'])
+ for (key, value) in obj.own.items()
+ }
+ elif isinstance(obj, base.JsObjectWrapper):
+ return _js_object_to_py_object(obj._obj)
+
+ return obj
+
+ def _catch_js_errors(func):
+ try:
+ result = func()
+ except simplex.JsException as e:
+ result = getattr(e, 'mes')
Review Comment:
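Why not just `e.mes`? `getattr(e, 'mes')` with a literal attribute name is
only a longer spelling of the same attribute access.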
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -75,28 +79,85 @@ def __setstate__(self, state):
self.__dict__.update(state)
+def row_to_dict(row):
+ if ((isinstance(row, tuple) and hasattr(row, '_asdict')) or
+ isinstance(row, beam.Row)):
+ row = row._asdict()
+ if isinstance(row, dict):
+ return {key: row_to_dict(value) for key, value in row.items()}
+ elif not isinstance(row, str) and isinstance(row, Iterable):
+ return [row_to_dict(value) for value in list(row)]
+ return row
+
+
# TODO(yaml) Consider adding optional language version parameter to support
# ECMAScript 5 and 6
def _expand_javascript_mapping_func(
original_fields, expression=None, callable=None, path=None, name=None):
+
+ js_array_type = (
+ base.PyJsArray,
+ base.PyJsArrayBuffer,
+ base.PyJsInt8Array,
+ base.PyJsUint8Array,
+ base.PyJsUint8ClampedArray,
+ base.PyJsInt16Array,
+ base.PyJsUint16Array,
+ base.PyJsInt32Array,
+ base.PyJsUint32Array,
+ base.PyJsFloat32Array,
+ base.PyJsFloat64Array)
+
+ def _js_object_to_py_object(obj):
+ if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)):
+ return base.to_python(obj)
+ elif isinstance(obj, js_array_type):
+ return [_js_object_to_py_object(value) for value in obj.to_list()]
+ elif isinstance(obj, jsdate.PyJsDate):
+ return obj.to_utc_dt()
+ elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)):
+ return None
+ elif isinstance(obj, base.PyJsError):
+ raise RuntimeError(obj['message'])
+ elif isinstance(obj, base.PyJsObject):
+ return {
+ key: _js_object_to_py_object(value['value'])
+ for (key, value) in obj.own.items()
+ }
+ elif isinstance(obj, base.JsObjectWrapper):
+ return _js_object_to_py_object(obj._obj)
+
+ return obj
+
+ def _catch_js_errors(func):
+ try:
+ result = func()
+ except simplex.JsException as e:
+ result = getattr(e, 'mes')
+ return result
+
if expression:
args = ', '.join(original_fields)
js_func = f'function fn({args}) {{return ({expression})}}'
- js_callable = _CustomJsObjectWrapper(js2py.eval_js(js_func))
- return lambda __row__: js_callable(*__row__._asdict().values())
+ js_expr_callable = _CustomJsObjectWrapper(js2py.eval_js(js_func))
+ fn = lambda __row__: lambda: js_expr_callable(
+ *row_to_dict(__row__).values())
Review Comment:
Does `js_expr_callable` accept keyword arguments, so that you don't have to
rely on the ordering of `row_to_dict(__row__).values()`? (I think we'll be OK
as long as `__row__` has exactly the expected schema, but what if it is a
compatible row that carries extra fields?)
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -75,28 +99,89 @@ def __setstate__(self, state):
self.__dict__.update(state)
+def row_to_dict(row):
+ if (str(type(row).__name__).startswith('BeamSchema_') or
+ isinstance(row, Row)):
+ row = row._asdict()
+ if isinstance(row, dict):
+ for key, value in row.items():
+ row[key] = row_to_dict(value)
+ elif not isinstance(row, str) and isinstance(row, Iterable):
+ row_list = list(row)
+ for idx in range(len(row_list)):
+ row_list[idx] = row_to_dict(row_list[idx])
+ return row
+
+
# TODO(yaml) Consider adding optional language version parameter to support
# ECMAScript 5 and 6
def _expand_javascript_mapping_func(
original_fields, expression=None, callable=None, path=None, name=None):
+
+ js_array_type = (
+ PyJsArray,
+ PyJsArrayBuffer,
+ PyJsInt8Array,
+ PyJsUint8Array,
+ PyJsUint8ClampedArray,
+ PyJsInt16Array,
+ PyJsUint16Array,
+ PyJsInt32Array,
+ PyJsUint32Array,
+ PyJsFloat32Array,
+ PyJsFloat64Array)
+
+ def _js_object_to_py_object(obj):
+ if isinstance(obj, (PyJsNumber, PyJsString, PyJsBoolean)):
+ obj = to_python(obj)
+ elif isinstance(obj, js_array_type):
+ return [_js_object_to_py_object(value) for value in obj.to_list()]
+ elif isinstance(obj, PyJsDate):
+ obj = obj.to_utc_dt()
+ elif isinstance(obj, (PyJsNull, PyJsUndefined)):
+ return None
+ elif isinstance(obj, PyJsError):
+ raise RuntimeError(obj['message'])
+ elif isinstance(obj, PyJsObject):
+ return {
+ key: _js_object_to_py_object(value['value'])
+ for key,
+ value in obj.own.items()
+ }
+ elif isinstance(obj, JsObjectWrapper):
+ return _js_object_to_py_object(obj._obj)
+
+ return obj
+
+ def _catch_js_errors(func):
+ try:
+ result = func()
+ except JsException as e:
+ result = getattr(e, 'mes')
+ return result
+
if expression:
args = ', '.join(original_fields)
js_func = f'function fn({args}) {{return ({expression})}}'
- js_callable = _CustomJsObjectWrapper(js2py.eval_js(js_func))
- return lambda __row__: js_callable(*__row__._asdict().values())
+ js_expr_callable = _CustomJsObjectWrapper(js2py.eval_js(js_func))
+ fn = lambda __row__: lambda: js_expr_callable(
+ *row_to_dict(__row__).values())
elif callable:
js_callable = _CustomJsObjectWrapper(js2py.eval_js(callable))
- return lambda __row__: js_callable(__row__._asdict())
+ fn = lambda __row__: lambda: js_callable(row_to_dict(__row__))
else:
if not path.endswith('.js'):
raise ValueError(f'File "{path}" is not a valid .js file.')
udf_code = FileSystems.open(path).read().decode()
js = js2py.EvalJs()
js.eval(udf_code)
- js_callable = _CustomJsObjectWrapper(getattr(js, name))
- return lambda __row__: js_callable(__row__._asdict())
+ js_file_callable = _CustomJsObjectWrapper(getattr(js, name))
+ fn = lambda __row__: lambda: js_file_callable(row_to_dict(__row__))
+
+ return lambda __row__: dicts_to_rows(
+ _js_object_to_py_object(_catch_js_errors(fn(__row__))))
Review Comment:
Catching an exception, turning it into a value, and then re-raising that
value elsewhere makes things a lot harder to follow, though. E.g. you catch
`simplex.JsException`, get its `mes` attribute (which is presumably of type
`PyJsError`, so that you can detect it later?), and then raise during
conversion?
If you want a simpler stack trace, you can do something like
```
try:
  return func()
except simplex.JsException as exn:
  raise RuntimeError(
      f"Error evaluating javascript expression: {exn.mes}") from exn
```
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -75,28 +79,85 @@ def __setstate__(self, state):
self.__dict__.update(state)
+def row_to_dict(row):
+ if ((isinstance(row, tuple) and hasattr(row, '_asdict')) or
+ isinstance(row, beam.Row)):
+ row = row._asdict()
+ if isinstance(row, dict):
+ return {key: row_to_dict(value) for key, value in row.items()}
+ elif not isinstance(row, str) and isinstance(row, Iterable):
+ return [row_to_dict(value) for value in list(row)]
+ return row
+
+
# TODO(yaml) Consider adding optional language version parameter to support
# ECMAScript 5 and 6
def _expand_javascript_mapping_func(
original_fields, expression=None, callable=None, path=None, name=None):
+
+ js_array_type = (
+ base.PyJsArray,
+ base.PyJsArrayBuffer,
+ base.PyJsInt8Array,
+ base.PyJsUint8Array,
+ base.PyJsUint8ClampedArray,
+ base.PyJsInt16Array,
+ base.PyJsUint16Array,
+ base.PyJsInt32Array,
+ base.PyJsUint32Array,
+ base.PyJsFloat32Array,
+ base.PyJsFloat64Array)
+
+ def _js_object_to_py_object(obj):
+ if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)):
+ return base.to_python(obj)
+ elif isinstance(obj, js_array_type):
+ return [_js_object_to_py_object(value) for value in obj.to_list()]
+ elif isinstance(obj, jsdate.PyJsDate):
+ return obj.to_utc_dt()
+ elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)):
+ return None
+ elif isinstance(obj, base.PyJsError):
+ raise RuntimeError(obj['message'])
+ elif isinstance(obj, base.PyJsObject):
+ return {
+ key: _js_object_to_py_object(value['value'])
+ for (key, value) in obj.own.items()
+ }
+ elif isinstance(obj, base.JsObjectWrapper):
+ return _js_object_to_py_object(obj._obj)
+
+ return obj
+
+ def _catch_js_errors(func):
+ try:
+ result = func()
+ except simplex.JsException as e:
+ result = getattr(e, 'mes')
+ return result
+
if expression:
args = ', '.join(original_fields)
js_func = f'function fn({args}) {{return ({expression})}}'
- js_callable = _CustomJsObjectWrapper(js2py.eval_js(js_func))
- return lambda __row__: js_callable(*__row__._asdict().values())
+ js_expr_callable = _CustomJsObjectWrapper(js2py.eval_js(js_func))
+ fn = lambda __row__: lambda: js_expr_callable(
Review Comment:
No need for the double-underscore __row__ as you're not worried about
polluting a namespace here.
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -75,28 +79,85 @@ def __setstate__(self, state):
self.__dict__.update(state)
+def row_to_dict(row):
+ if ((isinstance(row, tuple) and hasattr(row, '_asdict')) or
+ isinstance(row, beam.Row)):
+ row = row._asdict()
+ if isinstance(row, dict):
+ return {key: row_to_dict(value) for key, value in row.items()}
+ elif not isinstance(row, str) and isinstance(row, Iterable):
+ return [row_to_dict(value) for value in list(row)]
+ return row
+
+
# TODO(yaml) Consider adding optional language version parameter to support
# ECMAScript 5 and 6
def _expand_javascript_mapping_func(
original_fields, expression=None, callable=None, path=None, name=None):
+
+ js_array_type = (
+ base.PyJsArray,
+ base.PyJsArrayBuffer,
+ base.PyJsInt8Array,
+ base.PyJsUint8Array,
+ base.PyJsUint8ClampedArray,
+ base.PyJsInt16Array,
+ base.PyJsUint16Array,
+ base.PyJsInt32Array,
+ base.PyJsUint32Array,
+ base.PyJsFloat32Array,
+ base.PyJsFloat64Array)
+
+ def _js_object_to_py_object(obj):
+ if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)):
+ return base.to_python(obj)
+ elif isinstance(obj, js_array_type):
+ return [_js_object_to_py_object(value) for value in obj.to_list()]
+ elif isinstance(obj, jsdate.PyJsDate):
+ return obj.to_utc_dt()
+ elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)):
+ return None
+ elif isinstance(obj, base.PyJsError):
+ raise RuntimeError(obj['message'])
+ elif isinstance(obj, base.PyJsObject):
+ return {
+ key: _js_object_to_py_object(value['value'])
+ for (key, value) in obj.own.items()
+ }
+ elif isinstance(obj, base.JsObjectWrapper):
+ return _js_object_to_py_object(obj._obj)
+
+ return obj
+
+ def _catch_js_errors(func):
+ try:
+ result = func()
+ except simplex.JsException as e:
+ result = getattr(e, 'mes')
+ return result
+
if expression:
args = ', '.join(original_fields)
js_func = f'function fn({args}) {{return ({expression})}}'
- js_callable = _CustomJsObjectWrapper(js2py.eval_js(js_func))
- return lambda __row__: js_callable(*__row__._asdict().values())
+ js_expr_callable = _CustomJsObjectWrapper(js2py.eval_js(js_func))
+ fn = lambda __row__: lambda: js_expr_callable(
+ *row_to_dict(__row__).values())
elif callable:
js_callable = _CustomJsObjectWrapper(js2py.eval_js(callable))
- return lambda __row__: js_callable(__row__._asdict())
+ fn = lambda __row__: lambda: js_callable(row_to_dict(__row__))
else:
if not path.endswith('.js'):
raise ValueError(f'File "{path}" is not a valid .js file.')
udf_code = FileSystems.open(path).read().decode()
js = js2py.EvalJs()
js.eval(udf_code)
- js_callable = _CustomJsObjectWrapper(getattr(js, name))
- return lambda __row__: js_callable(__row__._asdict())
+ js_file_callable = _CustomJsObjectWrapper(getattr(js, name))
+ fn = lambda __row__: lambda: js_file_callable(row_to_dict(__row__))
Review Comment:
It is a bit hard to follow when `fn` is a lambda that takes a row and returns
a callable. If you want this pattern, it would likely be better expressed
with function decorators. But there is also redundancy here. I think it'd be
cleaner to do something like
```
fn = [your previous logic defining a js -> js evaluation]

def wrapper(row):
  try:
    return dicts_to_rows(_js_object_to_py_object(fn(row_to_dict(row))))
  except simplex.JsException as exn:
    raise RuntimeError(
        f"Error evaluating javascript expression: {exn.mes}") from exn

return wrapper
```
If you want to keep JavaScript evaluation errors separate from errors raised
while converting the result, you could also do
```
def wrapper(row):
  row_as_dict = row_to_dict(row)
  try:
    js_result = fn(row_as_dict)
  except simplex.JsException as exn:
    raise RuntimeError(
        f"Error evaluating javascript expression: {exn.mes}") from exn
  return dicts_to_rows(_js_object_to_py_object(js_result))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]