Polber commented on code in PR #28114:
URL: https://github.com/apache/beam/pull/28114#discussion_r1316289560
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -16,44 +16,133 @@
#
"""This module defines the basic MapToFields operation."""
-
import itertools
import apache_beam as beam
+from apache_beam.io.filesystems import FileSystems
from apache_beam.typehints import row_type
from apache_beam.typehints import trivial_inference
from apache_beam.typehints.schemas import named_fields_from_element_type
from apache_beam.utils import python_callable
from apache_beam.yaml import yaml_provider
-def _as_callable(original_fields, expr):
+def _check_mapping_arguments(
+ transform_name, expression=None, callable=None, name=None, path=None):
+ # Argument checking
+ if not expression and not callable and not path and not name:
+ raise ValueError(
+ f'{transform_name} must specify either "expression", "callable", or
both "path" and "name"'
+ )
+ if expression and callable:
+ raise ValueError(
+ f'{transform_name} cannot specify both "expression" and "callable"')
+ if (expression or callable) and (path or name):
+ raise ValueError(
+ f'{transform_name} cannot specify "expression" or "callable" with
"path" or "name"'
+ )
+ if path and not name:
+ raise ValueError(f'{transform_name} cannot specify "path" without "name"')
+ if name and not path:
+ raise ValueError(f'{transform_name} cannot specify "name" without "path"')
+
+
+# TODO(yaml) Consider adding optional language version parameter to support
ECMAScript 5 and 6
+def _expand_javascript_mapping_func(
+ original_fields, expression=None, callable=None, path=None, name=None):
+ try:
+ import js2py
+ except ImportError:
+ raise ImportError(
+ "js2py must be installed to run javascript UDF's for YAML mapping
transforms."
+ )
+
+ # Javascript expression case
+ if expression:
+ parameters = [name for name in original_fields if name in expression]
+ args_from_row = ', '.join(f'__row__.{param}' for param in parameters)
+ args = ', '.join(parameters)
+ js_func = f'function fn({args}) {{return ({expression})}}'
+
+ return lambda __row__: js2py.eval_js(js_func)(*eval(args_from_row))
+
+ # Javascript file UDF case
+ if not callable:
+ if not path.endswith('.js'):
+ raise ValueError(f'File "{path}" is not a valid .js file.')
+ udf_code = FileSystems.open(path).read().decode()
+ # Javascript inline UDF case
+ else:
+ udf_code = callable
+
+ def fn(__row__):
+ row_values = eval(
Review Comment:
The `__row__` passed to this callable is a `namedtuple` which has a built-in
`_asdict()` method. I updated the code to use this, which also simplified the
functions enough to use a simple lambda.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]