damccorm commented on code in PR #28462:
URL: https://github.com/apache/beam/pull/28462#discussion_r1332072912
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -190,189 +253,169 @@ def with_exception_handling(self, **kwargs):
return self
-# TODO(yaml): Should Filter and Explode be distinct operations from Project?
-# We'll want these per-language.
@beam.ptransform.ptransform_fn
-def _PythonProjectionTransform(
- pcoll,
- *,
- fields,
- transform_name,
- language,
- keep=None,
- explode=(),
- cross_product=True,
- error_handling=None):
- original_fields = [
- name for (name, _) in named_fields_from_element_type(pcoll.element_type)
- ]
+@maybe_with_exception_handling_transform_fn
+def _PyJsFilter(
+ pcoll, keep: Union[str, Dict[str, str]], language: Optional[str] = None):
- if error_handling is None:
- error_handling_args = None
+ input_schema = dict(named_fields_from_element_type(pcoll.element_type))
+ if isinstance(keep, str) and keep in input_schema:
+ keep_fn = lambda row: getattr(row, keep)
else:
- error_handling_args = {
- 'dead_letter_tag' if k == 'output' else k: v
- for (k, v) in error_handling.items()
- }
+ keep_fn = _as_callable(list(input_schema.keys()), keep, "keep", language)
+ return pcoll | beam.Filter(keep_fn)
- pcoll = beam.core._MaybePValueWithErrors(pcoll, error_handling_args)
- if keep:
- if isinstance(keep, str) and keep in original_fields:
- keep_fn = lambda row: getattr(row, keep)
- else:
- keep_fn = _as_callable(original_fields, keep, transform_name, language)
- filtered = pcoll | beam.Filter(keep_fn)
- else:
- filtered = pcoll
+def is_expr(v):
+ return isinstance(v, str) or (isinstance(v, dict) and 'expression' in v)
- projected = filtered | beam.Select(
- **{
- name: _as_callable(original_fields, expr, transform_name, language)
- for (name, expr) in fields.items()
- })
-
- if explode:
- result = projected | _Explode(explode, cross_product=cross_product)
- else:
- result = projected
-
- return result.as_result(
- # TODO(https://github.com/apache/beam/issues/24755): Switch to MapTuple.
- beam.Map(
- lambda x: beam.Row(
- element=x[0], msg=str(x[1][1]), stack=str(x[1][2]))))
+def normalize_fields(pcoll, fields, drop=(), append=False, language='generic'):
+ try:
+ input_schema = dict(named_fields_from_element_type(pcoll.element_type))
+ except ValueError as exn:
+ if drop:
+ raise ValueError("Can only drop fields on a schema'd input.") from exn
+ if append:
+ raise ValueError("Can only append fields on a schema'd input.") from exn
+ elif any(is_expr(x) for x in fields.values()):
+ raise ValueError("Can only use expressions on a schema'd input.") from
exn
+ input_schema = {}
-@beam.ptransform.ptransform_fn
-def MapToFields(
- pcoll,
- yaml_create_transform,
- *,
- fields,
- keep=None,
- explode=(),
- cross_product=None,
- append=False,
- drop=(),
- language=None,
- error_handling=None,
- transform_name="MapToFields",
- **language_keywords):
- if isinstance(explode, str):
- explode = [explode]
- if cross_product is None:
- if len(explode) > 1:
- # TODO(robertwb): Consider if true is an OK default.
- raise ValueError(
- 'cross_product must be specified true or false '
- 'when exploding multiple fields')
- else:
- # Doesn't matter.
- cross_product = True
-
- input_schema = dict(named_fields_from_element_type(pcoll.element_type))
+ if isinstance(drop, str):
+ drop = [drop]
if drop and not append:
raise ValueError("Can only drop fields if append is true.")
for name in drop:
if name not in input_schema:
raise ValueError(f'Dropping unknown field "{name}"')
- for name in explode:
- if not (name in fields or (append and name in input_schema)):
- raise ValueError(f'Exploding unknown field "{name}"')
if append:
for name in fields:
if name in input_schema and name not in drop:
raise ValueError(f'Redefinition of field "{name}"')
+ if language == 'generic':
+ for expr in fields.values():
+ if not isinstance(expr, str):
+ raise ValueError("Missing language specification.")
+ missing = set(fields.values()) - set(input_schema.keys())
+ if missing:
+ raise ValueError(
+ f"Missing language specification or unkown input fields: {missing}")
Review Comment:
```suggestion
f"Unknown input fields: {missing}")
```
We've already dealt with the language specifications, right? (plus a typo)
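For context, a minimal standalone sketch of how the two checks in this hunk interact, with the suggested shorter message applied (hypothetical helper, not code from the PR):
```python
# Hypothetical, simplified restatement of the 'generic' language validation in
# this hunk; names are illustrative, not the PR's actual helpers.
def validate_generic_fields(fields, input_schema):
  # First check: with no language specified, every mapping value must be a
  # plain string (a field name), never a dict-style expression.
  for expr in fields.values():
    if not isinstance(expr, str):
      raise ValueError("Missing language specification.")
  # Second check: any remaining string that is not an input field name is
  # reported as unknown, which is what the suggested message says.
  missing = set(fields.values()) - set(input_schema.keys())
  if missing:
    raise ValueError(f"Unknown input fields: {missing}")


try:
  validate_generic_fields({'x': 'a', 'y': 'c'}, {'a': int, 'b': str})
except ValueError as e:
  print(e)  # Unknown input fields: {'c'}
```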
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -190,189 +253,169 @@ def with_exception_handling(self, **kwargs):
return self
-# TODO(yaml): Should Filter and Explode be distinct operations from Project?
-# We'll want these per-language.
@beam.ptransform.ptransform_fn
-def _PythonProjectionTransform(
- pcoll,
- *,
- fields,
- transform_name,
- language,
- keep=None,
- explode=(),
- cross_product=True,
- error_handling=None):
- original_fields = [
- name for (name, _) in named_fields_from_element_type(pcoll.element_type)
- ]
+@maybe_with_exception_handling_transform_fn
+def _PyJsFilter(
+ pcoll, keep: Union[str, Dict[str, str]], language: Optional[str] = None):
- if error_handling is None:
- error_handling_args = None
+ input_schema = dict(named_fields_from_element_type(pcoll.element_type))
+ if isinstance(keep, str) and keep in input_schema:
+ keep_fn = lambda row: getattr(row, keep)
else:
- error_handling_args = {
- 'dead_letter_tag' if k == 'output' else k: v
- for (k, v) in error_handling.items()
- }
+ keep_fn = _as_callable(list(input_schema.keys()), keep, "keep", language)
+ return pcoll | beam.Filter(keep_fn)
- pcoll = beam.core._MaybePValueWithErrors(pcoll, error_handling_args)
- if keep:
- if isinstance(keep, str) and keep in original_fields:
- keep_fn = lambda row: getattr(row, keep)
- else:
- keep_fn = _as_callable(original_fields, keep, transform_name, language)
- filtered = pcoll | beam.Filter(keep_fn)
- else:
- filtered = pcoll
+def is_expr(v):
+ return isinstance(v, str) or (isinstance(v, dict) and 'expression' in v)
- projected = filtered | beam.Select(
- **{
- name: _as_callable(original_fields, expr, transform_name, language)
- for (name, expr) in fields.items()
- })
-
- if explode:
- result = projected | _Explode(explode, cross_product=cross_product)
- else:
- result = projected
-
- return result.as_result(
- # TODO(https://github.com/apache/beam/issues/24755): Switch to MapTuple.
- beam.Map(
- lambda x: beam.Row(
- element=x[0], msg=str(x[1][1]), stack=str(x[1][2]))))
+def normalize_fields(pcoll, fields, drop=(), append=False, language='generic'):
+ try:
+ input_schema = dict(named_fields_from_element_type(pcoll.element_type))
+ except ValueError as exn:
+ if drop:
+ raise ValueError("Can only drop fields on a schema'd input.") from exn
+ if append:
+ raise ValueError("Can only append fields on a schema'd input.") from exn
+ elif any(is_expr(x) for x in fields.values()):
+ raise ValueError("Can only use expressions on a schema'd input.") from
exn
+ input_schema = {}
-@beam.ptransform.ptransform_fn
-def MapToFields(
- pcoll,
- yaml_create_transform,
- *,
- fields,
- keep=None,
- explode=(),
- cross_product=None,
- append=False,
- drop=(),
- language=None,
- error_handling=None,
- transform_name="MapToFields",
- **language_keywords):
- if isinstance(explode, str):
- explode = [explode]
- if cross_product is None:
- if len(explode) > 1:
- # TODO(robertwb): Consider if true is an OK default.
- raise ValueError(
- 'cross_product must be specified true or false '
- 'when exploding multiple fields')
- else:
- # Doesn't matter.
- cross_product = True
-
- input_schema = dict(named_fields_from_element_type(pcoll.element_type))
+ if isinstance(drop, str):
+ drop = [drop]
if drop and not append:
raise ValueError("Can only drop fields if append is true.")
for name in drop:
if name not in input_schema:
raise ValueError(f'Dropping unknown field "{name}"')
- for name in explode:
- if not (name in fields or (append and name in input_schema)):
- raise ValueError(f'Exploding unknown field "{name}"')
if append:
for name in fields:
if name in input_schema and name not in drop:
raise ValueError(f'Redefinition of field "{name}"')
Review Comment:
Not from this PR, but this error message could be clearer
```suggestion
    raise ValueError(f'Redefinition of field "{name}". Cannot append a field that already exists in original input.')
```
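As a rough illustration of the case the expanded message describes (a hand-written sketch under the same append/drop semantics as the hunk, not the PR's code):
```python
# Hypothetical sketch of the append-time redefinition check from this hunk.
def check_append(fields, input_schema, drop=()):
  for name in fields:
    if name in input_schema and name not in drop:
      raise ValueError(
          f'Redefinition of field "{name}". Cannot append a field '
          'that already exists in original input.')


input_schema = {'id': int, 'name': str}
check_append({'name_upper': '...'}, input_schema)           # new field: ok
check_append({'name': '...'}, input_schema, drop=['name'])  # dropped first: ok
try:
  check_append({'name': '...'}, input_schema)               # collides with input
except ValueError as e:
  print(e)
```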
##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -190,189 +253,169 @@ def with_exception_handling(self, **kwargs):
return self
-# TODO(yaml): Should Filter and Explode be distinct operations from Project?
-# We'll want these per-language.
@beam.ptransform.ptransform_fn
-def _PythonProjectionTransform(
- pcoll,
- *,
- fields,
- transform_name,
- language,
- keep=None,
- explode=(),
- cross_product=True,
- error_handling=None):
- original_fields = [
- name for (name, _) in named_fields_from_element_type(pcoll.element_type)
- ]
+@maybe_with_exception_handling_transform_fn
+def _PyJsFilter(
+ pcoll, keep: Union[str, Dict[str, str]], language: Optional[str] = None):
- if error_handling is None:
- error_handling_args = None
+ input_schema = dict(named_fields_from_element_type(pcoll.element_type))
+ if isinstance(keep, str) and keep in input_schema:
+ keep_fn = lambda row: getattr(row, keep)
else:
- error_handling_args = {
- 'dead_letter_tag' if k == 'output' else k: v
- for (k, v) in error_handling.items()
- }
+ keep_fn = _as_callable(list(input_schema.keys()), keep, "keep", language)
+ return pcoll | beam.Filter(keep_fn)
- pcoll = beam.core._MaybePValueWithErrors(pcoll, error_handling_args)
- if keep:
- if isinstance(keep, str) and keep in original_fields:
- keep_fn = lambda row: getattr(row, keep)
- else:
- keep_fn = _as_callable(original_fields, keep, transform_name, language)
- filtered = pcoll | beam.Filter(keep_fn)
- else:
- filtered = pcoll
+def is_expr(v):
+ return isinstance(v, str) or (isinstance(v, dict) and 'expression' in v)
- projected = filtered | beam.Select(
- **{
- name: _as_callable(original_fields, expr, transform_name, language)
- for (name, expr) in fields.items()
- })
-
- if explode:
- result = projected | _Explode(explode, cross_product=cross_product)
- else:
- result = projected
-
- return result.as_result(
- # TODO(https://github.com/apache/beam/issues/24755): Switch to MapTuple.
- beam.Map(
- lambda x: beam.Row(
- element=x[0], msg=str(x[1][1]), stack=str(x[1][2]))))
+def normalize_fields(pcoll, fields, drop=(), append=False, language='generic'):
+ try:
+ input_schema = dict(named_fields_from_element_type(pcoll.element_type))
+ except ValueError as exn:
+ if drop:
+ raise ValueError("Can only drop fields on a schema'd input.") from exn
+ if append:
+ raise ValueError("Can only append fields on a schema'd input.") from exn
+ elif any(is_expr(x) for x in fields.values()):
+ raise ValueError("Can only use expressions on a schema'd input.") from
exn
+ input_schema = {}
-@beam.ptransform.ptransform_fn
-def MapToFields(
- pcoll,
- yaml_create_transform,
- *,
- fields,
- keep=None,
- explode=(),
- cross_product=None,
- append=False,
- drop=(),
- language=None,
- error_handling=None,
- transform_name="MapToFields",
- **language_keywords):
- if isinstance(explode, str):
- explode = [explode]
- if cross_product is None:
- if len(explode) > 1:
- # TODO(robertwb): Consider if true is an OK default.
- raise ValueError(
- 'cross_product must be specified true or false '
- 'when exploding multiple fields')
- else:
- # Doesn't matter.
- cross_product = True
-
- input_schema = dict(named_fields_from_element_type(pcoll.element_type))
+ if isinstance(drop, str):
+ drop = [drop]
if drop and not append:
raise ValueError("Can only drop fields if append is true.")
for name in drop:
if name not in input_schema:
raise ValueError(f'Dropping unknown field "{name}"')
- for name in explode:
- if not (name in fields or (append and name in input_schema)):
- raise ValueError(f'Exploding unknown field "{name}"')
if append:
for name in fields:
if name in input_schema and name not in drop:
raise ValueError(f'Redefinition of field "{name}"')
+ if language == 'generic':
+ for expr in fields.values():
+ if not isinstance(expr, str):
+ raise ValueError("Missing language specification.")
Review Comment:
This error is probably not clear to an end user. Maybe add something like:
```suggestion
raise ValueError("Missing language specification. Must specify a
language when using a map with custom logic.")
```
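Roughly, the situation that would trip this message (hypothetical snippet mirroring the shape of the check in the hunk above, not the PR's code):
```python
# Hypothetical: dict-style custom logic (an 'expression') combined with the
# default 'generic' language requires an explicit language specification.
def check_language(fields, language='generic'):
  if language == 'generic':
    for expr in fields.values():
      if not isinstance(expr, str):
        raise ValueError(
            "Missing language specification. Must specify a language "
            "when using a map with custom logic.")


check_language({'total': 'amount'})  # plain field reference: fine
try:
  check_language({'total': {'expression': 'price * qty'}})  # needs a language
except ValueError as e:
  print(e)
```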
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]