robertwb commented on code in PR #28114:
URL: https://github.com/apache/beam/pull/28114#discussion_r1305837506


##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -27,33 +28,140 @@
 from apache_beam.yaml import yaml_provider
 
 
-def _as_callable(original_fields, expr):
-  if expr in original_fields:
-    return expr
+# TODO(yaml) Consider adding optional language version parameter to support ECMAScript 5 and 6
+def expand_mapping_func(
+    transform_name,
+    language,
+    original_fields,
+    expression=None,
+    callable=None,
+    path=None,
+    name=None):
+
+  # Argument checking
+  if not expression and not callable and not path and not name:
+    raise ValueError(
+        f'{transform_name} must specify either "expression", "callable", or both "path" and "name"'
+    )
+  if expression and callable:
+    raise ValueError(
+        f'{transform_name} cannot specify "expression" and "callable"')

Review Comment:
   cannot specify both "expression" and "callable"



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -27,33 +28,140 @@
 from apache_beam.yaml import yaml_provider
 
 
-def _as_callable(original_fields, expr):
-  if expr in original_fields:
-    return expr
+# TODO(yaml) Consider adding optional language version parameter to support ECMAScript 5 and 6
+def expand_mapping_func(
+    transform_name,
+    language,
+    original_fields,
+    expression=None,
+    callable=None,
+    path=None,
+    name=None):
+
+  # Argument checking
+  if not expression and not callable and not path and not name:
+    raise ValueError(
+        f'{transform_name} must specify either "expression", "callable", or both "path" and "name"'
+    )
+  if expression and callable:
+    raise ValueError(
+        f'{transform_name} cannot specify "expression" and "callable"')
+  if (expression or callable) and (path or name):
+    raise ValueError(
+        f'{transform_name} cannot specify "expression" or "callable" with "path" or "name"'
+    )
+  if path and not name:
+    raise ValueError(f'{transform_name} cannot specify "path" without "name"')
+  if name and not path:
+    raise ValueError(f'{transform_name} cannot specify "name" without "path"')
+
+  def _get_udf_from_file():
+    # Local UDF file case
+    if not path.startswith("gs://"):
+      try:
+        with open(path, 'r') as local_udf_file:
+          return local_udf_file.read()
+      except Exception as e:
+        raise IOError(f'Error opening file "{path}": {e}')
+
+    # GCS UDF file case
+    from google.cloud import storage
+
+    # Parse GCS file location
+    gcs_file_parts = str(path[5:]).split('/')
+    gcs_bucket_name = gcs_file_parts[0]
+    gcs_folder = '/'.join(gcs_file_parts[1:])
+
+    # Instantiates a client and downloads file to string
+    storage_client = storage.Client()
+    bucket = storage_client.get_bucket(gcs_bucket_name)
+    blob = bucket.blob(gcs_folder)
+    gcs_file = blob.download_as_string().decode('utf-8')
+
+    return gcs_file
+
+  def _check_file_ext(ext):
+    if not path.endswith(ext):
+      raise ValueError(f'File "{path}" is not a valid {ext} file.')
+
+  def _js2py_import():
+    return (['  try:'] + ['    import js2py'] + ['  except ImportError:'] + [

Review Comment:
   It would be more legible to use a multi-line literal here, or at least something like
   
   ```
   ['try:',
    '    import js2py',
    'except ImportError:',
    ' raise ImportError(...)',
   ]
   ```
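   
   For instance, a minimal sketch of the multi-line form (mirroring the `_js2py_import` helper in the diff; the generated lines keep their two-space base indent inside the literal):
   
   ```
   def _js2py_import():
     # Sketch: one triple-quoted literal split into the list of source lines
     # that the surrounding code expects.
     return '''\
  try:
    import js2py
  except ImportError:
    raise ImportError(
        "js2py must be installed to run javascript UDFs "
        "for YAML mapping transforms.")'''.split('\n')
   ```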
   



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -27,33 +28,140 @@
 from apache_beam.yaml import yaml_provider
 
 
-def _as_callable(original_fields, expr):
-  if expr in original_fields:
-    return expr
+# TODO(yaml) Consider adding optional language version parameter to support ECMAScript 5 and 6
+def expand_mapping_func(
+    transform_name,
+    language,
+    original_fields,
+    expression=None,
+    callable=None,
+    path=None,
+    name=None):
+
+  # Argument checking
+  if not expression and not callable and not path and not name:
+    raise ValueError(
+        f'{transform_name} must specify either "expression", "callable", or both "path" and "name"'
+    )
+  if expression and callable:
+    raise ValueError(
+        f'{transform_name} cannot specify "expression" and "callable"')
+  if (expression or callable) and (path or name):
+    raise ValueError(
+        f'{transform_name} cannot specify "expression" or "callable" with "path" or "name"'
+    )
+  if path and not name:
+    raise ValueError(f'{transform_name} cannot specify "path" without "name"')
+  if name and not path:
+    raise ValueError(f'{transform_name} cannot specify "name" without "path"')
+
+  def _get_udf_from_file():
+    # Local UDF file case
+    if not path.startswith("gs://"):
+      try:
+        with open(path, 'r') as local_udf_file:
+          return local_udf_file.read()
+      except Exception as e:
+        raise IOError(f'Error opening file "{path}": {e}')
+
+    # GCS UDF file case
+    from google.cloud import storage
+
+    # Parse GCS file location
+    gcs_file_parts = str(path[5:]).split('/')
+    gcs_bucket_name = gcs_file_parts[0]
+    gcs_folder = '/'.join(gcs_file_parts[1:])
+
+    # Instantiates a client and downloads file to string
+    storage_client = storage.Client()
+    bucket = storage_client.get_bucket(gcs_bucket_name)
+    blob = bucket.blob(gcs_folder)
+    gcs_file = blob.download_as_string().decode('utf-8')
+
+    return gcs_file
+
+  def _check_file_ext(ext):
+    if not path.endswith(ext):
+      raise ValueError(f'File "{path}" is not a valid {ext} file.')
+
+  def _js2py_import():
+    return (['  try:'] + ['    import js2py'] + ['  except ImportError:'] + [
+        '    raise ImportError("js2py must be installed to run javascript UDF\'s for YAML mapping transforms.")'
+    ])
+
+  # Javascript UDF case
+  if language == 'javascript':
+    # Javascript expression case
+    if expression:
+      parameters = [name for name in original_fields if name in expression]
+      js_func = 'function fn(' + ','.join(
+          [name for name in parameters]) + '){' + 'return (' + expression + ')}'
+      source = '\n'.join(['def fn(__row__):'] + _js2py_import() +
+                         [f'  {name} = __row__.{name}' for name in parameters] +
+                         [
+                             f'  return js2py.eval_js(\'\'\'{js_func}\'\'\')(' +
+                             ','.join([name for name in parameters]) + ')'
+                         ])
+    else:
+      # Javascript file UDF case
+      if not callable:
+        _check_file_ext('.js')
+        udf_code = _get_udf_from_file()
+      # Javascript inline UDF case
+      else:
+        udf_code = callable
+
+      source = '\n'.join(['def fn(__row__):'] + _js2py_import() + [
+          '  row_dict = {label: getattr(__row__, label) for label in __row__._fields}'
+      ] + ([f'  return js2py.eval_js(\'\'\'{udf_code}\'\'\')(row_dict)']
+           if callable else ['  js = js2py.EvalJs()'] +
+           [f'  js.eval(\'\'\'{udf_code}\'\'\')'] +
+           [f'  return getattr(js, "{name}")(row_dict)']))
+  # Python UDF case
   else:
-    # TODO(yaml): support a type parameter
-    # TODO(yaml): support an imports parameter
-    # TODO(yaml): support a requirements parameter (possibly at a higher level)
-    if isinstance(expr, str):
-      expr = {'expression': expr}
-    if not isinstance(expr, dict):
-      raise ValueError(
-          f"Ambiguous expression type (perhaps missing quoting?): {expr}")
-    elif len(expr) != 1:
-      raise ValueError(f"Ambiguous expression type: {list(expr.keys())}")
-    if 'expression' in expr:
+    # Python expression case
+    if expression:
       # TODO(robertwb): Consider constructing a single callable that takes
-      # the row and returns the new row, rather than invoking (and unpacking)
-      # for each field individually.
+      ## the row and returns the new row, rather than invoking (and unpacking)

Review Comment:
   Remove double #.



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -27,33 +28,140 @@
 from apache_beam.yaml import yaml_provider
 
 
-def _as_callable(original_fields, expr):
-  if expr in original_fields:
-    return expr
+# TODO(yaml) Consider adding optional language version parameter to support ECMAScript 5 and 6
+def expand_mapping_func(
+    transform_name,
+    language,
+    original_fields,
+    expression=None,
+    callable=None,
+    path=None,
+    name=None):
+
+  # Argument checking
+  if not expression and not callable and not path and not name:
+    raise ValueError(
+        f'{transform_name} must specify either "expression", "callable", or both "path" and "name"'
+    )
+  if expression and callable:
+    raise ValueError(
+        f'{transform_name} cannot specify "expression" and "callable"')
+  if (expression or callable) and (path or name):
+    raise ValueError(
+        f'{transform_name} cannot specify "expression" or "callable" with "path" or "name"'
+    )
+  if path and not name:
+    raise ValueError(f'{transform_name} cannot specify "path" without "name"')
+  if name and not path:
+    raise ValueError(f'{transform_name} cannot specify "name" without "path"')
+
+  def _get_udf_from_file():
+    # Local UDF file case
+    if not path.startswith("gs://"):

Review Comment:
   Use generic `Filesystems.open` instead. 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L227
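   
   A minimal sketch of that approach (assuming a UTF-8 encoded UDF file; `FileSystems.open` dispatches on the path scheme, so local paths and `gs://` paths go through the same code):
   
   ```
   from apache_beam.io.filesystems import FileSystems
   
   def _get_udf_from_file(path):
     # FileSystems picks the right filesystem (local, GCS, ...) from the path,
     # so no google-cloud-storage specific code is needed here.
     with FileSystems.open(path) as udf_file:
       return udf_file.read().decode('utf-8')
   ```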



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -27,33 +28,140 @@
 from apache_beam.yaml import yaml_provider
 
 
-def _as_callable(original_fields, expr):
-  if expr in original_fields:
-    return expr
+# TODO(yaml) Consider adding optional language version parameter to support ECMAScript 5 and 6
+def expand_mapping_func(
+    transform_name,
+    language,
+    original_fields,
+    expression=None,
+    callable=None,
+    path=None,
+    name=None):
+
+  # Argument checking
+  if not expression and not callable and not path and not name:
+    raise ValueError(
+        f'{transform_name} must specify either "expression", "callable", or both "path" and "name"'
+    )
+  if expression and callable:
+    raise ValueError(
+        f'{transform_name} cannot specify "expression" and "callable"')
+  if (expression or callable) and (path or name):
+    raise ValueError(
+        f'{transform_name} cannot specify "expression" or "callable" with "path" or "name"'
+    )
+  if path and not name:
+    raise ValueError(f'{transform_name} cannot specify "path" without "name"')
+  if name and not path:
+    raise ValueError(f'{transform_name} cannot specify "name" without "path"')
+
+  def _get_udf_from_file():
+    # Local UDF file case
+    if not path.startswith("gs://"):
+      try:
+        with open(path, 'r') as local_udf_file:
+          return local_udf_file.read()
+      except Exception as e:
+        raise IOError(f'Error opening file "{path}": {e}')
+
+    # GCS UDF file case
+    from google.cloud import storage
+
+    # Parse GCS file location
+    gcs_file_parts = str(path[5:]).split('/')
+    gcs_bucket_name = gcs_file_parts[0]
+    gcs_folder = '/'.join(gcs_file_parts[1:])
+
+    # Instantiates a client and downloads file to string
+    storage_client = storage.Client()
+    bucket = storage_client.get_bucket(gcs_bucket_name)
+    blob = bucket.blob(gcs_folder)
+    gcs_file = blob.download_as_string().decode('utf-8')
+
+    return gcs_file
+
+  def _check_file_ext(ext):
+    if not path.endswith(ext):
+      raise ValueError(f'File "{path}" is not a valid {ext} file.')
+
+  def _js2py_import():
+    return (['  try:'] + ['    import js2py'] + ['  except ImportError:'] + [
+        '    raise ImportError("js2py must be installed to run javascript UDF\'s for YAML mapping transforms.")'
+    ])
+
+  # Javascript UDF case
+  if language == 'javascript':
+    # Javascript expression case
+    if expression:
+      parameters = [name for name in original_fields if name in expression]
+      js_func = 'function fn(' + ','.join(

Review Comment:
   Perhaps format strings would make this easier to read, e.g.
   
   ```
   args = ', '.join(parameters)
   js_func = f'function fn({args}) {{return ({expression})}}'
   ```



##########
sdks/python/setup.py:
##########
@@ -325,7 +325,9 @@ def get_portability_package_data():
               'google-cloud-videointelligence>=2.0,<3',
               'google-cloud-vision>=2,<4',
               'google-cloud-recommendations-ai>=0.1.0,<0.11.0',
-              'google-cloud-aiplatform>=1.26.0, < 2.0'
+              'google-cloud-aiplatform>=1.26.0, < 2.0',
+              # GCP Packages required by YAML

Review Comment:
   Do we not need a js2py dependency added as well? (Not to this section, but it's where I can comment.)
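   
   If it is needed, a sketch of what that might look like (placement and version bounds here are illustrative only, not a concrete recommendation):
   
   ```
   # Illustrative only: js2py pin for the YAML javascript UDF support.
   'js2py>=0.74,<1',
   ```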



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -27,33 +28,140 @@
 from apache_beam.yaml import yaml_provider
 
 
-def _as_callable(original_fields, expr):
-  if expr in original_fields:
-    return expr
+# TODO(yaml) Consider adding optional language version parameter to support ECMAScript 5 and 6
+def expand_mapping_func(
+    transform_name,
+    language,
+    original_fields,
+    expression=None,
+    callable=None,
+    path=None,
+    name=None):
+
+  # Argument checking
+  if not expression and not callable and not path and not name:
+    raise ValueError(
+        f'{transform_name} must specify either "expression", "callable", or both "path" and "name"'
+    )
+  if expression and callable:
+    raise ValueError(
+        f'{transform_name} cannot specify "expression" and "callable"')
+  if (expression or callable) and (path or name):
+    raise ValueError(
+        f'{transform_name} cannot specify "expression" or "callable" with "path" or "name"'
+    )
+  if path and not name:
+    raise ValueError(f'{transform_name} cannot specify "path" without "name"')
+  if name and not path:
+    raise ValueError(f'{transform_name} cannot specify "name" without "path"')
+
+  def _get_udf_from_file():
+    # Local UDF file case
+    if not path.startswith("gs://"):
+      try:
+        with open(path, 'r') as local_udf_file:
+          return local_udf_file.read()
+      except Exception as e:
+        raise IOError(f'Error opening file "{path}": {e}')
+
+    # GCS UDF file case
+    from google.cloud import storage
+
+    # Parse GCS file location
+    gcs_file_parts = str(path[5:]).split('/')
+    gcs_bucket_name = gcs_file_parts[0]
+    gcs_folder = '/'.join(gcs_file_parts[1:])
+
+    # Instantiates a client and downloads file to string
+    storage_client = storage.Client()
+    bucket = storage_client.get_bucket(gcs_bucket_name)
+    blob = bucket.blob(gcs_folder)
+    gcs_file = blob.download_as_string().decode('utf-8')
+
+    return gcs_file
+
+  def _check_file_ext(ext):
+    if not path.endswith(ext):
+      raise ValueError(f'File "{path}" is not a valid {ext} file.')
+
+  def _js2py_import():
+    return (['  try:'] + ['    import js2py'] + ['  except ImportError:'] + [
+        '    raise ImportError("js2py must be installed to run javascript UDF\'s for YAML mapping transforms.")'
+    ])
+
+  # Javascript UDF case
+  if language == 'javascript':
+    # Javascript expression case
+    if expression:
+      parameters = [name for name in original_fields if name in expression]
+      js_func = 'function fn(' + ','.join(
+          [name for name in parameters]) + '){' + 'return (' + expression + ')}'
+      source = '\n'.join(['def fn(__row__):'] + _js2py_import() +
+                         [f'  {name} = __row__.{name}' for name in parameters] +
+                         [
+                             f'  return js2py.eval_js(\'\'\'{js_func}\'\'\')(' +

Review Comment:
   Note that you can use " inside ' quoted strings to avoid escaping. We can't rule out that js_func will have (triple) quotes in it, so it should be quoted. You can use `repr(js_func)` for this.
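   
   i.e. something along these lines (a sketch; `js_func` and `parameters` are the variables from the diff above):
   
   ```
   # repr() yields a properly quoted and escaped Python literal, so quotes
   # inside the user's JavaScript cannot break the generated source.
   args = ', '.join(parameters)
   source_line = f'  return js2py.eval_js({repr(js_func)})({args})'
   ```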



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -27,33 +28,140 @@
 from apache_beam.yaml import yaml_provider
 
 
-def _as_callable(original_fields, expr):
-  if expr in original_fields:
-    return expr
+# TODO(yaml) Consider adding optional language version parameter to support ECMAScript 5 and 6
+def expand_mapping_func(
+    transform_name,
+    language,
+    original_fields,
+    expression=None,
+    callable=None,
+    path=None,
+    name=None):
+
+  # Argument checking
+  if not expression and not callable and not path and not name:
+    raise ValueError(
+        f'{transform_name} must specify either "expression", "callable", or both "path" and "name"'
+    )
+  if expression and callable:
+    raise ValueError(
+        f'{transform_name} cannot specify "expression" and "callable"')
+  if (expression or callable) and (path or name):
+    raise ValueError(
+        f'{transform_name} cannot specify "expression" or "callable" with "path" or "name"'
+    )
+  if path and not name:
+    raise ValueError(f'{transform_name} cannot specify "path" without "name"')
+  if name and not path:
+    raise ValueError(f'{transform_name} cannot specify "name" without "path"')
+
+  def _get_udf_from_file():
+    # Local UDF file case
+    if not path.startswith("gs://"):
+      try:
+        with open(path, 'r') as local_udf_file:
+          return local_udf_file.read()
+      except Exception as e:
+        raise IOError(f'Error opening file "{path}": {e}')
+
+    # GCS UDF file case
+    from google.cloud import storage
+
+    # Parse GCS file location
+    gcs_file_parts = str(path[5:]).split('/')
+    gcs_bucket_name = gcs_file_parts[0]
+    gcs_folder = '/'.join(gcs_file_parts[1:])
+
+    # Instantiates a client and downloads file to string
+    storage_client = storage.Client()
+    bucket = storage_client.get_bucket(gcs_bucket_name)
+    blob = bucket.blob(gcs_folder)
+    gcs_file = blob.download_as_string().decode('utf-8')
+
+    return gcs_file
+
+  def _check_file_ext(ext):
+    if not path.endswith(ext):
+      raise ValueError(f'File "{path}" is not a valid {ext} file.')
+
+  def _js2py_import():
+    return (['  try:'] + ['    import js2py'] + ['  except ImportError:'] + [
+        '    raise ImportError("js2py must be installed to run javascript UDF\'s for YAML mapping transforms.")'
+    ])
+
+  # Javascript UDF case
+  if language == 'javascript':
+    # Javascript expression case
+    if expression:
+      parameters = [name for name in original_fields if name in expression]
+      js_func = 'function fn(' + ','.join(
+          [name for name in parameters]) + '){' + 'return (' + expression + ')}'
+      source = '\n'.join(['def fn(__row__):'] + _js2py_import() +
+                         [f'  {name} = __row__.{name}' for name in parameters] +
+                         [
+                             f'  return js2py.eval_js(\'\'\'{js_func}\'\'\')(' +
+                             ','.join([name for name in parameters]) + ')'
+                         ])
+    else:
+      # Javascript file UDF case
+      if not callable:
+        _check_file_ext('.js')
+        udf_code = _get_udf_from_file()
+      # Javascript inline UDF case
+      else:
+        udf_code = callable
+
+      source = '\n'.join(['def fn(__row__):'] + _js2py_import() + [
+          '  row_dict = {label: getattr(__row__, label) for label in __row__._fields}'
+      ] + ([f'  return js2py.eval_js(\'\'\'{udf_code}\'\'\')(row_dict)']
+           if callable else ['  js = js2py.EvalJs()'] +
+           [f'  js.eval(\'\'\'{udf_code}\'\'\')'] +
+           [f'  return getattr(js, "{name}")(row_dict)']))
+  # Python UDF case

Review Comment:
   We should check that the language is python, and have the else statement be an error.
   
   (I wonder if this function is getting large enough that we should break out methods `expand_javascript_mapping_func`, `expand_python_mapping_func`, etc.)
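   
   Roughly (a sketch only; the per-language helpers are the ones suggested above and would hold the existing JavaScript and Python bodies):
   
   ```
   def expand_mapping_func(transform_name, language, original_fields, **kwargs):
     if language == 'javascript':
       return expand_javascript_mapping_func(
           transform_name, original_fields, **kwargs)
     elif language == 'python':
       return expand_python_mapping_func(
           transform_name, original_fields, **kwargs)
     else:
       raise ValueError(
           f'Unknown language for {transform_name}: {language!r}')
   ```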



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -138,19 +248,18 @@ def _PythonProjectionTransform(
     if isinstance(keep, str) and keep in original_fields:
       keep_fn = lambda row: getattr(row, keep)
     else:
-      keep_fn = _as_callable(original_fields, keep)
+      keep_fn = _as_callable(original_fields, keep, transform_name, language)
     filtered = pcoll | beam.Filter(keep_fn)
   else:
     filtered = pcoll
 
-  if list(fields.items()) == [(name, name) for name in original_fields]:
-    projected = filtered
-  else:
-    projected = filtered | beam.Select(
-        **{
-            name: _as_callable(original_fields, expr)
-            for (name, expr) in fields.items()
-        })
+  # TODO(yaml) - Is there a better way to convert to pvalue.Row

Review Comment:
   Yes, but this borrows all the nice type inference properties of Select. 



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -27,33 +28,140 @@
 from apache_beam.yaml import yaml_provider
 
 
-def _as_callable(original_fields, expr):
-  if expr in original_fields:
-    return expr
+# TODO(yaml) Consider adding optional language version parameter to support ECMAScript 5 and 6
+def expand_mapping_func(
+    transform_name,
+    language,
+    original_fields,
+    expression=None,
+    callable=None,
+    path=None,
+    name=None):
+
+  # Argument checking
+  if not expression and not callable and not path and not name:
+    raise ValueError(
+        f'{transform_name} must specify either "expression", "callable", or both "path" and "name"'
+    )
+  if expression and callable:
+    raise ValueError(
+        f'{transform_name} cannot specify "expression" and "callable"')
+  if (expression or callable) and (path or name):
+    raise ValueError(
+        f'{transform_name} cannot specify "expression" or "callable" with "path" or "name"'
+    )
+  if path and not name:
+    raise ValueError(f'{transform_name} cannot specify "path" without "name"')
+  if name and not path:
+    raise ValueError(f'{transform_name} cannot specify "name" without "path"')
+
+  def _get_udf_from_file():
+    # Local UDF file case
+    if not path.startswith("gs://"):
+      try:
+        with open(path, 'r') as local_udf_file:
+          return local_udf_file.read()
+      except Exception as e:
+        raise IOError(f'Error opening file "{path}": {e}')
+
+    # GCS UDF file case
+    from google.cloud import storage
+
+    # Parse GCS file location
+    gcs_file_parts = str(path[5:]).split('/')
+    gcs_bucket_name = gcs_file_parts[0]
+    gcs_folder = '/'.join(gcs_file_parts[1:])
+
+    # Instantiates a client and downloads file to string
+    storage_client = storage.Client()
+    bucket = storage_client.get_bucket(gcs_bucket_name)
+    blob = bucket.blob(gcs_folder)
+    gcs_file = blob.download_as_string().decode('utf-8')
+
+    return gcs_file
+
+  def _check_file_ext(ext):
+    if not path.endswith(ext):
+      raise ValueError(f'File "{path}" is not a valid {ext} file.')
+
+  def _js2py_import():
+    return (['  try:'] + ['    import js2py'] + ['  except ImportError:'] + [
+        '    raise ImportError("js2py must be installed to run javascript UDF\'s for YAML mapping transforms.")'
+    ])
+
+  # Javascript UDF case
+  if language == 'javascript':
+    # Javascript expression case
+    if expression:
+      parameters = [name for name in original_fields if name in expression]
+      js_func = 'function fn(' + ','.join(
+          [name for name in parameters]) + '){' + 'return (' + expression + ')}'
+      source = '\n'.join(['def fn(__row__):'] + _js2py_import() +

Review Comment:
   We should lift the import out of the function definition. (We should probably look at lifting the translation to happen once rather than on every call as well. This could be done with
   
   ```args_from_row = ', '.join(f'__row__.{name}' for name in parameters)```
   
   then generating the source
   
   ```
   js_callable = js2py.eval_js({repr(js_func)})
   def fn(__row__):
     return js_callable({args_from_row})
   ```
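   
   Putting that together, the generating code might look roughly like this (a sketch; `js_func` and `parameters` are assumed from the diff above, with the js2py import and translation done once in the generated module rather than per call):
   
   ```
   args_from_row = ', '.join(f'__row__.{name}' for name in parameters)
   source = '\n'.join([
       'import js2py',
       f'js_callable = js2py.eval_js({repr(js_func)})',
       'def fn(__row__):',
       f'  return js_callable({args_from_row})',
   ])
   ```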



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -27,33 +28,140 @@
 from apache_beam.yaml import yaml_provider
 
 
-def _as_callable(original_fields, expr):
-  if expr in original_fields:
-    return expr
+# TODO(yaml) Consider adding optional language version parameter to support ECMAScript 5 and 6
+def expand_mapping_func(
+    transform_name,
+    language,
+    original_fields,
+    expression=None,
+    callable=None,
+    path=None,
+    name=None):
+
+  # Argument checking
+  if not expression and not callable and not path and not name:
+    raise ValueError(
+        f'{transform_name} must specify either "expression", "callable", or both "path" and "name"'
+    )
+  if expression and callable:
+    raise ValueError(
+        f'{transform_name} cannot specify "expression" and "callable"')
+  if (expression or callable) and (path or name):
+    raise ValueError(
+        f'{transform_name} cannot specify "expression" or "callable" with "path" or "name"'
+    )
+  if path and not name:
+    raise ValueError(f'{transform_name} cannot specify "path" without "name"')
+  if name and not path:
+    raise ValueError(f'{transform_name} cannot specify "name" without "path"')
+
+  def _get_udf_from_file():
+    # Local UDF file case
+    if not path.startswith("gs://"):
+      try:
+        with open(path, 'r') as local_udf_file:
+          return local_udf_file.read()
+      except Exception as e:
+        raise IOError(f'Error opening file "{path}": {e}')
+
+    # GCS UDF file case
+    from google.cloud import storage
+
+    # Parse GCS file location
+    gcs_file_parts = str(path[5:]).split('/')
+    gcs_bucket_name = gcs_file_parts[0]
+    gcs_folder = '/'.join(gcs_file_parts[1:])
+
+    # Instantiates a client and downloads file to string
+    storage_client = storage.Client()
+    bucket = storage_client.get_bucket(gcs_bucket_name)
+    blob = bucket.blob(gcs_folder)
+    gcs_file = blob.download_as_string().decode('utf-8')
+
+    return gcs_file
+
+  def _check_file_ext(ext):
+    if not path.endswith(ext):
+      raise ValueError(f'File "{path}" is not a valid {ext} file.')
+
+  def _js2py_import():
+    return (['  try:'] + ['    import js2py'] + ['  except ImportError:'] + [
+        '    raise ImportError("js2py must be installed to run javascript UDF\'s for YAML mapping transforms.")'
+    ])
+
+  # Javascript UDF case
+  if language == 'javascript':
+    # Javascript expression case
+    if expression:
+      parameters = [name for name in original_fields if name in expression]
+      js_func = 'function fn(' + ','.join(
+          [name for name in parameters]) + '){' + 'return (' + expression + ')}'
+      source = '\n'.join(['def fn(__row__):'] + _js2py_import() +
+                         [f'  {name} = __row__.{name}' for name in parameters] +
+                         [
+                             f'  return js2py.eval_js(\'\'\'{js_func}\'\'\')(' +
+                             ','.join([name for name in parameters]) + ')'
+                         ])
+    else:
+      # Javascript file UDF case
+      if not callable:
+        _check_file_ext('.js')
+        udf_code = _get_udf_from_file()
+      # Javascript inline UDF case
+      else:
+        udf_code = callable
+
+      source = '\n'.join(['def fn(__row__):'] + _js2py_import() + [
+          '  row_dict = {label: getattr(__row__, label) for label in __row__._fields}'
+      ] + ([f'  return js2py.eval_js(\'\'\'{udf_code}\'\'\')(row_dict)']
+           if callable else ['  js = js2py.EvalJs()'] +
+           [f'  js.eval(\'\'\'{udf_code}\'\'\')'] +
+           [f'  return getattr(js, "{name}")(row_dict)']))
+  # Python UDF case
   else:
-    # TODO(yaml): support a type parameter
-    # TODO(yaml): support an imports parameter
-    # TODO(yaml): support a requirements parameter (possibly at a higher level)
-    if isinstance(expr, str):
-      expr = {'expression': expr}
-    if not isinstance(expr, dict):
-      raise ValueError(
-          f"Ambiguous expression type (perhaps missing quoting?): {expr}")
-    elif len(expr) != 1:
-      raise ValueError(f"Ambiguous expression type: {list(expr.keys())}")
-    if 'expression' in expr:
+    # Python expression case
+    if expression:
       # TODO(robertwb): Consider constructing a single callable that takes
-      # the row and returns the new row, rather than invoking (and unpacking)
-      # for each field individually.
+      ## the row and returns the new row, rather than invoking (and unpacking)
+      ## for each field individually.
       source = '\n'.join(['def fn(__row__):'] + [
           f'  {name} = __row__.{name}'
-          for name in original_fields if name in expr['expression']
-      ] + ['  return (' + expr['expression'] + ')'])
-    elif 'callable' in expr:
-      source = expr['callable']
+          for name in original_fields if name in expression
+      ] + ['  return (' + expression + ')'])
+
+    # Python file UDF case
+    elif path and name:

Review Comment:
   Put this first, as it exits early. 
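   
   e.g. a sketch of the reordered branch structure (bodies elided; they stay as in the diff above):
   
   ```
   # Handle the file-based UDF first, then the expression and callable cases.
   if path and name:
     ...
   elif expression:
     ...
   else:
     ...
   ```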



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -27,33 +28,140 @@
 from apache_beam.yaml import yaml_provider
 
 
-def _as_callable(original_fields, expr):
-  if expr in original_fields:
-    return expr
+# TODO(yaml) Consider adding optional language version parameter to support ECMAScript 5 and 6
+def expand_mapping_func(
+    transform_name,
+    language,
+    original_fields,
+    expression=None,
+    callable=None,
+    path=None,
+    name=None):
+
+  # Argument checking
+  if not expression and not callable and not path and not name:
+    raise ValueError(
+        f'{transform_name} must specify either "expression", "callable", or both "path" and "name"'
+    )
+  if expression and callable:
+    raise ValueError(
+        f'{transform_name} cannot specify "expression" and "callable"')
+  if (expression or callable) and (path or name):
+    raise ValueError(
+        f'{transform_name} cannot specify "expression" or "callable" with "path" or "name"'
+    )
+  if path and not name:
+    raise ValueError(f'{transform_name} cannot specify "path" without "name"')
+  if name and not path:
+    raise ValueError(f'{transform_name} cannot specify "name" without "path"')
+
+  def _get_udf_from_file():
+    # Local UDF file case
+    if not path.startswith("gs://"):
+      try:
+        with open(path, 'r') as local_udf_file:
+          return local_udf_file.read()
+      except Exception as e:
+        raise IOError(f'Error opening file "{path}": {e}')
+
+    # GCS UDF file case
+    from google.cloud import storage
+
+    # Parse GCS file location
+    gcs_file_parts = str(path[5:]).split('/')
+    gcs_bucket_name = gcs_file_parts[0]
+    gcs_folder = '/'.join(gcs_file_parts[1:])
+
+    # Instantiates a client and downloads file to string
+    storage_client = storage.Client()
+    bucket = storage_client.get_bucket(gcs_bucket_name)
+    blob = bucket.blob(gcs_folder)
+    gcs_file = blob.download_as_string().decode('utf-8')
+
+    return gcs_file
+
+  def _check_file_ext(ext):
+    if not path.endswith(ext):
+      raise ValueError(f'File "{path}" is not a valid {ext} file.')
+
+  def _js2py_import():
+    return (['  try:'] + ['    import js2py'] + ['  except ImportError:'] + [
+        '    raise ImportError("js2py must be installed to run javascript UDF\'s for YAML mapping transforms.")'
+    ])
+
+  # Javascript UDF case
+  if language == 'javascript':
+    # Javascript expression case
+    if expression:
+      parameters = [name for name in original_fields if name in expression]
+      js_func = 'function fn(' + ','.join(
+          [name for name in parameters]) + '){' + 'return (' + expression + ')}'
+      source = '\n'.join(['def fn(__row__):'] + _js2py_import() +

Review Comment:
   Same here. 



##########
sdks/python/apache_beam/yaml/yaml_udf_test.py:
##########
@@ -0,0 +1,325 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import unittest
+from unittest import mock
+
+import apache_beam as beam
+from apache_beam.options import pipeline_options
+from apache_beam.testing.util import equal_to, assert_that
+from apache_beam.yaml.yaml_transform import YamlTransform
+
+MOCK_UDF_FILE_PREFIX = "/path/to/udf"
+
+
+class YamlUDFMappingTest(unittest.TestCase):
+  def __init__(self, method_name='runYamlMappingTest'):
+    super().__init__(method_name)
+    self.data = [
+        beam.Row(label='11a', conductor=11, rank=0),
+        beam.Row(label='37a', conductor=37, rank=1),
+        beam.Row(label='389a', conductor=389, rank=2),
+    ]
+    self.builtin_open = open  # save the unpatched version
+
+  class MockBlob:
+    def __init__(self, data):
+      self.data = data
+
+    def download_as_string(self):
+      return bytes(self.data, encoding='utf-8')
+
+  class MockBucket:

Review Comment:
   If you use `Filesystems` you won't have to bother making all these mocks as it's already well-tested. (You could just do a single local file test with tempfile.)
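   
   For example, a local-file test could be as small as this sketch (the UDF contents and final assertion are placeholders; the real test would route `path` through the YAML mapping config under test):
   
   ```
   import tempfile
   import unittest
   
   class LocalUdfFileTest(unittest.TestCase):
     def test_local_js_udf_file(self):
       js_source = 'function fun(row) { return row.rank }'
       with tempfile.NamedTemporaryFile('w', suffix='.js', delete=False) as f:
         f.write(js_source)
         path = f.name
       # Placeholder check: the real test would pass `path` to YamlTransform.
       with open(path) as udf_file:
         self.assertEqual(udf_file.read(), js_source)
   ```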



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
