damccorm commented on code in PR #28462:
URL: https://github.com/apache/beam/pull/28462#discussion_r1332137859


##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -190,189 +253,169 @@ def with_exception_handling(self, **kwargs):
     return self
 
 
-# TODO(yaml): Should Filter and Explode be distinct operations from Project?
-# We'll want these per-language.
 @beam.ptransform.ptransform_fn
-def _PythonProjectionTransform(
-    pcoll,
-    *,
-    fields,
-    transform_name,
-    language,
-    keep=None,
-    explode=(),
-    cross_product=True,
-    error_handling=None):
-  original_fields = [
-      name for (name, _) in named_fields_from_element_type(pcoll.element_type)
-  ]
+@maybe_with_exception_handling_transform_fn
+def _PyJsFilter(
+    pcoll, keep: Union[str, Dict[str, str]], language: Optional[str] = None):
 
-  if error_handling is None:
-    error_handling_args = None
+  input_schema = dict(named_fields_from_element_type(pcoll.element_type))
+  if isinstance(keep, str) and keep in input_schema:
+    keep_fn = lambda row: getattr(row, keep)
   else:
-    error_handling_args = {
-        'dead_letter_tag' if k == 'output' else k: v
-        for (k, v) in error_handling.items()
-    }
+    keep_fn = _as_callable(list(input_schema.keys()), keep, "keep", language)
+  return pcoll | beam.Filter(keep_fn)
 
-  pcoll = beam.core._MaybePValueWithErrors(pcoll, error_handling_args)
 
-  if keep:
-    if isinstance(keep, str) and keep in original_fields:
-      keep_fn = lambda row: getattr(row, keep)
-    else:
-      keep_fn = _as_callable(original_fields, keep, transform_name, language)
-    filtered = pcoll | beam.Filter(keep_fn)
-  else:
-    filtered = pcoll
+def is_expr(v):
+  return isinstance(v, str) or (isinstance(v, dict) and 'expression' in v)
 
-  projected = filtered | beam.Select(
-      **{
-          name: _as_callable(original_fields, expr, transform_name, language)
-          for (name, expr) in fields.items()
-      })
-
-  if explode:
-    result = projected | _Explode(explode, cross_product=cross_product)
-  else:
-    result = projected
-
-  return result.as_result(
-      # TODO(https://github.com/apache/beam/issues/24755): Switch to MapTuple.
-      beam.Map(
-          lambda x: beam.Row(
-              element=x[0], msg=str(x[1][1]), stack=str(x[1][2]))))
 
+def normalize_fields(pcoll, fields, drop=(), append=False, language='generic'):
+  try:
+    input_schema = dict(named_fields_from_element_type(pcoll.element_type))
+  except ValueError as exn:
+    if drop:
+      raise ValueError("Can only drop fields on a schema'd input.") from exn
+    if append:
+      raise ValueError("Can only append fields on a schema'd input.") from exn
+    elif any(is_expr(x) for x in fields.values()):
+      raise ValueError("Can only use expressions on a schema'd input.") from exn
+    input_schema = {}
 
-@beam.ptransform.ptransform_fn
-def MapToFields(
-    pcoll,
-    yaml_create_transform,
-    *,
-    fields,
-    keep=None,
-    explode=(),
-    cross_product=None,
-    append=False,
-    drop=(),
-    language=None,
-    error_handling=None,
-    transform_name="MapToFields",
-    **language_keywords):
-  if isinstance(explode, str):
-    explode = [explode]
-  if cross_product is None:
-    if len(explode) > 1:
-      # TODO(robertwb): Consider if true is an OK default.
-      raise ValueError(
-          'cross_product must be specified true or false '
-          'when exploding multiple fields')
-    else:
-      # Doesn't matter.
-      cross_product = True
-
-  input_schema = dict(named_fields_from_element_type(pcoll.element_type))
+  if isinstance(drop, str):
+    drop = [drop]
   if drop and not append:
     raise ValueError("Can only drop fields if append is true.")
   for name in drop:
     if name not in input_schema:
       raise ValueError(f'Dropping unknown field "{name}"')
-  for name in explode:
-    if not (name in fields or (append and name in input_schema)):
-      raise ValueError(f'Exploding unknown field "{name}"')
   if append:
     for name in fields:
       if name in input_schema and name not in drop:
         raise ValueError(f'Redefinition of field "{name}"')
 
+  if language == 'generic':
+    for expr in fields.values():
+      if not isinstance(expr, str):
+        raise ValueError("Missing language specification.")
+    missing = set(fields.values()) - set(input_schema.keys())
+    if missing:
+      raise ValueError(
+          f"Missing language specification or unkown input fields: {missing}")

Review Comment:
   Ah, that makes sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to