robertwb commented on code in PR #28114:
URL: https://github.com/apache/beam/pull/28114#discussion_r1313609169
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -16,44 +16,133 @@
#
"""This module defines the basic MapToFields operation."""
-
import itertools
import apache_beam as beam
+from apache_beam.io.filesystems import FileSystems
from apache_beam.typehints import row_type
from apache_beam.typehints import trivial_inference
from apache_beam.typehints.schemas import named_fields_from_element_type
from apache_beam.utils import python_callable
from apache_beam.yaml import yaml_provider
-def _as_callable(original_fields, expr):
+def _check_mapping_arguments(
+ transform_name, expression=None, callable=None, name=None, path=None):
+ # Argument checking
+ if not expression and not callable and not path and not name:
+ raise ValueError(
+ f'{transform_name} must specify either "expression", "callable", or
both "path" and "name"'
+ )
+ if expression and callable:
+ raise ValueError(
+ f'{transform_name} cannot specify both "expression" and "callable"')
+ if (expression or callable) and (path or name):
+ raise ValueError(
+ f'{transform_name} cannot specify "expression" or "callable" with
"path" or "name"'
+ )
+ if path and not name:
+ raise ValueError(f'{transform_name} cannot specify "path" without "name"')
+ if name and not path:
+ raise ValueError(f'{transform_name} cannot specify "name" without "path"')
+
+
+# TODO(yaml) Consider adding optional language version parameter to support
ECMAScript 5 and 6
+def _expand_javascript_mapping_func(
+ original_fields, expression=None, callable=None, path=None, name=None):
+ try:
+ import js2py
+ except ImportError:
+ raise ImportError(
+ "js2py must be installed to run javascript UDF's for YAML mapping
transforms."
+ )
+
+ # Javascript expression case
+ if expression:
+ parameters = [name for name in original_fields if name in expression]
+ args_from_row = ', '.join(f'__row__.{param}' for param in parameters)
+ args = ', '.join(parameters)
+ js_func = f'function fn({args}) {{return ({expression})}}'
+
+ return lambda __row__: js2py.eval_js(js_func)(*eval(args_from_row))
+
+ # Javascript file UDF case
+ if not callable:
+ if not path.endswith('.js'):
+ raise ValueError(f'File "{path}" is not a valid .js file.')
+ udf_code = FileSystems.open(path).read().decode()
+ # Javascript inline UDF case
+ else:
+ udf_code = callable
+
+ def fn(__row__):
+ row_values = eval(
+ ', '.join(f'__row__.{param}' for param in original_fields))
+ row_dict = dict(zip(original_fields, row_values))
+ if callable:
Review Comment:
It doesn't feel like we should be doing this branch on every element...
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -89,13 +178,13 @@ def explode_zip(base, fields):
return (
beam.core._MaybePValueWithErrors(
- pcoll, self._exception_handling_args)
+ pcoll, self._exception_handling_args)
| beam.FlatMap(
- lambda row: (
- explode_cross_product if self._cross_product else explode_zip)(
- {name: getattr(row, name) for name in all_fields}, # yapf
- to_explode))
- ).as_result()
+ lambda row: (
Review Comment:
Is this the formatting that yapf wants?
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -16,44 +16,133 @@
#
"""This module defines the basic MapToFields operation."""
-
import itertools
import apache_beam as beam
+from apache_beam.io.filesystems import FileSystems
from apache_beam.typehints import row_type
from apache_beam.typehints import trivial_inference
from apache_beam.typehints.schemas import named_fields_from_element_type
from apache_beam.utils import python_callable
from apache_beam.yaml import yaml_provider
-def _as_callable(original_fields, expr):
+def _check_mapping_arguments(
+ transform_name, expression=None, callable=None, name=None, path=None):
+ # Argument checking
+ if not expression and not callable and not path and not name:
+ raise ValueError(
+ f'{transform_name} must specify either "expression", "callable", or
both "path" and "name"'
+ )
+ if expression and callable:
+ raise ValueError(
+ f'{transform_name} cannot specify both "expression" and "callable"')
+ if (expression or callable) and (path or name):
+ raise ValueError(
+ f'{transform_name} cannot specify "expression" or "callable" with
"path" or "name"'
+ )
+ if path and not name:
+ raise ValueError(f'{transform_name} cannot specify "path" without "name"')
+ if name and not path:
+ raise ValueError(f'{transform_name} cannot specify "name" without "path"')
+
+
+# TODO(yaml) Consider adding optional language version parameter to support
ECMAScript 5 and 6
+def _expand_javascript_mapping_func(
+ original_fields, expression=None, callable=None, path=None, name=None):
+ try:
+ import js2py
+ except ImportError:
+ raise ImportError(
+ "js2py must be installed to run javascript UDF's for YAML mapping
transforms."
+ )
+
+ # Javascript expression case
+ if expression:
+ parameters = [name for name in original_fields if name in expression]
+ args_from_row = ', '.join(f'__row__.{param}' for param in parameters)
+ args = ', '.join(parameters)
+ js_func = f'function fn({args}) {{return ({expression})}}'
+
+ return lambda __row__: js2py.eval_js(js_func)(*eval(args_from_row))
+
+ # Javascript file UDF case
+ if not callable:
+ if not path.endswith('.js'):
+ raise ValueError(f'File "{path}" is not a valid .js file.')
+ udf_code = FileSystems.open(path).read().decode()
+ # Javascript inline UDF case
+ else:
+ udf_code = callable
+
+ def fn(__row__):
+ row_values = eval(
Review Comment:
`__row__._as_dict()`? (I suppose we need to add this to `Row` objects.)
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -16,44 +16,133 @@
#
"""This module defines the basic MapToFields operation."""
-
import itertools
import apache_beam as beam
+from apache_beam.io.filesystems import FileSystems
from apache_beam.typehints import row_type
from apache_beam.typehints import trivial_inference
from apache_beam.typehints.schemas import named_fields_from_element_type
from apache_beam.utils import python_callable
from apache_beam.yaml import yaml_provider
-def _as_callable(original_fields, expr):
+def _check_mapping_arguments(
+ transform_name, expression=None, callable=None, name=None, path=None):
+ # Argument checking
+ if not expression and not callable and not path and not name:
+ raise ValueError(
+ f'{transform_name} must specify either "expression", "callable", or
both "path" and "name"'
+ )
+ if expression and callable:
+ raise ValueError(
+ f'{transform_name} cannot specify both "expression" and "callable"')
+ if (expression or callable) and (path or name):
+ raise ValueError(
+ f'{transform_name} cannot specify "expression" or "callable" with
"path" or "name"'
+ )
+ if path and not name:
+ raise ValueError(f'{transform_name} cannot specify "path" without "name"')
+ if name and not path:
+ raise ValueError(f'{transform_name} cannot specify "name" without "path"')
+
+
+# TODO(yaml) Consider adding optional language version parameter to support
ECMAScript 5 and 6
+def _expand_javascript_mapping_func(
+ original_fields, expression=None, callable=None, path=None, name=None):
+ try:
+ import js2py
+ except ImportError:
+ raise ImportError(
+ "js2py must be installed to run javascript UDF's for YAML mapping
transforms."
+ )
+
+ # Javascript expression case
+ if expression:
+ parameters = [name for name in original_fields if name in expression]
+ args_from_row = ', '.join(f'__row__.{param}' for param in parameters)
+ args = ', '.join(parameters)
+ js_func = f'function fn({args}) {{return ({expression})}}'
+
+ return lambda __row__: js2py.eval_js(js_func)(*eval(args_from_row))
+
+ # Javascript file UDF case
+ if not callable:
+ if not path.endswith('.js'):
+ raise ValueError(f'File "{path}" is not a valid .js file.')
+ udf_code = FileSystems.open(path).read().decode()
+ # Javascript inline UDF case
+ else:
+ udf_code = callable
+
+ def fn(__row__):
+ row_values = eval(
+ ', '.join(f'__row__.{param}' for param in original_fields))
+ row_dict = dict(zip(original_fields, row_values))
+ if callable:
+ return js2py.eval_js(udf_code)(row_dict)
+ js = js2py.EvalJs()
+ js.eval(udf_code)
+ return getattr(js, name)(row_dict)
+
+ return fn
+
+
+def _expand_python_mapping_func(
+ original_fields, expression=None, callable=None, path=None, name=None):
+ # Python file UDF case
+ if path and name:
+ if not path.endswith('.py'):
+ raise ValueError(f'File "{path}" is not a valid .py file.')
+ py_file = FileSystems.open(path).read().decode()
+
+ # Parse file into python function using given method name
+ return python_callable.PythonCallableWithSource.load_from_script(
+ py_file, name)
+
+ # Python expression case
+ elif expression:
+ # TODO(robertwb): Consider constructing a single callable that takes
+ ## the row and returns the new row, rather than invoking (and unpacking)
Review Comment:
Remove double hashes.
##########
sdks/python/apache_beam/yaml/yaml_udf_test.py:
##########
@@ -0,0 +1,338 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import io
+import logging
+import os
+import shutil
+import tempfile
+import unittest
+from unittest import mock
+
+import apache_beam as beam
+from apache_beam.io import localfilesystem
+from apache_beam.io.filesystem import CompressedFile, CompressionTypes
+from apache_beam.io.gcp import gcsio, gcsfilesystem
+from apache_beam.options import pipeline_options
+from apache_beam.testing.util import equal_to, assert_that
+from apache_beam.yaml.yaml_transform import YamlTransform
+
+MOCK_UDF_FILE_PREFIX = "/path/to/udf"
Review Comment:
Remove.
In fact, I think you can remove all the mocks in this file (e.g. no need to
explicitly test GCS; just trust that `FileSystems` has its own tests).
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -16,44 +16,133 @@
#
"""This module defines the basic MapToFields operation."""
-
import itertools
import apache_beam as beam
+from apache_beam.io.filesystems import FileSystems
from apache_beam.typehints import row_type
from apache_beam.typehints import trivial_inference
from apache_beam.typehints.schemas import named_fields_from_element_type
from apache_beam.utils import python_callable
from apache_beam.yaml import yaml_provider
-def _as_callable(original_fields, expr):
+def _check_mapping_arguments(
+ transform_name, expression=None, callable=None, name=None, path=None):
+ # Argument checking
+ if not expression and not callable and not path and not name:
+ raise ValueError(
+ f'{transform_name} must specify either "expression", "callable", or
both "path" and "name"'
+ )
+ if expression and callable:
+ raise ValueError(
+ f'{transform_name} cannot specify both "expression" and "callable"')
+ if (expression or callable) and (path or name):
+ raise ValueError(
+ f'{transform_name} cannot specify "expression" or "callable" with
"path" or "name"'
+ )
+ if path and not name:
+ raise ValueError(f'{transform_name} cannot specify "path" without "name"')
+ if name and not path:
+ raise ValueError(f'{transform_name} cannot specify "name" without "path"')
+
+
+# TODO(yaml) Consider adding optional language version parameter to support
ECMAScript 5 and 6
+def _expand_javascript_mapping_func(
+ original_fields, expression=None, callable=None, path=None, name=None):
+ try:
+ import js2py
+ except ImportError:
+ raise ImportError(
+ "js2py must be installed to run javascript UDF's for YAML mapping
transforms."
+ )
+
+ # Javascript expression case
+ if expression:
+ parameters = [name for name in original_fields if name in expression]
+ args_from_row = ', '.join(f'__row__.{param}' for param in parameters)
+ args = ', '.join(parameters)
+ js_func = f'function fn({args}) {{return ({expression})}}'
+
+ return lambda __row__: js2py.eval_js(js_func)(*eval(args_from_row))
+
+ # Javascript file UDF case
+ if not callable:
+ if not path.endswith('.js'):
+ raise ValueError(f'File "{path}" is not a valid .js file.')
+ udf_code = FileSystems.open(path).read().decode()
+ # Javascript inline UDF case
+ else:
+ udf_code = callable
+
+ def fn(__row__):
+ row_values = eval(
+ ', '.join(f'__row__.{param}' for param in original_fields))
+ row_dict = dict(zip(original_fields, row_values))
+ if callable:
+ return js2py.eval_js(udf_code)(row_dict)
+ js = js2py.EvalJs()
+ js.eval(udf_code)
+ return getattr(js, name)(row_dict)
+
+ return fn
+
+
+def _expand_python_mapping_func(
+ original_fields, expression=None, callable=None, path=None, name=None):
+ # Python file UDF case
+ if path and name:
+ if not path.endswith('.py'):
+ raise ValueError(f'File "{path}" is not a valid .py file.')
+ py_file = FileSystems.open(path).read().decode()
+
+ # Parse file into python function using given method name
+ return python_callable.PythonCallableWithSource.load_from_script(
+ py_file, name)
+
+ # Python expression case
Review Comment:
These comments are redundant with the code. Generally use comments to
clarify why you're doing something, or if there's something non-obvious going
on. (Similarly elsewhere.)
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -16,44 +16,133 @@
#
"""This module defines the basic MapToFields operation."""
-
import itertools
import apache_beam as beam
+from apache_beam.io.filesystems import FileSystems
from apache_beam.typehints import row_type
from apache_beam.typehints import trivial_inference
from apache_beam.typehints.schemas import named_fields_from_element_type
from apache_beam.utils import python_callable
from apache_beam.yaml import yaml_provider
-def _as_callable(original_fields, expr):
+def _check_mapping_arguments(
+ transform_name, expression=None, callable=None, name=None, path=None):
+ # Argument checking
+ if not expression and not callable and not path and not name:
+ raise ValueError(
+ f'{transform_name} must specify either "expression", "callable", or
both "path" and "name"'
+ )
+ if expression and callable:
+ raise ValueError(
+ f'{transform_name} cannot specify both "expression" and "callable"')
+ if (expression or callable) and (path or name):
+ raise ValueError(
+ f'{transform_name} cannot specify "expression" or "callable" with
"path" or "name"'
+ )
+ if path and not name:
+ raise ValueError(f'{transform_name} cannot specify "path" without "name"')
+ if name and not path:
+ raise ValueError(f'{transform_name} cannot specify "name" without "path"')
+
+
+# TODO(yaml) Consider adding optional language version parameter to support
ECMAScript 5 and 6
+def _expand_javascript_mapping_func(
+ original_fields, expression=None, callable=None, path=None, name=None):
+ try:
+ import js2py
+ except ImportError:
+ raise ImportError(
+ "js2py must be installed to run javascript UDF's for YAML mapping
transforms."
+ )
+
+ # Javascript expression case
+ if expression:
+ parameters = [name for name in original_fields if name in expression]
+ args_from_row = ', '.join(f'__row__.{param}' for param in parameters)
+ args = ', '.join(parameters)
+ js_func = f'function fn({args}) {{return ({expression})}}'
+
+ return lambda __row__: js2py.eval_js(js_func)(*eval(args_from_row))
+
+ # Javascript file UDF case
+ if not callable:
+ if not path.endswith('.js'):
+ raise ValueError(f'File "{path}" is not a valid .js file.')
+ udf_code = FileSystems.open(path).read().decode()
+ # Javascript inline UDF case
+ else:
+ udf_code = callable
Review Comment:
In one case, udf_code is a script, and in the other, udf_code is a function
definition. I wouldn't assign both to the same variable as they can't be used
interchangeably.
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -16,44 +16,133 @@
#
"""This module defines the basic MapToFields operation."""
-
import itertools
import apache_beam as beam
+from apache_beam.io.filesystems import FileSystems
from apache_beam.typehints import row_type
from apache_beam.typehints import trivial_inference
from apache_beam.typehints.schemas import named_fields_from_element_type
from apache_beam.utils import python_callable
from apache_beam.yaml import yaml_provider
-def _as_callable(original_fields, expr):
+def _check_mapping_arguments(
+ transform_name, expression=None, callable=None, name=None, path=None):
+ # Argument checking
+ if not expression and not callable and not path and not name:
+ raise ValueError(
+ f'{transform_name} must specify either "expression", "callable", or
both "path" and "name"'
+ )
+ if expression and callable:
+ raise ValueError(
+ f'{transform_name} cannot specify both "expression" and "callable"')
+ if (expression or callable) and (path or name):
+ raise ValueError(
+ f'{transform_name} cannot specify "expression" or "callable" with
"path" or "name"'
+ )
+ if path and not name:
+ raise ValueError(f'{transform_name} cannot specify "path" without "name"')
+ if name and not path:
+ raise ValueError(f'{transform_name} cannot specify "name" without "path"')
+
+
+# TODO(yaml) Consider adding optional language version parameter to support
ECMAScript 5 and 6
+def _expand_javascript_mapping_func(
+ original_fields, expression=None, callable=None, path=None, name=None):
+ try:
+ import js2py
+ except ImportError:
+ raise ImportError(
+ "js2py must be installed to run javascript UDF's for YAML mapping
transforms."
+ )
+
+ # Javascript expression case
+ if expression:
+ parameters = [name for name in original_fields if name in expression]
+ args_from_row = ', '.join(f'__row__.{param}' for param in parameters)
+ args = ', '.join(parameters)
+ js_func = f'function fn({args}) {{return ({expression})}}'
+
+ return lambda __row__: js2py.eval_js(js_func)(*eval(args_from_row))
+
+ # Javascript file UDF case
+ if not callable:
+ if not path.endswith('.js'):
+ raise ValueError(f'File "{path}" is not a valid .js file.')
+ udf_code = FileSystems.open(path).read().decode()
+ # Javascript inline UDF case
+ else:
+ udf_code = callable
+
+ def fn(__row__):
+ row_values = eval(
+ ', '.join(f'__row__.{param}' for param in original_fields))
+ row_dict = dict(zip(original_fields, row_values))
+ if callable:
+ return js2py.eval_js(udf_code)(row_dict)
+ js = js2py.EvalJs()
+ js.eval(udf_code)
Review Comment:
I don't think we want to re-evaluate the entire script on every single
element.
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -16,44 +16,133 @@
#
"""This module defines the basic MapToFields operation."""
-
import itertools
import apache_beam as beam
+from apache_beam.io.filesystems import FileSystems
from apache_beam.typehints import row_type
from apache_beam.typehints import trivial_inference
from apache_beam.typehints.schemas import named_fields_from_element_type
from apache_beam.utils import python_callable
from apache_beam.yaml import yaml_provider
-def _as_callable(original_fields, expr):
+def _check_mapping_arguments(
+ transform_name, expression=None, callable=None, name=None, path=None):
+ # Argument checking
+ if not expression and not callable and not path and not name:
+ raise ValueError(
+ f'{transform_name} must specify either "expression", "callable", or
both "path" and "name"'
+ )
+ if expression and callable:
+ raise ValueError(
+ f'{transform_name} cannot specify both "expression" and "callable"')
+ if (expression or callable) and (path or name):
+ raise ValueError(
+ f'{transform_name} cannot specify "expression" or "callable" with
"path" or "name"'
+ )
+ if path and not name:
+ raise ValueError(f'{transform_name} cannot specify "path" without "name"')
+ if name and not path:
+ raise ValueError(f'{transform_name} cannot specify "name" without "path"')
+
+
+# TODO(yaml) Consider adding optional language version parameter to support
ECMAScript 5 and 6
+def _expand_javascript_mapping_func(
+ original_fields, expression=None, callable=None, path=None, name=None):
+ try:
+ import js2py
+ except ImportError:
+ raise ImportError(
+ "js2py must be installed to run javascript UDF's for YAML mapping
transforms."
+ )
+
+ # Javascript expression case
+ if expression:
+ parameters = [name for name in original_fields if name in expression]
+ args_from_row = ', '.join(f'__row__.{param}' for param in parameters)
+ args = ', '.join(parameters)
+ js_func = f'function fn({args}) {{return ({expression})}}'
+
+ return lambda __row__: js2py.eval_js(js_func)(*eval(args_from_row))
+
+ # Javascript file UDF case
+ if not callable:
Review Comment:
Prefer
```
if callable:
...
else:
...
```
for readability.
(Possibly the first should be elif, as it's an alternative to expression.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]