gemini-code-assist[bot] commented on code in PR #38236:
URL: https://github.com/apache/beam/pull/38236#discussion_r3169850420
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -205,85 +193,137 @@ def py_value_to_js_dict(py_value):
return py_value
-# TODO(yaml) Consider adding optional language version parameter to support
-# ECMAScript 5 and 6
-def _expand_javascript_mapping_func(
- original_fields, expression=None, callable=None, path=None, name=None):
+def js_to_py(obj):
+ """Converts mini-racer mapped objects to standard Python types.
+
+ This is needed because ctx.eval returns objects that implement Mapping
+ and Iterable but are not picklable (like JSMappedObjectImpl and JSArrayImpl),
+ which would fail when Beam tries to serialize rows containing them.
+ We also preserve datetime objects which are correctly produced by ctx.eval
+ for JS Date objects.
+ """
+ if isinstance(obj, datetime.datetime):
+ return obj
+ elif isinstance(obj, Mapping):
+ return {k: js_to_py(v) for k, v in obj.items()}
+ elif not isinstance(obj, (str, bytes)) and isinstance(obj, Iterable):
Review Comment:

The names `Mapping` and `Iterable` are used here, but they are not imported at
module level. Since you added `from collections import abc` at line 23 of this
file, you should use `abc.Mapping` and `abc.Iterable` instead.
```suggestion
elif isinstance(obj, abc.Mapping):
return {k: js_to_py(v) for k, v in obj.items()}
elif not isinstance(obj, (str, bytes)) and isinstance(obj, abc.Iterable):
```
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -205,85 +193,137 @@ def py_value_to_js_dict(py_value):
return py_value
-# TODO(yaml) Consider adding optional language version parameter to support
-# ECMAScript 5 and 6
-def _expand_javascript_mapping_func(
- original_fields, expression=None, callable=None, path=None, name=None):
+def js_to_py(obj):
+ """Converts mini-racer mapped objects to standard Python types.
+
+ This is needed because ctx.eval returns objects that implement Mapping
+ and Iterable but are not picklable (like JSMappedObjectImpl and JSArrayImpl),
+ which would fail when Beam tries to serialize rows containing them.
+ We also preserve datetime objects which are correctly produced by ctx.eval
+ for JS Date objects.
+ """
+ if isinstance(obj, datetime.datetime):
+ return obj
+ elif isinstance(obj, Mapping):
+ return {k: js_to_py(v) for k, v in obj.items()}
+ elif not isinstance(obj, (str, bytes)) and isinstance(obj, Iterable):
+ return [js_to_py(v) for v in obj]
+ elif isinstance(obj, str):
+ if _JS_DATE_ISO_REGEX.match(obj):
+ try:
+ return datetime.datetime.fromisoformat(obj[:-1] + '+00:00')
+ except ValueError:
+ return obj
+ return obj
+ else:
+ return obj
- # Check for installed js2py package
- if js2py is None:
- raise ValueError(
- "Javascript mapping functions are not supported on"
- " Python 3.12 or later.")
-
- # import remaining js2py objects
- from js2py import base
- from js2py.constructors import jsdate
- from js2py.internals import simplex
-
- js_array_type = (
- base.PyJsArray,
- base.PyJsArrayBuffer,
- base.PyJsInt8Array,
- base.PyJsUint8Array,
- base.PyJsUint8ClampedArray,
- base.PyJsInt16Array,
- base.PyJsUint16Array,
- base.PyJsInt32Array,
- base.PyJsUint32Array,
- base.PyJsFloat32Array,
- base.PyJsFloat64Array)
-
- def _js_object_to_py_object(obj):
- if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)):
- return base.to_python(obj)
- elif isinstance(obj, js_array_type):
- return [_js_object_to_py_object(value) for value in obj.to_list()]
- elif isinstance(obj, jsdate.PyJsDate):
- return obj.to_utc_dt()
- elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)):
- return None
- elif isinstance(obj, base.PyJsError):
- raise RuntimeError(obj['message'])
- elif isinstance(obj, base.PyJsObject):
- return {
- key: _js_object_to_py_object(value['value'])
- for (key, value) in obj.own.items()
- }
- elif isinstance(obj, base.JsObjectWrapper):
- return _js_object_to_py_object(obj._obj)
- return obj
+class JsFilterDoFn(beam.DoFn):
+ def __init__(self, udf_code, function_name):
+ self.udf_code = udf_code
+ self.function_name = function_name
+ self.ctx = None
+
+ def setup(self):
+ self.ctx = MiniRacer()
+ self.ctx.eval(self.udf_code)
+
+ def process(self, element):
+ row_as_dict = py_value_to_js_dict(element)
+ result = self.ctx.call(self.function_name, row_as_dict)
+ result = js_to_py(result)
+ if result:
+ yield element
+
+
+class JsMapToFieldsDoFn(beam.DoFn):
+ def __init__(self, fields, original_fields, input_schema):
+ self.ctx = None
+ self.field_funcs = {}
+ self.passthrough_fields = []
+
+ script = []
+ for name, expr in fields.items():
+ if isinstance(expr, str) and expr in input_schema:
+ self.passthrough_fields.append((name, expr))
+ continue
+
+ if isinstance(expr, str):
+ expr = {'expression': expr}
+
+ if 'expression' in expr:
+ e = expr['expression']
+ code = f"var func_{name} = (__row__) => {{ " + " ".join(
+ [f"const {n} = __row__.{n};"
+ for n in original_fields if n in e]) + f" return ({e}); }}"
+ script.append(code)
+ self.field_funcs[name] = f"func_{name}"
+ elif 'callable' in expr:
+ code = f"var func_{name} = {expr['callable']}"
+ script.append(code)
+ self.field_funcs[name] = f"func_{name}"
+ elif 'path' in expr and 'name' in expr:
+ path = expr['path']
+ func_name = expr['name']
+ udf_code = FileSystems.open(path).read().decode()
Review Comment:

`FileSystems` is used here but it is not imported in this file. You need to
import it from `apache_beam.io.filesystems`.
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -636,8 +672,17 @@ def _PyJsFilter(
error_handling: Whether and where to output records that throw errors when
the above expressions are evaluated.
""" # pylint: disable=line-too-long
- keep_fn = _as_callable_for_pcoll(pcoll, keep, "keep", language or 'generic')
- return pcoll | beam.Filter(keep_fn)
+ if language == 'javascript':
+ if isinstance(keep, str):
+ keep = {'expression': keep}
+ udf_code, function_name = _get_javascript_udf_code(
+ [f.name for f in schema_from_element_type(pcoll.element_type).fields],
Review Comment:

`schema_from_element_type` is used here but it is not imported in this file.
You need to import it from `apache_beam.typehints.schemas`.
##########
sdks/python/setup.py:
##########
@@ -621,8 +621,7 @@ def get_portability_package_data():
'docstring-parser>=0.15,<1.0',
'jinja2>=3.0,<3.2',
'virtualenv-clone>=0.5,<1.0',
- # https://github.com/PiotrDabkowski/Js2Py/issues/317
- 'js2py>=0.74,<1; python_version<"3.12"',
+ 'py-mini-racer',
Review Comment:

It is recommended to specify a version constraint for `py-mini-racer` (at least
a minimum version, and ideally an upper bound as well) to ensure compatibility
and guard against breaking changes from future releases of the package.
```suggestion
'py-mini-racer>=0.6.0',
```
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -205,85 +193,137 @@ def py_value_to_js_dict(py_value):
return py_value
-# TODO(yaml) Consider adding optional language version parameter to support
-# ECMAScript 5 and 6
-def _expand_javascript_mapping_func(
- original_fields, expression=None, callable=None, path=None, name=None):
+def js_to_py(obj):
+ """Converts mini-racer mapped objects to standard Python types.
+
+ This is needed because ctx.eval returns objects that implement Mapping
+ and Iterable but are not picklable (like JSMappedObjectImpl and JSArrayImpl),
+ which would fail when Beam tries to serialize rows containing them.
+ We also preserve datetime objects which are correctly produced by ctx.eval
+ for JS Date objects.
+ """
+ if isinstance(obj, datetime.datetime):
+ return obj
+ elif isinstance(obj, Mapping):
+ return {k: js_to_py(v) for k, v in obj.items()}
+ elif not isinstance(obj, (str, bytes)) and isinstance(obj, Iterable):
+ return [js_to_py(v) for v in obj]
+ elif isinstance(obj, str):
+ if _JS_DATE_ISO_REGEX.match(obj):
+ try:
+ return datetime.datetime.fromisoformat(obj[:-1] + '+00:00')
+ except ValueError:
+ return obj
+ return obj
+ else:
+ return obj
- # Check for installed js2py package
- if js2py is None:
- raise ValueError(
- "Javascript mapping functions are not supported on"
- " Python 3.12 or later.")
-
- # import remaining js2py objects
- from js2py import base
- from js2py.constructors import jsdate
- from js2py.internals import simplex
-
- js_array_type = (
- base.PyJsArray,
- base.PyJsArrayBuffer,
- base.PyJsInt8Array,
- base.PyJsUint8Array,
- base.PyJsUint8ClampedArray,
- base.PyJsInt16Array,
- base.PyJsUint16Array,
- base.PyJsInt32Array,
- base.PyJsUint32Array,
- base.PyJsFloat32Array,
- base.PyJsFloat64Array)
-
- def _js_object_to_py_object(obj):
- if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)):
- return base.to_python(obj)
- elif isinstance(obj, js_array_type):
- return [_js_object_to_py_object(value) for value in obj.to_list()]
- elif isinstance(obj, jsdate.PyJsDate):
- return obj.to_utc_dt()
- elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)):
- return None
- elif isinstance(obj, base.PyJsError):
- raise RuntimeError(obj['message'])
- elif isinstance(obj, base.PyJsObject):
- return {
- key: _js_object_to_py_object(value['value'])
- for (key, value) in obj.own.items()
- }
- elif isinstance(obj, base.JsObjectWrapper):
- return _js_object_to_py_object(obj._obj)
- return obj
+class JsFilterDoFn(beam.DoFn):
+ def __init__(self, udf_code, function_name):
+ self.udf_code = udf_code
+ self.function_name = function_name
+ self.ctx = None
+
+ def setup(self):
+ self.ctx = MiniRacer()
+ self.ctx.eval(self.udf_code)
+
+ def process(self, element):
+ row_as_dict = py_value_to_js_dict(element)
+ result = self.ctx.call(self.function_name, row_as_dict)
+ result = js_to_py(result)
+ if result:
+ yield element
+
+
+class JsMapToFieldsDoFn(beam.DoFn):
+ def __init__(self, fields, original_fields, input_schema):
+ self.ctx = None
+ self.field_funcs = {}
+ self.passthrough_fields = []
+
+ script = []
+ for name, expr in fields.items():
+ if isinstance(expr, str) and expr in input_schema:
+ self.passthrough_fields.append((name, expr))
+ continue
+
+ if isinstance(expr, str):
+ expr = {'expression': expr}
+
+ if 'expression' in expr:
+ e = expr['expression']
+ code = f"var func_{name} = (__row__) => {{ " + " ".join(
+ [f"const {n} = __row__.{n};"
+ for n in original_fields if n in e]) + f" return ({e}); }}"
+ script.append(code)
+ self.field_funcs[name] = f"func_{name}"
+ elif 'callable' in expr:
+ code = f"var func_{name} = {expr['callable']}"
+ script.append(code)
+ self.field_funcs[name] = f"func_{name}"
+ elif 'path' in expr and 'name' in expr:
+ path = expr['path']
+ func_name = expr['name']
+ udf_code = FileSystems.open(path).read().decode()
+ script.append(udf_code)
+ self.field_funcs[name] = func_name
+
+ self.script = "\n".join(script) if script else None
+
+ def setup(self):
+ self.ctx = MiniRacer()
+ if self.script:
+ self.ctx.eval(self.script)
+
+ def process(self, element):
+ row_as_dict = py_value_to_js_dict(element)
+ result_dict = {}
+
+ # Handle passthrough fields
+ for name, src in self.passthrough_fields:
+ result_dict[name] = row_as_dict.get(src)
+
+ # Handle JS fields
+ for name, func_name in self.field_funcs.items():
+ res = self.ctx.call(func_name, row_as_dict)
+ result_dict[name] = js_to_py(res)
Review Comment:

Calling `self.ctx.call` for every field in every row is inefficient due to
the overhead of transitioning between Python and the V8 engine. It is highly
recommended to generate a single JavaScript function that computes all fields
and returns a single object, reducing the number of transitions to one per row.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]