gemini-code-assist[bot] commented on code in PR #38512:
URL: https://github.com/apache/beam/pull/38512#discussion_r3250359453
##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -723,3 +723,57 @@ def write_to_tfrecord(
num_shards=num_shards,
shard_name_template=shard_name_template,
compression_type=getattr(CompressionTypes, compression_type))
+
+
[email protected]_fn
+def match_all(
+ pcoll,
+ *,
+ file_pattern: Optional[str] = None,
+ empty_match_treatment: str = 'ALLOW',
+):
+ """Matches file patterns from the input PCollection.
+
+ This transform returns a PCollection of matching files, each represented as a
+ Row with path, size_in_bytes, and last_updated_in_seconds fields.
+
+ Args:
+ file_pattern (str): The name of the field in the input PCollection that
contains
+ the file pattern string. If not specified and the input PCollection has
+ exactly one field, that field will be used.
+ empty_match_treatment (str): How to treat empty matches. Possible values
are
+ 'ALLOW', 'DISALLOW', and 'ALLOW_IF_WILDCARD'. Defaults to 'ALLOW'.
+ """
+ from apache_beam.typehints import schemas
+
+ try:
+ field_names = [
+ name for name, _ in schemas.named_fields_from_element_type(
+ pcoll.element_type)
+ ]
+ if file_pattern is not None:
+ if file_pattern not in field_names:
+ raise ValueError(
+ f"Field '{file_pattern}' not found in input schema fields:
{field_names}"
+ )
+ pattern_field = file_pattern
+ elif len(field_names) == 1:
+ pattern_field = field_names[0]
+ else:
+ raise ValueError(
+ f"Input schema has multiple fields {field_names}. "
+ f"Please specify the 'file_pattern' parameter to select which field "
+ f"contains the file pattern.")
+ patterns = pcoll | beam.Map(lambda x: str(getattr(x, pattern_field)))
+ except Exception:
+ # Fallback for PCollection without a schema (e.g. raw string elements)
+ patterns = pcoll
+
+ matched = patterns | beam.io.fileio.MatchAll(
+ empty_match_treatment=empty_match_treatment)
Review Comment:

`beam.io.fileio.MatchAll` expects an `EmptyMatchTreatment` enum value for
the `empty_match_treatment` parameter. Passing a string directly may not work
as expected because the internal logic in `FileSystems.match` compares against
enum members. It should be converted using the enum.
```suggestion
  from apache_beam.io.fileio import EmptyMatchTreatment
  matched = patterns | beam.io.fileio.MatchAll(
      empty_match_treatment=getattr(EmptyMatchTreatment,
                                    empty_match_treatment))
```
##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -723,3 +723,57 @@ def write_to_tfrecord(
num_shards=num_shards,
shard_name_template=shard_name_template,
compression_type=getattr(CompressionTypes, compression_type))
+
+
[email protected]_fn
+def match_all(
+ pcoll,
+ *,
+ file_pattern: Optional[str] = None,
+ empty_match_treatment: str = 'ALLOW',
+):
+ """Matches file patterns from the input PCollection.
+
+ This transform returns a PCollection of matching files, each represented as a
+ Row with path, size_in_bytes, and last_updated_in_seconds fields.
+
+ Args:
+ file_pattern (str): The name of the field in the input PCollection that
contains
+ the file pattern string. If not specified and the input PCollection has
+ exactly one field, that field will be used.
+ empty_match_treatment (str): How to treat empty matches. Possible values
are
+ 'ALLOW', 'DISALLOW', and 'ALLOW_IF_WILDCARD'. Defaults to 'ALLOW'.
+ """
+ from apache_beam.typehints import schemas
+
+ try:
+ field_names = [
+ name for name, _ in schemas.named_fields_from_element_type(
+ pcoll.element_type)
+ ]
+ if file_pattern is not None:
+ if file_pattern not in field_names:
+ raise ValueError(
+ f"Field '{file_pattern}' not found in input schema fields:
{field_names}"
+ )
+ pattern_field = file_pattern
+ elif len(field_names) == 1:
+ pattern_field = field_names[0]
+ else:
+ raise ValueError(
+ f"Input schema has multiple fields {field_names}. "
+ f"Please specify the 'file_pattern' parameter to select which field "
+ f"contains the file pattern.")
+ patterns = pcoll | beam.Map(lambda x: str(getattr(x, pattern_field)))
+ except Exception:
+ # Fallback for PCollection without a schema (e.g. raw string elements)
+ patterns = pcoll
+
+ matched = patterns | beam.io.fileio.MatchAll(
+ empty_match_treatment=empty_match_treatment)
+
+ return matched | beam.Map(
+ lambda x: beam.Row(
+ path=str(x.path),
+ size_in_bytes=int(x.size_in_bytes),
+ last_updated_in_seconds=float(x.last_updated_in_seconds), ))
Review Comment:

`float(x.last_updated_in_seconds)` will raise a `TypeError` if
`last_updated_in_seconds` is `None`. While `MatchAll` usually provides this
value, some filesystems might not. It is safer to handle the `None` case.
Additionally, the explicit `str()` and `int()` casts are likely redundant if
the input `FileMetadata` fields are already of the expected types.
```suggestion
  return matched | beam.Map(
      lambda x: beam.Row(
          path=str(x.path),
          size_in_bytes=int(x.size_in_bytes),
          last_updated_in_seconds=float(x.last_updated_in_seconds)
          if x.last_updated_in_seconds is not None else None))
```
##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -723,3 +723,57 @@ def write_to_tfrecord(
num_shards=num_shards,
shard_name_template=shard_name_template,
compression_type=getattr(CompressionTypes, compression_type))
+
+
[email protected]_fn
+def match_all(
+ pcoll,
+ *,
+ file_pattern: Optional[str] = None,
+ empty_match_treatment: str = 'ALLOW',
+):
+ """Matches file patterns from the input PCollection.
+
+ This transform returns a PCollection of matching files, each represented as a
+ Row with path, size_in_bytes, and last_updated_in_seconds fields.
+
+ Args:
+ file_pattern (str): The name of the field in the input PCollection that
contains
+ the file pattern string. If not specified and the input PCollection has
+ exactly one field, that field will be used.
+ empty_match_treatment (str): How to treat empty matches. Possible values
are
+ 'ALLOW', 'DISALLOW', and 'ALLOW_IF_WILDCARD'. Defaults to 'ALLOW'.
+ """
+ from apache_beam.typehints import schemas
+
+ try:
+ field_names = [
+ name for name, _ in schemas.named_fields_from_element_type(
+ pcoll.element_type)
+ ]
+ if file_pattern is not None:
+ if file_pattern not in field_names:
+ raise ValueError(
+ f"Field '{file_pattern}' not found in input schema fields:
{field_names}"
+ )
+ pattern_field = file_pattern
+ elif len(field_names) == 1:
+ pattern_field = field_names[0]
+ else:
+ raise ValueError(
+ f"Input schema has multiple fields {field_names}. "
+ f"Please specify the 'file_pattern' parameter to select which field "
+ f"contains the file pattern.")
+ patterns = pcoll | beam.Map(lambda x: str(getattr(x, pattern_field)))
+ except Exception:
+ # Fallback for PCollection without a schema (e.g. raw string elements)
+ patterns = pcoll
Review Comment:

The `try`/`except` block is too broad. It wraps all of the schema-handling
logic, including the explicit `ValueError` raises for a missing or ambiguous
field. If a user provides an invalid `file_pattern`, the `ValueError` is
caught and the code silently falls back to `patterns = pcoll` (line 770),
treating each whole element as a pattern. That will likely lead to a
confusing error later in `MatchAll` (e.g., an `AttributeError` when trying to
treat a Row as a string). The `try`/`except` should cover only the schema
inspection.
```python
try:
  field_names = [
      name for name, _ in schemas.named_fields_from_element_type(
          pcoll.element_type)
  ]
except Exception:
  field_names = None
if field_names:
  if file_pattern is not None:
    if file_pattern not in field_names:
      raise ValueError(
          f"Field '{file_pattern}' not found in input schema fields: {field_names}"
      )
    pattern_field = file_pattern
  elif len(field_names) == 1:
    pattern_field = field_names[0]
  else:
    raise ValueError(
        f"Input schema has multiple fields {field_names}. "
        f"Please specify the 'file_pattern' parameter to select which field "
        f"contains the file pattern.")
  patterns = pcoll | beam.Map(lambda x: str(getattr(x, pattern_field)))
else:
  patterns = pcoll
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]