robertwb commented on code in PR #29169: URL: https://github.com/apache/beam/pull/29169#discussion_r1374875434
########## sdks/python/apache_beam/yaml/yaml_mapping.py: ########## @@ -27,8 +27,31 @@ from typing import Union import js2py +from js2py.base import JsObjectWrapper +from js2py.base import PyJsArray +from js2py.base import PyJsArrayBuffer +from js2py.base import PyJsBoolean +from js2py.base import PyJsError +from js2py.base import PyJsFloat32Array +from js2py.base import PyJsFloat64Array +from js2py.base import PyJsInt16Array +from js2py.base import PyJsInt32Array +from js2py.base import PyJsInt8Array +from js2py.base import PyJsNull +from js2py.base import PyJsNumber +from js2py.base import PyJsObject +from js2py.base import PyJsString +from js2py.base import PyJsUint16Array +from js2py.base import PyJsUint32Array +from js2py.base import PyJsUint8Array +from js2py.base import PyJsUint8ClampedArray +from js2py.base import PyJsUndefined +from js2py.base import to_python +from js2py.constructors.jsdate import PyJsDate +from js2py.internals.simplex import JsException import apache_beam as beam +from apache_beam import Row Review Comment: Same. beam.Row is sufficient. ########## sdks/python/apache_beam/yaml/yaml_mapping.py: ########## @@ -27,8 +27,31 @@ from typing import Union import js2py +from js2py.base import JsObjectWrapper Review Comment: We generally try to follow https://google.github.io/styleguide/pyguide.html#22-imports (import the module, not each individual thing). You might want to import base as a more descriptive name though. 
########## sdks/python/apache_beam/yaml/yaml_mapping.py: ########## @@ -75,28 +99,89 @@ def __setstate__(self, state): self.__dict__.update(state) +def row_to_dict(row): + if (str(type(row).__name__).startswith('BeamSchema_') or + isinstance(row, Row)): + row = row._asdict() + if isinstance(row, dict): + for key, value in row.items(): + row[key] = row_to_dict(value) + elif not isinstance(row, str) and isinstance(row, Iterable): + row_list = list(row) + for idx in range(len(row_list)): + row_list[idx] = row_to_dict(row_list[idx]) + return row Review Comment: This is a case where it's probably clearer to put the return in each clause than re-assign row and return the mutated value at the end. An else clause can return the input itself. (Technically, it might not be a row, so value might be a better name.) ########## sdks/python/apache_beam/yaml/yaml_mapping.py: ########## @@ -75,28 +99,89 @@ def __setstate__(self, state): self.__dict__.update(state) +def row_to_dict(row): + if (str(type(row).__name__).startswith('BeamSchema_') or + isinstance(row, Row)): + row = row._asdict() + if isinstance(row, dict): + for key, value in row.items(): + row[key] = row_to_dict(value) + elif not isinstance(row, str) and isinstance(row, Iterable): + row_list = list(row) + for idx in range(len(row_list)): + row_list[idx] = row_to_dict(row_list[idx]) + return row + + # TODO(yaml) Consider adding optional language version parameter to support # ECMAScript 5 and 6 def _expand_javascript_mapping_func( original_fields, expression=None, callable=None, path=None, name=None): + + js_array_type = ( + PyJsArray, + PyJsArrayBuffer, + PyJsInt8Array, + PyJsUint8Array, + PyJsUint8ClampedArray, + PyJsInt16Array, + PyJsUint16Array, + PyJsInt32Array, + PyJsUint32Array, + PyJsFloat32Array, + PyJsFloat64Array) + + def _js_object_to_py_object(obj): + if isinstance(obj, (PyJsNumber, PyJsString, PyJsBoolean)): + obj = to_python(obj) + elif isinstance(obj, js_array_type): + return 
[_js_object_to_py_object(value) for value in obj.to_list()] + elif isinstance(obj, PyJsDate): + obj = obj.to_utc_dt() + elif isinstance(obj, (PyJsNull, PyJsUndefined)): + return None + elif isinstance(obj, PyJsError): + raise RuntimeError(obj['message']) + elif isinstance(obj, PyJsObject): Review Comment: On this note, it's probably ideal if we can get an expected schema ahead of time, and then use that to convert. (See, e.g. how PubSub or Avro write works). ########## sdks/python/apache_beam/yaml/yaml_udf_test.py: ########## @@ -60,16 +66,37 @@ def test_map_to_fields_filter_inline_js(self): language: javascript fields: label: - callable: "function label_map(x) {return x.label + 'x'}" + callable: | + function label_map(x) { Review Comment: These'd be more natural as expressions, right? Or maybe that's not what you're trying to test here. ########## sdks/python/apache_beam/yaml/yaml_mapping.py: ########## @@ -75,28 +99,89 @@ def __setstate__(self, state): self.__dict__.update(state) +def row_to_dict(row): + if (str(type(row).__name__).startswith('BeamSchema_') or + isinstance(row, Row)): + row = row._asdict() + if isinstance(row, dict): + for key, value in row.items(): + row[key] = row_to_dict(value) + elif not isinstance(row, str) and isinstance(row, Iterable): + row_list = list(row) + for idx in range(len(row_list)): + row_list[idx] = row_to_dict(row_list[idx]) + return row + + # TODO(yaml) Consider adding optional language version parameter to support # ECMAScript 5 and 6 def _expand_javascript_mapping_func( original_fields, expression=None, callable=None, path=None, name=None): + + js_array_type = ( + PyJsArray, + PyJsArrayBuffer, + PyJsInt8Array, + PyJsUint8Array, + PyJsUint8ClampedArray, + PyJsInt16Array, + PyJsUint16Array, + PyJsInt32Array, + PyJsUint32Array, + PyJsFloat32Array, + PyJsFloat64Array) + + def _js_object_to_py_object(obj): + if isinstance(obj, (PyJsNumber, PyJsString, PyJsBoolean)): + obj = to_python(obj) + elif isinstance(obj, js_array_type): + 
return [_js_object_to_py_object(value) for value in obj.to_list()] + elif isinstance(obj, PyJsDate): + obj = obj.to_utc_dt() + elif isinstance(obj, (PyJsNull, PyJsUndefined)): + return None + elif isinstance(obj, PyJsError): + raise RuntimeError(obj['message']) + elif isinstance(obj, PyJsObject): Review Comment: So here's where some discretion is needed. Should this be a Row or a Mapping? E.g. a literal return of {a: 1, b: "foo"} is probably intended to be a Row. ########## sdks/python/apache_beam/yaml/yaml_mapping.py: ########## @@ -75,28 +99,89 @@ def __setstate__(self, state): self.__dict__.update(state) +def row_to_dict(row): + if (str(type(row).__name__).startswith('BeamSchema_') or Review Comment: Not all schemas start with BeamSchema (e.g. user defined named tuples). The test here is whether it's a named tuple. You could do isinstance(tuple) and hasattr(_asdict). ########## sdks/python/apache_beam/yaml/yaml_udf_test.py: ########## @@ -25,20 +25,26 @@ from apache_beam.options import pipeline_options from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.yaml.yaml_mapping import row_to_dict +from apache_beam.yaml.yaml_provider import dicts_to_rows from apache_beam.yaml.yaml_transform import YamlTransform def AsRows(): - return beam.Map(lambda named_tuple: beam.Row(**named_tuple._asdict())) + return beam.Map(lambda named_tuple: dicts_to_rows(row_to_dict(named_tuple))) class YamlUDFMappingTest(unittest.TestCase): def __init__(self, method_name='runYamlMappingTest'): super().__init__(method_name) self.data = [ - beam.Row(label='11a', conductor=11, rank=0), - beam.Row(label='37a', conductor=37, rank=1), - beam.Row(label='389a', conductor=389, rank=2), + beam.Row( Review Comment: Might be a bit of an aside, but if we're going to be adding attributes here we should either make them meaningful or arbitrary. (These are actually references to elliptic curves. 
If we wanted a repeated, nested value, we could provide generators as 2-tuple points [(x=0, y=0)], [(x=0, y=0)], and [(x=0, y=0), (x=1, y=0)] for 11a, 37a, and 389a respectively in the most standard form, but maybe it's easier to just use something else, e.g. col1, if we're trying to explore arbitrary schema types (though this'd be a lot of re-writing).) ########## sdks/python/apache_beam/yaml/yaml_mapping.py: ########## @@ -75,28 +99,89 @@ def __setstate__(self, state): self.__dict__.update(state) +def row_to_dict(row): + if (str(type(row).__name__).startswith('BeamSchema_') or + isinstance(row, Row)): + row = row._asdict() + if isinstance(row, dict): + for key, value in row.items(): + row[key] = row_to_dict(value) Review Comment: Don't mutate the input. You can use dict comprehension here. ########## sdks/python/apache_beam/yaml/yaml_mapping.py: ########## @@ -75,28 +99,89 @@ def __setstate__(self, state): self.__dict__.update(state) +def row_to_dict(row): + if (str(type(row).__name__).startswith('BeamSchema_') or + isinstance(row, Row)): + row = row._asdict() + if isinstance(row, dict): + for key, value in row.items(): + row[key] = row_to_dict(value) + elif not isinstance(row, str) and isinstance(row, Iterable): + row_list = list(row) + for idx in range(len(row_list)): + row_list[idx] = row_to_dict(row_list[idx]) + return row + + # TODO(yaml) Consider adding optional language version parameter to support # ECMAScript 5 and 6 def _expand_javascript_mapping_func( original_fields, expression=None, callable=None, path=None, name=None): + + js_array_type = ( + PyJsArray, + PyJsArrayBuffer, + PyJsInt8Array, + PyJsUint8Array, + PyJsUint8ClampedArray, + PyJsInt16Array, + PyJsUint16Array, + PyJsInt32Array, + PyJsUint32Array, + PyJsFloat32Array, + PyJsFloat64Array) + + def _js_object_to_py_object(obj): + if isinstance(obj, (PyJsNumber, PyJsString, PyJsBoolean)): + obj = to_python(obj) + elif isinstance(obj, js_array_type): + return [_js_object_to_py_object(value) for value 
in obj.to_list()] + elif isinstance(obj, PyJsDate): + obj = obj.to_utc_dt() + elif isinstance(obj, (PyJsNull, PyJsUndefined)): + return None + elif isinstance(obj, PyJsError): + raise RuntimeError(obj['message']) + elif isinstance(obj, PyJsObject): + return { + key: _js_object_to_py_object(value['value']) + for key, Review Comment: You can put ()'s around `key, value` to make yapf do the right thing. ########## sdks/python/apache_beam/yaml/yaml_mapping.py: ########## @@ -75,28 +99,89 @@ def __setstate__(self, state): self.__dict__.update(state) +def row_to_dict(row): + if (str(type(row).__name__).startswith('BeamSchema_') or + isinstance(row, Row)): + row = row._asdict() + if isinstance(row, dict): + for key, value in row.items(): + row[key] = row_to_dict(value) + elif not isinstance(row, str) and isinstance(row, Iterable): + row_list = list(row) + for idx in range(len(row_list)): + row_list[idx] = row_to_dict(row_list[idx]) + return row + + # TODO(yaml) Consider adding optional language version parameter to support # ECMAScript 5 and 6 def _expand_javascript_mapping_func( original_fields, expression=None, callable=None, path=None, name=None): + + js_array_type = ( + PyJsArray, + PyJsArrayBuffer, + PyJsInt8Array, + PyJsUint8Array, + PyJsUint8ClampedArray, + PyJsInt16Array, + PyJsUint16Array, + PyJsInt32Array, + PyJsUint32Array, + PyJsFloat32Array, + PyJsFloat64Array) + + def _js_object_to_py_object(obj): + if isinstance(obj, (PyJsNumber, PyJsString, PyJsBoolean)): + obj = to_python(obj) Review Comment: Same as above. Let's not mix returning directly and mutating obj and returning that. 
########## sdks/python/apache_beam/yaml/yaml_mapping.py: ########## @@ -75,28 +99,89 @@ def __setstate__(self, state): self.__dict__.update(state) +def row_to_dict(row): + if (str(type(row).__name__).startswith('BeamSchema_') or + isinstance(row, Row)): + row = row._asdict() + if isinstance(row, dict): + for key, value in row.items(): + row[key] = row_to_dict(value) + elif not isinstance(row, str) and isinstance(row, Iterable): + row_list = list(row) + for idx in range(len(row_list)): Review Comment: Better to use list comprehension `[row_to_dict(x) for x in row]`. ########## sdks/python/apache_beam/yaml/yaml_mapping.py: ########## @@ -75,28 +99,89 @@ def __setstate__(self, state): self.__dict__.update(state) +def row_to_dict(row): + if (str(type(row).__name__).startswith('BeamSchema_') or + isinstance(row, Row)): + row = row._asdict() + if isinstance(row, dict): + for key, value in row.items(): + row[key] = row_to_dict(value) + elif not isinstance(row, str) and isinstance(row, Iterable): + row_list = list(row) + for idx in range(len(row_list)): + row_list[idx] = row_to_dict(row_list[idx]) + return row + + # TODO(yaml) Consider adding optional language version parameter to support # ECMAScript 5 and 6 def _expand_javascript_mapping_func( original_fields, expression=None, callable=None, path=None, name=None): + + js_array_type = ( + PyJsArray, + PyJsArrayBuffer, + PyJsInt8Array, + PyJsUint8Array, + PyJsUint8ClampedArray, + PyJsInt16Array, + PyJsUint16Array, + PyJsInt32Array, + PyJsUint32Array, + PyJsFloat32Array, + PyJsFloat64Array) + + def _js_object_to_py_object(obj): + if isinstance(obj, (PyJsNumber, PyJsString, PyJsBoolean)): + obj = to_python(obj) + elif isinstance(obj, js_array_type): + return [_js_object_to_py_object(value) for value in obj.to_list()] + elif isinstance(obj, PyJsDate): + obj = obj.to_utc_dt() + elif isinstance(obj, (PyJsNull, PyJsUndefined)): + return None + elif isinstance(obj, PyJsError): + raise RuntimeError(obj['message']) + elif 
isinstance(obj, PyJsObject): + return { + key: _js_object_to_py_object(value['value']) + for key, + value in obj.own.items() + } + elif isinstance(obj, JsObjectWrapper): + return _js_object_to_py_object(obj._obj) + + return obj + + def _catch_js_errors(func): + try: + result = func() + except JsException as e: + result = getattr(e, 'mes') + return result + if expression: args = ', '.join(original_fields) js_func = f'function fn({args}) {{return ({expression})}}' - js_callable = _CustomJsObjectWrapper(js2py.eval_js(js_func)) - return lambda __row__: js_callable(*__row__._asdict().values()) + js_expr_callable = _CustomJsObjectWrapper(js2py.eval_js(js_func)) + fn = lambda __row__: lambda: js_expr_callable( + *row_to_dict(__row__).values()) elif callable: js_callable = _CustomJsObjectWrapper(js2py.eval_js(callable)) - return lambda __row__: js_callable(__row__._asdict()) + fn = lambda __row__: lambda: js_callable(row_to_dict(__row__)) else: if not path.endswith('.js'): raise ValueError(f'File "{path}" is not a valid .js file.') udf_code = FileSystems.open(path).read().decode() js = js2py.EvalJs() js.eval(udf_code) - js_callable = _CustomJsObjectWrapper(getattr(js, name)) - return lambda __row__: js_callable(__row__._asdict()) + js_file_callable = _CustomJsObjectWrapper(getattr(js, name)) + fn = lambda __row__: lambda: js_file_callable(row_to_dict(__row__)) + + return lambda __row__: dicts_to_rows( + _js_object_to_py_object(_catch_js_errors(fn(__row__)))) Review Comment: I don't think we want to be catching errors and silently returning them as strings. We should let them propagate (and get processed by the error handling; I think that should just work). 
########## sdks/python/apache_beam/yaml/yaml_mapping.py: ########## @@ -75,28 +99,89 @@ def __setstate__(self, state): self.__dict__.update(state) +def row_to_dict(row): + if (str(type(row).__name__).startswith('BeamSchema_') or + isinstance(row, Row)): + row = row._asdict() + if isinstance(row, dict): + for key, value in row.items(): + row[key] = row_to_dict(value) + elif not isinstance(row, str) and isinstance(row, Iterable): + row_list = list(row) Review Comment: You're never returning row_list. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
