derrickaw commented on code in PR #38512:
URL: https://github.com/apache/beam/pull/38512#discussion_r3250413174


##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -723,3 +723,57 @@ def write_to_tfrecord(
           num_shards=num_shards,
           shard_name_template=shard_name_template,
           compression_type=getattr(CompressionTypes, compression_type))
+
+
[email protected]_fn
+def match_all(
+    pcoll,
+    *,
+    file_pattern: Optional[str] = None,
+    empty_match_treatment: str = 'ALLOW',
+):
+  """Matches file patterns from the input PCollection.
+
+  This transform returns a PCollection of matching files, each represented as a
+  Row with path, size_in_bytes, and last_updated_in_seconds fields.
+
+  Args:
+    file_pattern (str): The name of the field in the input PCollection that 
contains
+      the file pattern string. If not specified and the input PCollection has
+      exactly one field, that field will be used.
+    empty_match_treatment (str): How to treat empty matches. Possible values 
are
+      'ALLOW', 'DISALLOW', and 'ALLOW_IF_WILDCARD'. Defaults to 'ALLOW'.
+  """
+  from apache_beam.typehints import schemas
+
+  try:
+    field_names = [
+        name for name, _ in schemas.named_fields_from_element_type(
+            pcoll.element_type)
+    ]
+    if file_pattern is not None:
+      if file_pattern not in field_names:
+        raise ValueError(
+            f"Field '{file_pattern}' not found in input schema fields: 
{field_names}"
+        )
+      pattern_field = file_pattern
+    elif len(field_names) == 1:
+      pattern_field = field_names[0]
+    else:
+      raise ValueError(
+          f"Input schema has multiple fields {field_names}. "
+          f"Please specify the 'file_pattern' parameter to select which field "
+          f"contains the file pattern.")
+    patterns = pcoll | beam.Map(lambda x: str(getattr(x, pattern_field)))
+  except Exception:
+    # Fallback for PCollection without a schema (e.g. raw string elements)
+    patterns = pcoll
+
+  matched = patterns | beam.io.fileio.MatchAll(
+      empty_match_treatment=empty_match_treatment)
+
+  return matched | beam.Map(
+      lambda x: beam.Row(
+          path=str(x.path),
+          size_in_bytes=int(x.size_in_bytes),
+          last_updated_in_seconds=float(x.last_updated_in_seconds), ))

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to