gemini-code-assist[bot] commented on code in PR #38512:
URL: https://github.com/apache/beam/pull/38512#discussion_r3250359453


##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -723,3 +723,57 @@ def write_to_tfrecord(
           num_shards=num_shards,
           shard_name_template=shard_name_template,
           compression_type=getattr(CompressionTypes, compression_type))
+
+
+@beam.ptransform_fn
+def match_all(
+    pcoll,
+    *,
+    file_pattern: Optional[str] = None,
+    empty_match_treatment: str = 'ALLOW',
+):
+  """Matches file patterns from the input PCollection.
+
+  This transform returns a PCollection of matching files, each represented as a
+  Row with path, size_in_bytes, and last_updated_in_seconds fields.
+
+  Args:
+    file_pattern (str): The name of the field in the input PCollection that contains
+      the file pattern string. If not specified and the input PCollection has
+      exactly one field, that field will be used.
+    empty_match_treatment (str): How to treat empty matches. Possible values are
+      'ALLOW', 'DISALLOW', and 'ALLOW_IF_WILDCARD'. Defaults to 'ALLOW'.
+  """
+  from apache_beam.typehints import schemas
+
+  try:
+    field_names = [
+        name for name, _ in schemas.named_fields_from_element_type(
+            pcoll.element_type)
+    ]
+    if file_pattern is not None:
+      if file_pattern not in field_names:
+        raise ValueError(
+            f"Field '{file_pattern}' not found in input schema fields: {field_names}"
+        )
+      pattern_field = file_pattern
+    elif len(field_names) == 1:
+      pattern_field = field_names[0]
+    else:
+      raise ValueError(
+          f"Input schema has multiple fields {field_names}. "
+          f"Please specify the 'file_pattern' parameter to select which field "
+          f"contains the file pattern.")
+    patterns = pcoll | beam.Map(lambda x: str(getattr(x, pattern_field)))
+  except Exception:
+    # Fallback for PCollection without a schema (e.g. raw string elements)
+    patterns = pcoll
+
+  matched = patterns | beam.io.fileio.MatchAll(
+      empty_match_treatment=empty_match_treatment)

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
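   `beam.io.fileio.MatchAll` expects an `EmptyMatchTreatment` value for the
   `empty_match_treatment` parameter. Here the user-supplied string is passed
   through unvalidated, so a misspelled value may only fail when the setting
   is checked at match time. Note that in the Python SDK `EmptyMatchTreatment`
   appears to be a class of string constants rather than a true enum, so valid
   strings may already work; resolving the value through the class still makes
   the intent explicit and rejects unknown names early.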
   
   ```suggestion
     from apache_beam.io.fileio import EmptyMatchTreatment
     matched = patterns | beam.io.fileio.MatchAll(
         empty_match_treatment=getattr(
             EmptyMatchTreatment, empty_match_treatment))
   ```
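
A bare `getattr` raises `AttributeError` for unknown names and also accepts any other attribute of the class (for example `__doc__`). A minimal sketch of a stricter lookup follows. The class below is a stand-in that assumes only the three names listed in the docstring, and `resolve_empty_match_treatment` is a hypothetical helper, not part of Beam or of this PR.

```python
# Stand-in for apache_beam.io.fileio.EmptyMatchTreatment so the sketch runs
# without Beam installed. Only the three names from the docstring are assumed.
class EmptyMatchTreatment:
  ALLOW = 'ALLOW'
  DISALLOW = 'DISALLOW'
  ALLOW_IF_WILDCARD = 'ALLOW_IF_WILDCARD'


# Names a YAML user may supply; anything else is rejected up front.
_VALID_TREATMENTS = ('ALLOW', 'DISALLOW', 'ALLOW_IF_WILDCARD')


def resolve_empty_match_treatment(name):
  """Maps a user-supplied string to the matching EmptyMatchTreatment member.

  Hypothetical helper: raises ValueError with the list of valid names instead
  of letting getattr raise AttributeError or return an unrelated attribute.
  """
  if name not in _VALID_TREATMENTS:
    raise ValueError(
        f"Invalid empty_match_treatment {name!r}; "
        f"expected one of {list(_VALID_TREATMENTS)}")
  return getattr(EmptyMatchTreatment, name)
```

Inside `match_all`, the result of this helper could be passed to `MatchAll(empty_match_treatment=...)` in place of the raw string, so an invalid value fails at pipeline construction time.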



##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -723,3 +723,57 @@ def write_to_tfrecord(
           num_shards=num_shards,
           shard_name_template=shard_name_template,
           compression_type=getattr(CompressionTypes, compression_type))
+
+
+@beam.ptransform_fn
+def match_all(
+    pcoll,
+    *,
+    file_pattern: Optional[str] = None,
+    empty_match_treatment: str = 'ALLOW',
+):
+  """Matches file patterns from the input PCollection.
+
+  This transform returns a PCollection of matching files, each represented as a
+  Row with path, size_in_bytes, and last_updated_in_seconds fields.
+
+  Args:
+    file_pattern (str): The name of the field in the input PCollection that contains
+      the file pattern string. If not specified and the input PCollection has
+      exactly one field, that field will be used.
+    empty_match_treatment (str): How to treat empty matches. Possible values are
+      'ALLOW', 'DISALLOW', and 'ALLOW_IF_WILDCARD'. Defaults to 'ALLOW'.
+  """
+  from apache_beam.typehints import schemas
+
+  try:
+    field_names = [
+        name for name, _ in schemas.named_fields_from_element_type(
+            pcoll.element_type)
+    ]
+    if file_pattern is not None:
+      if file_pattern not in field_names:
+        raise ValueError(
+            f"Field '{file_pattern}' not found in input schema fields: {field_names}"
+        )
+      pattern_field = file_pattern
+    elif len(field_names) == 1:
+      pattern_field = field_names[0]
+    else:
+      raise ValueError(
+          f"Input schema has multiple fields {field_names}. "
+          f"Please specify the 'file_pattern' parameter to select which field "
+          f"contains the file pattern.")
+    patterns = pcoll | beam.Map(lambda x: str(getattr(x, pattern_field)))
+  except Exception:
+    # Fallback for PCollection without a schema (e.g. raw string elements)
+    patterns = pcoll
+
+  matched = patterns | beam.io.fileio.MatchAll(
+      empty_match_treatment=empty_match_treatment)
+
+  return matched | beam.Map(
+      lambda x: beam.Row(
+          path=str(x.path),
+          size_in_bytes=int(x.size_in_bytes),
+          last_updated_in_seconds=float(x.last_updated_in_seconds), ))

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   `float(x.last_updated_in_seconds)` will raise a `TypeError` if 
`last_updated_in_seconds` is `None`. While `MatchAll` usually provides this 
value, some filesystems might not. It is safer to handle the `None` case. 
Additionally, the explicit `str()` and `int()` casts are likely redundant if 
the input `FileMetadata` fields are already of the expected types.
   
   ```suggestion
     return matched | beam.Map(
         lambda x: beam.Row(
             path=str(x.path),
             size_in_bytes=int(x.size_in_bytes),
             last_updated_in_seconds=float(x.last_updated_in_seconds)
             if x.last_updated_in_seconds is not None else None))
   ```
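
The `None` handling can also be pulled out of the lambda into a named function that is easy to unit test. This is a sketch only: `FileMetadata` below is a namedtuple stand-in with the three fields named in the docstring, and `metadata_to_fields` is a hypothetical helper.

```python
from collections import namedtuple

# Stand-in for Beam's FileMetadata so the sketch runs without Beam installed.
FileMetadata = namedtuple(
    'FileMetadata', ['path', 'size_in_bytes', 'last_updated_in_seconds'])


def metadata_to_fields(meta):
  """Converts file metadata to plain field values, tolerating a missing mtime.

  Hypothetical helper: float(None) raises TypeError, so the timestamp is only
  converted when it is present.
  """
  last_updated = meta.last_updated_in_seconds
  return {
      'path': str(meta.path),
      'size_in_bytes': int(meta.size_in_bytes),
      'last_updated_in_seconds':
          None if last_updated is None else float(last_updated),
  }
```

In the transform, this could be used as `beam.Map(lambda x: beam.Row(**metadata_to_fields(x)))`, assuming the output schema allows a null `last_updated_in_seconds`.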



##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -723,3 +723,57 @@ def write_to_tfrecord(
           num_shards=num_shards,
           shard_name_template=shard_name_template,
           compression_type=getattr(CompressionTypes, compression_type))
+
+
+@beam.ptransform_fn
+def match_all(
+    pcoll,
+    *,
+    file_pattern: Optional[str] = None,
+    empty_match_treatment: str = 'ALLOW',
+):
+  """Matches file patterns from the input PCollection.
+
+  This transform returns a PCollection of matching files, each represented as a
+  Row with path, size_in_bytes, and last_updated_in_seconds fields.
+
+  Args:
+    file_pattern (str): The name of the field in the input PCollection that contains
+      the file pattern string. If not specified and the input PCollection has
+      exactly one field, that field will be used.
+    empty_match_treatment (str): How to treat empty matches. Possible values are
+      'ALLOW', 'DISALLOW', and 'ALLOW_IF_WILDCARD'. Defaults to 'ALLOW'.
+  """
+  from apache_beam.typehints import schemas
+
+  try:
+    field_names = [
+        name for name, _ in schemas.named_fields_from_element_type(
+            pcoll.element_type)
+    ]
+    if file_pattern is not None:
+      if file_pattern not in field_names:
+        raise ValueError(
+            f"Field '{file_pattern}' not found in input schema fields: {field_names}"
+        )
+      pattern_field = file_pattern
+    elif len(field_names) == 1:
+      pattern_field = field_names[0]
+    else:
+      raise ValueError(
+          f"Input schema has multiple fields {field_names}. "
+          f"Please specify the 'file_pattern' parameter to select which field "
+          f"contains the file pattern.")
+    patterns = pcoll | beam.Map(lambda x: str(getattr(x, pattern_field)))
+  except Exception:
+    # Fallback for PCollection without a schema (e.g. raw string elements)
+    patterns = pcoll

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The `try-except` block is too broad. It wraps the entire logic for 
schema-aware PCollections, including explicit `ValueError` raises for missing 
fields. If a user provides an invalid `file_pattern`, the `ValueError` is 
caught and the code falls back to treating the entire element as a pattern 
(line 770), which will likely lead to a confusing error later in `MatchAll` 
(e.g., an `AttributeError` when trying to treat a Row as a string). The 
`try-except` should only cover the schema inspection.
   
   ```python
     try:
       field_names = [
           name for name, _ in schemas.named_fields_from_element_type(
               pcoll.element_type)
       ]
     except Exception:
       field_names = None
   
     if field_names:
       if file_pattern is not None:
         if file_pattern not in field_names:
           raise ValueError(
               f"Field '{file_pattern}' not found in input schema fields: {field_names}"
           )
         pattern_field = file_pattern
       elif len(field_names) == 1:
         pattern_field = field_names[0]
       else:
         raise ValueError(
             f"Input schema has multiple fields {field_names}. "
             f"Please specify the 'file_pattern' parameter to select which field "
             f"contains the file pattern.")
       patterns = pcoll | beam.Map(lambda x: str(getattr(x, pattern_field)))
     else:
       patterns = pcoll
   ```
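
Once schema inspection is separated from validation, the field-selection rules can be expressed as a pure function and tested without a pipeline. The sketch below mirrors the branching in the snippet above; `select_pattern_field` is a hypothetical name, and returning `None` for a schemaless input is an assumption made for illustration.

```python
def select_pattern_field(field_names, file_pattern=None):
  """Picks the schema field holding the file pattern.

  Hypothetical helper. Returns None when there is no schema (field_names is
  None or empty), meaning elements should be used as patterns directly.
  Raises ValueError for a missing or ambiguous field, so user errors are
  never swallowed by a schema-inspection fallback.
  """
  if not field_names:
    return None
  if file_pattern is not None:
    if file_pattern not in field_names:
      raise ValueError(
          f"Field '{file_pattern}' not found in input schema fields: "
          f"{field_names}")
    return file_pattern
  if len(field_names) == 1:
    return field_names[0]
  raise ValueError(
      f"Input schema has multiple fields {field_names}. "
      f"Please specify the 'file_pattern' parameter to select which field "
      f"contains the file pattern.")
```

With this shape, only the call to `schemas.named_fields_from_element_type` needs to sit inside the `try` block, and the `ValueError` cases propagate to the user unchanged.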



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
