This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch release-2.60.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.60.0 by this push:
     new 96a96daa011 Report File Lineage on directory (#32662) (#32706)
96a96daa011 is described below

commit 96a96daa011d38fd58ad5dbcd8e6c1478d948c40
Author: Yi Hu <[email protected]>
AuthorDate: Wed Oct 9 09:18:15 2024 -0400

    Report File Lineage on directory (#32662) (#32706)
    
    * Report File Lineage on directory
    
    * added comments, restore lineage assert in TextIOIT
    
    * Report bucket level Lineage when there are more than 100 files
    
    * fix lint
---
 .../java/org/apache/beam/sdk/io/FileBasedSink.java | 16 +++++-
 .../org/apache/beam/sdk/io/FileBasedSource.java    | 36 +++++++++++++-
 .../java/org/apache/beam/sdk/io/FileSystem.java    | 14 +++++-
 .../java/org/apache/beam/sdk/io/FileSystems.java   | 29 ++++++++++-
 .../sdk/io/ReadAllViaFileBasedSourceTransform.java | 33 ++++++++++++-
 .../sdk/extensions/gcp/storage/GcsFileSystem.java  | 12 ++++-
 .../extensions/gcp/storage/GcsFileSystemTest.java  | 18 +++++++
 .../apache/beam/sdk/io/aws/s3/S3FileSystem.java    | 12 ++++-
 .../beam/sdk/io/aws/s3/S3FileSystemTest.java       | 17 +++++++
 .../apache/beam/sdk/io/aws2/s3/S3FileSystem.java   | 12 ++++-
 .../beam/sdk/io/aws2/s3/S3FileSystemTest.java      | 17 +++++++
 .../azure/blobstore/AzureBlobStoreFileSystem.java  |  7 ++-
 .../blobstore/AzureBlobStoreFileSystemTest.java    | 18 +++++++
 .../java/org/apache/beam/sdk/io/text/TextIOIT.java | 13 +++--
 sdks/python/apache_beam/io/aws/s3filesystem.py     |  8 ++-
 .../python/apache_beam/io/aws/s3filesystem_test.py |  9 ++++
 .../apache_beam/io/azure/blobstoragefilesystem.py  |  9 +++-
 .../io/azure/blobstoragefilesystem_test.py         | 12 +++++
 sdks/python/apache_beam/io/filebasedsink.py        | 24 ++++++++-
 sdks/python/apache_beam/io/filebasedsource.py      | 57 +++++++++++++++++++++-
 sdks/python/apache_beam/io/filesystem.py           |  6 ++-
 sdks/python/apache_beam/io/filesystems.py          | 26 +++++++---
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py    | 10 ++--
 .../apache_beam/io/gcp/gcsfilesystem_test.py       |  9 ++++
 24 files changed, 393 insertions(+), 31 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index b7523ee12b5..7eb04519555 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -687,11 +687,25 @@ public abstract class FileBasedSink<UserT, DestinationT, 
OutputT>
             distinctFilenames.get(finalFilename));
         distinctFilenames.put(finalFilename, result);
         outputFilenames.add(KV.of(result, finalFilename));
-        FileSystems.reportSinkLineage(finalFilename);
       }
+      reportSinkLineage(outputFilenames);
       return outputFilenames;
     }
 
+    /**
+     * Report sink Lineage. Report every file if the number of files is no more than 
100; otherwise only
+     * report at directory level.
+     */
+    private void reportSinkLineage(List<KV<FileResult<DestinationT>, 
ResourceId>> outputFilenames) {
+      if (outputFilenames.size() <= 100) {
+        for (KV<FileResult<DestinationT>, ResourceId> kv : outputFilenames) {
+          FileSystems.reportSinkLineage(kv.getValue());
+        }
+      } else {
+        
FileSystems.reportSinkLineage(outputFilenames.get(0).getValue().getCurrentDirectory());
+      }
+    }
+
     private Collection<FileResult<DestinationT>> createMissingEmptyShards(
         @Nullable DestinationT dest,
         @Nullable Integer numShards,
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index 7ddfde441ae..8d6e52c64a5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -26,10 +26,12 @@ import java.io.IOException;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SeekableByteChannel;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.io.FileSystem.LineageLevel;
 import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
@@ -297,9 +299,10 @@ public abstract class FileBasedSource<T> extends 
OffsetBasedSource<T> {
           System.currentTimeMillis() - startTime,
           expandedFiles.size(),
           splitResults.size());
+
+      reportSourceLineage(expandedFiles);
       return splitResults;
     } else {
-      FileSystems.reportSourceLineage(getSingleFileMetadata().resourceId());
       if (isSplittable()) {
         @SuppressWarnings("unchecked")
         List<FileBasedSource<T>> splits =
@@ -315,6 +318,37 @@ public abstract class FileBasedSource<T> extends 
OffsetBasedSource<T> {
     }
   }
 
+  /**
+   * Report source Lineage. Due to the size limit of Beam metrics, report full 
file names or only directories,
+   * depending on the number of files.
+   *
+   * <p>- Number of files<=100, report full file paths;
+   *
+   * <p>- Number of directories<=100, report directory names (one level up);
+   *
+   * <p>- Otherwise, report top level only.
+   */
+  private static void reportSourceLineage(List<Metadata> expandedFiles) {
+    if (expandedFiles.size() <= 100) {
+      for (Metadata metadata : expandedFiles) {
+        FileSystems.reportSourceLineage(metadata.resourceId());
+      }
+    } else {
+      HashSet<ResourceId> uniqueDirs = new HashSet<>();
+      for (Metadata metadata : expandedFiles) {
+        ResourceId dir = metadata.resourceId().getCurrentDirectory();
+        uniqueDirs.add(dir);
+        if (uniqueDirs.size() > 100) {
+          FileSystems.reportSourceLineage(dir, LineageLevel.TOP_LEVEL);
+          return;
+        }
+      }
+      for (ResourceId uniqueDir : uniqueDirs) {
+        FileSystems.reportSourceLineage(uniqueDir);
+      }
+    }
+  }
+
   /**
    * Determines whether a file represented by this source is can be split into 
bundles.
    *
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
index 11314a318b2..73caa7284e9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
@@ -157,10 +157,20 @@ public abstract class FileSystem<ResourceIdT extends 
ResourceId> {
    */
   protected abstract String getScheme();
 
+  public enum LineageLevel {
+    FILE,
+    TOP_LEVEL
+  }
+
+  /** Report {@link Lineage} metrics for resource id at file level. */
+  protected void reportLineage(ResourceIdT resourceId, Lineage lineage) {
+    reportLineage(resourceId, lineage, LineageLevel.FILE);
+  }
+
   /**
-   * Report {@link Lineage} metrics for resource id.
+   * Report {@link Lineage} metrics for resource id at a given level.
    *
    * <p>Unless override by FileSystem implementations, default to no-op.
    */
-  protected void reportLineage(ResourceIdT unusedId, Lineage unusedLineage) {}
+  protected void reportLineage(ResourceIdT unusedId, Lineage unusedLineage, 
LineageLevel level) {}
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index a4ca9b80dce..fb25cac6262 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -39,6 +39,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nonnull;
 import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.FileSystem.LineageLevel;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
 import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
@@ -398,12 +399,36 @@ public class FileSystems {
 
   /** Report source {@link Lineage} metrics for resource id. */
   public static void reportSourceLineage(ResourceId resourceId) {
-    getFileSystemInternal(resourceId.getScheme()).reportLineage(resourceId, 
Lineage.getSources());
+    reportSourceLineage(resourceId, LineageLevel.FILE);
   }
 
   /** Report sink {@link Lineage} metrics for resource id. */
   public static void reportSinkLineage(ResourceId resourceId) {
-    getFileSystemInternal(resourceId.getScheme()).reportLineage(resourceId, 
Lineage.getSinks());
+    reportSinkLineage(resourceId, LineageLevel.FILE);
+  }
+
+  /**
+   * Report source {@link Lineage} metrics for resource id at given level.
+   *
+   * <p>Internal API, no backward compatibility guaranteed.
+   */
+  public static void reportSourceLineage(ResourceId resourceId, LineageLevel 
level) {
+    reportLineage(resourceId, Lineage.getSources(), level);
+  }
+
+  /**
+   * Report sink {@link Lineage} metrics for resource id at given level.
+   *
+   * <p>Internal API, no backward compatibility guaranteed.
+   */
+  public static void reportSinkLineage(ResourceId resourceId, LineageLevel 
level) {
+    reportLineage(resourceId, Lineage.getSinks(), level);
+  }
+
+  /** Report {@link Lineage} metrics for resource id at given level to given 
Lineage container. */
+  private static void reportLineage(ResourceId resourceId, Lineage lineage, 
LineageLevel level) {
+    FileSystem fileSystem = getFileSystemInternal(resourceId.getScheme());
+    fileSystem.reportLineage(resourceId, lineage, level);
   }
 
   private static class FilterResult {
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java
index bbac337f2d0..843deb5cab3 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java
@@ -19,7 +19,9 @@ package org.apache.beam.sdk.io;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
+import java.util.HashSet;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileSystem.LineageLevel;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.range.OffsetRange;
@@ -30,6 +32,7 @@ import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 public abstract class ReadAllViaFileBasedSourceTransform<InT, T>
     extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> {
@@ -81,6 +84,9 @@ public abstract class ReadAllViaFileBasedSourceTransform<InT, 
T>
       extends DoFn<FileIO.ReadableFile, KV<FileIO.ReadableFile, OffsetRange>> {
     private final long desiredBundleSizeBytes;
 
+    // Tracks the unique resource ids seen so far. Access it only inside reportSourceLineage.
+    private transient @Nullable HashSet<ResourceId> uniqueIds;
+
     public SplitIntoRangesFn(long desiredBundleSizeBytes) {
       this.desiredBundleSizeBytes = desiredBundleSizeBytes;
     }
@@ -88,6 +94,7 @@ public abstract class ReadAllViaFileBasedSourceTransform<InT, 
T>
     @ProcessElement
     public void process(ProcessContext c) {
       MatchResult.Metadata metadata = c.element().getMetadata();
+      reportSourceLineage(metadata.resourceId());
       if (!metadata.isReadSeekEfficient()) {
         c.output(KV.of(c.element(), new OffsetRange(0, metadata.sizeBytes())));
         return;
@@ -97,6 +104,31 @@ public abstract class 
ReadAllViaFileBasedSourceTransform<InT, T>
         c.output(KV.of(c.element(), range));
       }
     }
+
+    /**
+     * Report source Lineage. Due to the size limit of Beam metrics, report 
full file names or only
+     * the top level, depending on the number of files.
+     *
+     * <p>- Number of files<=100, report full file paths;
+     *
+     * <p>- Otherwise, report top level only.
+     */
+    @SuppressWarnings("nullness") // only called in processElement, guaranteed 
to be non-null
+    private void reportSourceLineage(ResourceId resourceId) {
+      if (uniqueIds == null) {
+        uniqueIds = new HashSet<>();
+      } else if (uniqueIds.isEmpty()) {
+        // already at capacity
+        FileSystems.reportSourceLineage(resourceId, LineageLevel.TOP_LEVEL);
+        return;
+      }
+      uniqueIds.add(resourceId);
+      FileSystems.reportSourceLineage(resourceId, LineageLevel.FILE);
+      if (uniqueIds.size() >= 100) {
+        // avoid reference leak
+        uniqueIds.clear();
+      }
+    }
   }
 
   public abstract static class AbstractReadFileRangesFn<InT, T>
@@ -140,7 +172,6 @@ public abstract class 
ReadAllViaFileBasedSourceTransform<InT, T>
           throw e;
         }
       }
-      FileSystems.reportSourceLineage(resourceId);
     }
   }
 }
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
index 6332051c0dd..32079ebf55a 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
@@ -217,9 +217,19 @@ class GcsFileSystem extends FileSystem<GcsResourceId> {
 
   @Override
   protected void reportLineage(GcsResourceId resourceId, Lineage lineage) {
+    reportLineage(resourceId, lineage, LineageLevel.FILE);
+  }
+
+  @Override
+  protected void reportLineage(GcsResourceId resourceId, Lineage lineage, 
LineageLevel level) {
     GcsPath path = resourceId.getGcsPath();
     if (!path.getBucket().isEmpty()) {
-      lineage.add("gcs", ImmutableList.of(path.getBucket(), path.getObject()));
+      ImmutableList.Builder<String> segments =
+          ImmutableList.<String>builder().add(path.getBucket());
+      if (level != LineageLevel.TOP_LEVEL && !path.getObject().isEmpty()) {
+        segments.add(path.getObject());
+      }
+      lineage.add("gcs", segments.build());
     } else {
       LOG.warn("Report Lineage on relative path {} is unsupported", 
path.getObject());
     }
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java
index 0b79cde1f18..f2ff7118f95 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java
@@ -23,6 +23,9 @@ import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import com.google.api.services.storage.model.Objects;
@@ -38,6 +41,7 @@ import 
org.apache.beam.sdk.extensions.gcp.util.GcsUtil.StorageObjectOrIOExceptio
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.MatchResult.Status;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -235,6 +239,20 @@ public class GcsFileSystemTest {
         contains(toFilenames(matchResults.get(4)).toArray()));
   }
 
+  @Test
+  public void testReportLineageOnBucket() {
+    verifyLineage("gs://testbucket", ImmutableList.of("testbucket"));
+    verifyLineage("gs://testbucket/", ImmutableList.of("testbucket"));
+    verifyLineage("gs://testbucket/foo/bar.txt", 
ImmutableList.of("testbucket", "foo/bar.txt"));
+  }
+
+  private void verifyLineage(String uri, List<String> expected) {
+    GcsResourceId path = GcsResourceId.fromGcsPath(GcsPath.fromUri(uri));
+    Lineage mockLineage = mock(Lineage.class);
+    gcsFileSystem.reportLineage(path, mockLineage);
+    verify(mockLineage, times(1)).add("gcs", expected);
+  }
+
   private StorageObject createStorageObject(String gcsFilename, long fileSize) 
{
     GcsPath gcsPath = GcsPath.fromUri(gcsFilename);
     // Google APIs will use null for empty files.
diff --git 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
index 7ed56efa44b..75d66c46478 100644
--- 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
+++ 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
@@ -627,7 +627,17 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
 
   @Override
   protected void reportLineage(S3ResourceId resourceId, Lineage lineage) {
-    lineage.add("s3", ImmutableList.of(resourceId.getBucket(), 
resourceId.getKey()));
+    reportLineage(resourceId, lineage, LineageLevel.FILE);
+  }
+
+  @Override
+  protected void reportLineage(S3ResourceId resourceId, Lineage lineage, 
LineageLevel level) {
+    ImmutableList.Builder<String> segments =
+        ImmutableList.<String>builder().add(resourceId.getBucket());
+    if (level != LineageLevel.TOP_LEVEL && !resourceId.getKey().isEmpty()) {
+      segments.add(resourceId.getKey());
+    }
+    lineage.add("s3", segments.build());
   }
 
   /**
diff --git 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
index fbef40f4b5c..db749d7080e 100644
--- 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
+++ 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemTest.java
@@ -34,6 +34,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.notNull;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -74,6 +75,7 @@ import java.util.List;
 import org.apache.beam.sdk.io.aws.options.S3Options;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.metrics.Lineage;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -1209,6 +1211,21 @@ public class S3FileSystemTest {
     open.close();
   }
 
+  @Test
+  public void testReportLineageOnBucket() {
+    verifyLineage("s3://testbucket", ImmutableList.of("testbucket"));
+    verifyLineage("s3://testbucket/", ImmutableList.of("testbucket"));
+    verifyLineage("s3://testbucket/foo/bar.txt", 
ImmutableList.of("testbucket", "foo/bar.txt"));
+  }
+
+  private void verifyLineage(String uri, List<String> expected) {
+    S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Config("mys3"), 
client);
+    S3ResourceId path = S3ResourceId.fromUri(uri);
+    Lineage mockLineage = mock(Lineage.class);
+    s3FileSystem.reportLineage(path, mockLineage);
+    verify(mockLineage, times(1)).add("s3", expected);
+  }
+
   /** A mockito argument matcher to implement equality on 
GetObjectMetadataRequest. */
   private static class GetObjectMetadataRequestMatcher
       implements ArgumentMatcher<GetObjectMetadataRequest> {
diff --git 
a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java
 
b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java
index 384c8c627ee..e851f8333d0 100644
--- 
a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java
+++ 
b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java
@@ -658,7 +658,17 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
 
   @Override
   protected void reportLineage(S3ResourceId resourceId, Lineage lineage) {
-    lineage.add("s3", ImmutableList.of(resourceId.getBucket(), 
resourceId.getKey()));
+    reportLineage(resourceId, lineage, LineageLevel.FILE);
+  }
+
+  @Override
+  protected void reportLineage(S3ResourceId resourceId, Lineage lineage, 
LineageLevel level) {
+    ImmutableList.Builder<String> segments =
+        ImmutableList.<String>builder().add(resourceId.getBucket());
+    if (level != LineageLevel.TOP_LEVEL && !resourceId.getKey().isEmpty()) {
+      segments.add(resourceId.getKey());
+    }
+    lineage.add("s3", segments.build());
   }
 
   /**
diff --git 
a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystemTest.java
 
b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystemTest.java
index 423176e52a7..39995b8b316 100644
--- 
a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystemTest.java
+++ 
b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystemTest.java
@@ -34,6 +34,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.notNull;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -55,6 +56,7 @@ import java.util.List;
 import org.apache.beam.sdk.io.aws2.options.S3Options;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.metrics.Lineage;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -1068,6 +1070,21 @@ public class S3FileSystemTest {
     open.close();
   }
 
+  @Test
+  public void testReportLineageOnBucket() {
+    verifyLineage("s3://testbucket", ImmutableList.of("testbucket"));
+    verifyLineage("s3://testbucket/", ImmutableList.of("testbucket"));
+    verifyLineage("s3://testbucket/foo/bar.txt", 
ImmutableList.of("testbucket", "foo/bar.txt"));
+  }
+
+  private void verifyLineage(String uri, List<String> expected) {
+    S3FileSystem s3FileSystem = buildMockedS3FileSystem(s3Config("mys3"), 
client);
+    S3ResourceId path = S3ResourceId.fromUri(uri);
+    Lineage mockLineage = mock(Lineage.class);
+    s3FileSystem.reportLineage(path, mockLineage);
+    verify(mockLineage, times(1)).add("s3", expected);
+  }
+
   /** A mockito argument matcher to implement equality on 
GetHeadObjectRequest. */
   private static class GetHeadObjectRequestMatcher implements 
ArgumentMatcher<HeadObjectRequest> {
 
diff --git 
a/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
 
b/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
index 5137eaf9bb2..bbb2e22d94c 100644
--- 
a/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
+++ 
b/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
@@ -453,7 +453,12 @@ class AzureBlobStoreFileSystem extends 
FileSystem<AzfsResourceId> {
 
   @Override
   protected void reportLineage(AzfsResourceId resourceId, Lineage lineage) {
-    if (!Strings.isNullOrEmpty(resourceId.getBlob())) {
+    reportLineage(resourceId, lineage, LineageLevel.FILE);
+  }
+
+  @Override
+  protected void reportLineage(AzfsResourceId resourceId, Lineage lineage, 
LineageLevel level) {
+    if (level != LineageLevel.TOP_LEVEL && 
!Strings.isNullOrEmpty(resourceId.getBlob())) {
       lineage.add(
           "abs",
           ImmutableList.of(
diff --git 
a/sdks/java/io/azure/src/test/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystemTest.java
 
b/sdks/java/io/azure/src/test/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystemTest.java
index 545f314688c..27a2220c2e4 100644
--- 
a/sdks/java/io/azure/src/test/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystemTest.java
+++ 
b/sdks/java/io/azure/src/test/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystemTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -51,6 +52,7 @@ import java.util.List;
 import org.apache.beam.sdk.io.azure.options.BlobstoreOptions;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -338,4 +340,20 @@ public class AzureBlobStoreFileSystemTest {
 
     blobContainerClient.delete();
   }
+
+  @Test
+  public void testReportLineageOnBucket() {
+    verifyLineage("azfs://account/container", ImmutableList.of("account", 
"container"));
+    verifyLineage("azfs://account/container/", ImmutableList.of("account", 
"container"));
+    verifyLineage(
+        "azfs://account/container/foo/bar.txt",
+        ImmutableList.of("account", "container", "foo/bar.txt"));
+  }
+
+  private void verifyLineage(String uri, List<String> expected) {
+    AzfsResourceId path = AzfsResourceId.fromUri(uri);
+    Lineage mockLineage = mock(Lineage.class);
+    azureBlobStoreFileSystem.reportLineage(path, mockLineage);
+    verify(mockLineage, times(1)).add("abs", expected);
+  }
 }
diff --git 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
index 859c03ed775..ecdde5cbc8f 100644
--- 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++ 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -154,9 +154,16 @@ public class TextIOIT {
 
     PipelineResult result = pipeline.run();
     PipelineResult.State pipelineState = result.waitUntilFinish();
-    assertEquals(
-        Lineage.query(result.metrics(), Lineage.Type.SOURCE),
-        Lineage.query(result.metrics(), Lineage.Type.SINK));
+
+    Set<String> sources = Lineage.query(result.metrics(), Lineage.Type.SOURCE);
+    Set<String> sinks = Lineage.query(result.metrics(), Lineage.Type.SINK);
+    if (numShards <= 100) {
+      // both should be the full files, if supported by the runner
+      assertEquals(sources, sinks);
+    } else {
+      // if supported by runner, both should be non-empty
+      assertEquals(sources.isEmpty(), sinks.isEmpty());
+    }
 
     collectAndPublishMetrics(result);
     // Fail the test if pipeline failed.
diff --git a/sdks/python/apache_beam/io/aws/s3filesystem.py 
b/sdks/python/apache_beam/io/aws/s3filesystem.py
index e181beac4a5..ffbce5893a9 100644
--- a/sdks/python/apache_beam/io/aws/s3filesystem.py
+++ b/sdks/python/apache_beam/io/aws/s3filesystem.py
@@ -315,10 +315,14 @@ class S3FileSystem(FileSystem):
     if exceptions:
       raise BeamIOError("Delete operation failed", exceptions)
 
-  def report_lineage(self, path, lineage):
+  def report_lineage(self, path, lineage, level=None):
     try:
-      components = s3io.parse_s3_path(path, get_account=True)
+      components = s3io.parse_s3_path(path, object_optional=True)
     except ValueError:
       # report lineage is fail-safe
       return
+    if level == FileSystem.LineageLevel.TOP_LEVEL or \
+        (len(components) > 1 and components[-1] == ''):
+      # bucket only
+      components = components[:-1]
     lineage.add('s3', *components)
diff --git a/sdks/python/apache_beam/io/aws/s3filesystem_test.py 
b/sdks/python/apache_beam/io/aws/s3filesystem_test.py
index 60e6f319b2c..87403f482bd 100644
--- a/sdks/python/apache_beam/io/aws/s3filesystem_test.py
+++ b/sdks/python/apache_beam/io/aws/s3filesystem_test.py
@@ -265,6 +265,15 @@ class S3FileSystemTest(unittest.TestCase):
     src_dest_pairs = list(zip(sources, destinations))
     s3io_mock.rename_files.assert_called_once_with(src_dest_pairs)
 
+  def test_lineage(self):
+    self._verify_lineage("s3://bucket/", ("bucket", ))
+    self._verify_lineage("s3://bucket/foo/bar.txt", ("bucket", "foo/bar.txt"))
+
+  def _verify_lineage(self, uri, expected_segments):
+    lineage_mock = mock.MagicMock()
+    self.fs.report_lineage(uri, lineage_mock)
+    lineage_mock.add.assert_called_once_with("s3", *expected_segments)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py 
b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
index bb56fa09d37..4495245dc54 100644
--- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
+++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
@@ -317,10 +317,15 @@ class BlobStorageFileSystem(FileSystem):
     if exceptions:
       raise BeamIOError("Delete operation failed", exceptions)
 
-  def report_lineage(self, path, lineage):
+  def report_lineage(self, path, lineage, level=None):
     try:
-      components = blobstorageio.parse_azfs_path(path, get_account=True)
+      components = blobstorageio.parse_azfs_path(
+          path, blob_optional=True, get_account=True)
     except ValueError:
       # report lineage is fail-safe
       return
+    if level == FileSystem.LineageLevel.TOP_LEVEL \
+      or(len(components) > 1 and components[-1] == ''):
+      # bucket only
+      components = components[:-1]
     lineage.add('abs', *components)
diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py 
b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
index cee459f5b8a..138fe5f78b2 100644
--- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
+++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py
@@ -320,6 +320,18 @@ class BlobStorageFileSystemTest(unittest.TestCase):
     src_dest_pairs = list(zip(sources, destinations))
     blobstorageio_mock.rename_files.assert_called_once_with(src_dest_pairs)
 
+  def test_lineage(self):
+    self._verify_lineage(
+        "azfs://storageaccount/container/", ("storageaccount", "container"))
+    self._verify_lineage(
+        "azfs://storageaccount/container/foo/bar.txt",
+        ("storageaccount", "container", "foo/bar.txt"))
+
+  def _verify_lineage(self, uri, expected_segments):
+    lineage_mock = mock.MagicMock()
+    self.fs.report_lineage(uri, lineage_mock)
+    lineage_mock.add.assert_called_once_with("abs", *expected_segments)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/io/filebasedsink.py 
b/sdks/python/apache_beam/io/filebasedsink.py
index c708e117c3a..f9d4303c8c7 100644
--- a/sdks/python/apache_beam/io/filebasedsink.py
+++ b/sdks/python/apache_beam/io/filebasedsink.py
@@ -280,9 +280,31 @@ class FileBasedSink(iobase.Sink):
 
       src_files.append(src)
       dst_files.append(dst)
-      FileSystems.report_sink_lineage(dst)
+
+    self._report_sink_lineage(dst_glob, dst_files)
     return src_files, dst_files, delete_files, num_skipped
 
+  def _report_sink_lineage(self, dst_glob, dst_files):
+    """
+    Report sink Lineage. Report every file if there are at most 100 files,
+    otherwise report only at the directory level.
+    """
+    if len(dst_files) <= 100:
+      for dst in dst_files:
+        FileSystems.report_sink_lineage(dst)
+    else:
+      dst = dst_glob
+      # dst_glob has a wildcard for shard number (see _shard_name_template)
+      sep = dst_glob.find('*')
+      if sep > 0:
+        dst = dst[:sep]
+      try:
+        dst, _ = FileSystems.split(dst)
+      except ValueError:
+        return  # lineage report is fail-safe
+
+      FileSystems.report_sink_lineage(dst)
+
   @check_accessible(['file_path_prefix'])
   def finalize_write(
       self, init_result, writer_results, unused_pre_finalize_results):
diff --git a/sdks/python/apache_beam/io/filebasedsource.py 
b/sdks/python/apache_beam/io/filebasedsource.py
index efd863810ed..a02bc6de32c 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -39,6 +39,7 @@ from apache_beam.io import iobase
 from apache_beam.io import range_trackers
 from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.filesystem import FileMetadata
+from apache_beam.io.filesystem import FileSystem
 from apache_beam.io.filesystems import FileSystems
 from apache_beam.io.restriction_trackers import OffsetRange
 from apache_beam.options.value_provider import StaticValueProvider
@@ -168,10 +169,38 @@ class FileBasedSource(iobase.BoundedSource):
             min_bundle_size=self._min_bundle_size,
             splittable=splittable)
         single_file_sources.append(single_file_source)
-        FileSystems.report_source_lineage(file_name)
+
+      self._report_source_lineage(files_metadata)
       self._concat_source = concat_source.ConcatSource(single_file_sources)
+
     return self._concat_source
 
+  def _report_source_lineage(self, files_metadata):
+    """
+    Report source Lineage. Depending on the number of files, report the full
+    file name, only the directory, or only the top level.
+    """
+    if len(files_metadata) <= 100:
+      for file_metadata in files_metadata:
+        FileSystems.report_source_lineage(file_metadata.path)
+    else:
+      size_track = set()
+      for file_metadata in files_metadata:
+        if len(size_track) >= 100:
+          FileSystems.report_source_lineage(
+              file_metadata.path, level=FileSystem.LineageLevel.TOP_LEVEL)
+          return
+
+        try:
+          base, _ = FileSystems.split(file_metadata.path)
+        except ValueError:
+          pass
+        else:
+          size_track.add(base)
+
+      for base in size_track:
+        FileSystems.report_source_lineage(base)
+
   def open_file(self, file_name):
     return FileSystems.open(
         file_name,
@@ -343,6 +372,7 @@ class _ExpandIntoRanges(DoFn):
     self._min_bundle_size = min_bundle_size
     self._splittable = splittable
     self._compression_type = compression_type
+    self._size_track = None
 
   def process(self, element: Union[str, FileMetadata], *args,
               **kwargs) -> Iterable[Tuple[FileMetadata, OffsetRange]]:
@@ -352,7 +382,8 @@ class _ExpandIntoRanges(DoFn):
       match_results = FileSystems.match([element])
       metadata_list = match_results[0].metadata_list
     for metadata in metadata_list:
-      FileSystems.report_source_lineage(metadata.path)
+      self._report_source_lineage(metadata.path)
+
       splittable = (
           self._splittable and _determine_splittability_from_compression_type(
               metadata.path, self._compression_type))
@@ -366,6 +397,28 @@ class _ExpandIntoRanges(DoFn):
             metadata,
             OffsetRange(0, range_trackers.OffsetRangeTracker.OFFSET_INFINITY))
 
+  def _report_source_lineage(self, path):
+    """
+    Report source Lineage. Due to the size limit of Beam metrics, report the
+    full file name or only the top level, depending on the number of files.
+
+    * Number of files <= 100, report full file paths;
+
+    * Otherwise, report top level only.
+    """
+    if self._size_track is None:
+      self._size_track = set()
+    elif len(self._size_track) == 0:
+      FileSystems.report_source_lineage(
+          path, level=FileSystem.LineageLevel.TOP_LEVEL)
+      return
+
+    self._size_track.add(path)
+    FileSystems.report_source_lineage(path)
+
+    if len(self._size_track) >= 100:
+      self._size_track.clear()
+
 
 class _ReadRange(DoFn):
   def __init__(
diff --git a/sdks/python/apache_beam/io/filesystem.py 
b/sdks/python/apache_beam/io/filesystem.py
index bdc25dcf0fe..840fdf3309e 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -934,7 +934,11 @@ class FileSystem(BeamPlugin, metaclass=abc.ABCMeta):
     """
     raise NotImplementedError
 
-  def report_lineage(self, path, unused_lineage):
+  class LineageLevel:
+    FILE = 'FILE'
+    TOP_LEVEL = 'TOP_LEVEL'
+
+  def report_lineage(self, path, unused_lineage, level=None):
     """
     Report Lineage metrics for path.
 
diff --git a/sdks/python/apache_beam/io/filesystems.py 
b/sdks/python/apache_beam/io/filesystems.py
index ccbeac64076..a32b85332b6 100644
--- a/sdks/python/apache_beam/io/filesystems.py
+++ b/sdks/python/apache_beam/io/filesystems.py
@@ -391,13 +391,27 @@ class FileSystems(object):
     return filesystem.CHUNK_SIZE
 
   @staticmethod
-  def report_source_lineage(path):
-    """Report source :class:`~apache_beam.metrics.metric.Lineage`."""
+  def report_source_lineage(path, level=None):
+    """
+    Report source :class:`~apache_beam.metrics.metric.Lineage`.
+
+    Args:
+      path: string path to be reported.
+      level: the level of file path. Defaults to
+        :class:`~apache_beam.io.filesystem.FileSystem.LineageLevel`.FILE.
+    """
     filesystem = FileSystems.get_filesystem(path)
-    filesystem.report_lineage(path, Lineage.sources())
+    filesystem.report_lineage(path, Lineage.sources(), level=level)
 
   @staticmethod
-  def report_sink_lineage(path):
-    """Report sink :class:`~apache_beam.metrics.metric.Lineage`."""
+  def report_sink_lineage(path, level=None):
+    """
+    Report sink :class:`~apache_beam.metrics.metric.Lineage`.
+
+    Args:
+      path: string path to be reported.
+      level: the level of file path. Defaults to
+        :class:`~apache_beam.io.filesystem.FileSystem.LineageLevel`.FILE.
+    """
     filesystem = FileSystems.get_filesystem(path)
-    filesystem.report_lineage(path, Lineage.sinks())
+    filesystem.report_lineage(path, Lineage.sinks(), level=level)
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py 
b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index 053b02d325a..325f70ddfd9 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -366,10 +366,14 @@ class GCSFileSystem(FileSystem):
     if exceptions:
       raise BeamIOError("Delete operation failed", exceptions)
 
-  def report_lineage(self, path, lineage):
+  def report_lineage(self, path, lineage, level=None):
     try:
-      bucket, blob = gcsio.parse_gcs_path(path)
+      components = gcsio.parse_gcs_path(path, object_optional=True)
     except ValueError:
       # report lineage is fail-safe
       return
-    lineage.add('gcs', bucket, blob)
+    if level == FileSystem.LineageLevel.TOP_LEVEL \
+      or(len(components) > 1 and components[-1] == ''):
+      # bucket only
+      components = components[:-1]
+    lineage.add('gcs', *components)
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py 
b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index 1206529faf0..ec7fa94b05f 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -375,6 +375,15 @@ class GCSFileSystemTest(unittest.TestCase):
       self.fs.delete(files)
     gcsio_mock.delete_batch.assert_called()
 
+  def test_lineage(self):
+    self._verify_lineage("gs://bucket/", ("bucket", ))
+    self._verify_lineage("gs://bucket/foo/bar.txt", ("bucket", "foo/bar.txt"))
+
+  def _verify_lineage(self, uri, expected_segments):
+    lineage_mock = mock.MagicMock()
+    self.fs.report_lineage(uri, lineage_mock)
+    lineage_mock.add.assert_called_once_with("gcs", *expected_segments)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)


Reply via email to