This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c492434773  Add Lineage metrics to FileSystems (#32090)
2c492434773 is described below

commit 2c4924347732473df4dedbd3a93db9c8326fb477
Author: Yi Hu <[email protected]>
AuthorDate: Wed Aug 14 12:50:12 2024 -0400

     Add Lineage metrics to FileSystems (#32090)
    
    * Add Lineage metrics to FileSystems
    
    * Handle reserved characters (colon, dot, white space) in Lineage FQN
    
    * address comments - align up
---
 .../java/org/apache/beam/sdk/io/FileBasedSink.java |  1 +
 .../org/apache/beam/sdk/io/FileBasedSource.java    |  1 +
 .../java/org/apache/beam/sdk/io/FileSystem.java    |  8 ++
 .../java/org/apache/beam/sdk/io/FileSystems.java   | 11 +++
 .../sdk/io/ReadAllViaFileBasedSourceTransform.java |  5 +-
 .../java/org/apache/beam/sdk/metrics/Lineage.java  | 97 ++++++++++++++++++++--
 .../org/apache/beam/sdk/metrics/LineageTest.java   | 59 +++++++++++++
 .../sdk/extensions/gcp/storage/GcsFileSystem.java  | 11 +++
 .../apache/beam/sdk/io/aws/s3/S3FileSystem.java    |  6 ++
 .../apache/beam/sdk/io/aws2/s3/S3FileSystem.java   |  6 ++
 .../azure/blobstore/AzureBlobStoreFileSystem.java  | 13 +++
 .../java/org/apache/beam/sdk/io/text/TextIOIT.java |  5 ++
 .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java  |  5 +-
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java    |  3 +-
 .../io/gcp/bigquery/BigQueryStorageSourceBase.java |  7 +-
 .../beam/sdk/io/gcp/bigquery/CreateTables.java     |  3 +-
 .../bigquery/StorageApiWriteUnshardedRecords.java  |  3 +-
 .../bigquery/StorageApiWritesShardedRecords.java   |  3 +-
 .../beam/sdk/io/gcp/bigquery/WriteRename.java      |  3 +-
 .../beam/sdk/io/gcp/bigquery/WriteTables.java      |  3 +-
 .../sdk/io/gcp/bigtable/BigtableServiceImpl.java   |  6 +-
 .../beam/sdk/io/gcp/pubsub/PubsubClient.java       | 15 ++--
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java    |  7 +-
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java     |  7 +-
 .../sdk/io/gcp/pubsub/PubsubUnboundedSource.java   |  4 +-
 .../beam/sdk/io/gcp/bigtable/BigtableReadIT.java   |  5 +-
 .../beam/sdk/io/gcp/bigtable/BigtableWriteIT.java  |  4 +-
 .../beam/sdk/io/gcp/pubsub/PubsubClientTest.java   |  5 +-
 .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java       |  4 +-
 29 files changed, 265 insertions(+), 45 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 34b4246a708..b7523ee12b5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -687,6 +687,7 @@ public abstract class FileBasedSink<UserT, DestinationT, 
OutputT>
             distinctFilenames.get(finalFilename));
         distinctFilenames.put(finalFilename, result);
         outputFilenames.add(KV.of(result, finalFilename));
+        FileSystems.reportSinkLineage(finalFilename);
       }
       return outputFilenames;
     }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index d68338eceaf..7ddfde441ae 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -299,6 +299,7 @@ public abstract class FileBasedSource<T> extends 
OffsetBasedSource<T> {
           splitResults.size());
       return splitResults;
     } else {
+      FileSystems.reportSourceLineage(getSingleFileMetadata().resourceId());
       if (isSplittable()) {
         @SuppressWarnings("unchecked")
         List<FileBasedSource<T>> splits =
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
index 501cd72dadd..11314a318b2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.MoveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.metrics.Lineage;
 
 /**
  * File system interface in Beam.
@@ -155,4 +156,11 @@ public abstract class FileSystem<ResourceIdT extends 
ResourceId> {
    * @see <a href="https://www.ietf.org/rfc/rfc2396.txt">RFC 2396</a>
    */
   protected abstract String getScheme();
+
+  /**
+   * Report {@link Lineage} metrics for resource id.
+   *
+   * <p>Unless overridden by a FileSystem implementation, defaults to a no-op.
+   */
+  protected void reportLineage(ResourceIdT unusedId, Lineage unusedLineage) {}
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index c58ce9ad830..a4ca9b80dce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.io.fs.MoveOptions;
 import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.values.KV;
@@ -395,6 +396,16 @@ public class FileSystems {
         .delete(resourceIdsToDelete);
   }
 
+  /** Report source {@link Lineage} metrics for resource id. */
+  public static void reportSourceLineage(ResourceId resourceId) {
+    getFileSystemInternal(resourceId.getScheme()).reportLineage(resourceId, 
Lineage.getSources());
+  }
+
+  /** Report sink {@link Lineage} metrics for resource id. */
+  public static void reportSinkLineage(ResourceId resourceId) {
+    getFileSystemInternal(resourceId.getScheme()).reportLineage(resourceId, 
Lineage.getSinks());
+  }
+
   private static class FilterResult {
     public List<ResourceId> resultSources = new ArrayList();
     public List<ResourceId> resultDestinations = new ArrayList();
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java
index 74680ab7086..bbac337f2d0 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java
@@ -21,6 +21,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -123,8 +124,9 @@ public abstract class 
ReadAllViaFileBasedSourceTransform<InT, T>
     public void process(ProcessContext c) throws IOException {
       FileIO.ReadableFile file = c.element().getKey();
       OffsetRange range = c.element().getValue();
+      ResourceId resourceId = file.getMetadata().resourceId();
       FileBasedSource<InT> source =
-          
CompressedSource.from(createSource.apply(file.getMetadata().resourceId().toString()))
+          CompressedSource.from(createSource.apply(resourceId.toString()))
               .withCompression(file.getCompression());
       try (BoundedSource.BoundedReader<InT> reader =
           source
@@ -138,6 +140,7 @@ public abstract class 
ReadAllViaFileBasedSourceTransform<InT, T>
           throw e;
         }
       }
+      FileSystems.reportSourceLineage(resourceId);
     }
   }
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
index 6166a562bf2..302ae4f2fef 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
@@ -19,26 +19,109 @@ package org.apache.beam.sdk.metrics;
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * Standard collection of metrics used to record source and sinks information 
for lineage tracking.
  */
 public class Lineage {
+
   public static final String LINEAGE_NAMESPACE = "lineage";
-  private static final StringSet SOURCES =
-      Metrics.stringSet(LINEAGE_NAMESPACE, Type.SOURCE.toString());
-  private static final StringSet SINKS = Metrics.stringSet(LINEAGE_NAMESPACE, 
Type.SINK.toString());
+  private static final Lineage SOURCES = new Lineage(Type.SOURCE);
+  private static final Lineage SINKS = new Lineage(Type.SINK);
+  private static final Pattern RESERVED_CHARS = Pattern.compile("[:\\s.]");
+
+  private final StringSet metric;
 
-  /** {@link StringSet} representing sources and optionally side inputs. */
-  public static StringSet getSources() {
+  private Lineage(Type type) {
+    this.metric = Metrics.stringSet(LINEAGE_NAMESPACE, type.toString());
+  }
+
+  /** {@link Lineage} representing sources and optionally side inputs. */
+  public static Lineage getSources() {
     return SOURCES;
   }
 
-  /** {@link StringSet} representing sinks. */
-  public static StringSet getSinks() {
+  /** {@link Lineage} representing sinks. */
+  public static Lineage getSinks() {
     return SINKS;
   }
 
+  /**
+   * Wrap segment to valid segment name.
+   *
+   * <p>Specifically, if there are reserved chars (colon, whitespace, dot), 
escape with backtick. If
+   * the segment is already wrapped, return the original.
+   */
+  private static String wrapSegment(String value) {
+    if (value.startsWith("`") && value.endsWith("`")) {
+      return value;
+    }
+    if (RESERVED_CHARS.matcher(value).find()) {
+      return String.format("`%s`", value);
+    }
+    return value;
+  }
+
+  /**
+   * Assemble fully qualified name (<a
+   * 
href="https://cloud.google.com/data-catalog/docs/fully-qualified-names">FQN</a>).
 Format:
+   *
+   * <ul>
+   *   <li>{@code system:segment1.segment2}
+   *   <li>{@code system:routine:segment1.segment2}
+   *   <li>{@code system:`segment1.with.dots:colons`.segment2}
+   * </ul>
+   *
+   * <p>This helper method is for internal and testing usage only.
+   */
+  @Internal
+  public static String getFqName(
+      String system, @Nullable String routine, Iterable<String> segments) {
+    StringBuilder builder = new StringBuilder(system);
+    if (!Strings.isNullOrEmpty(routine)) {
+      builder.append(":").append(routine);
+    }
+    int idx = 0;
+    for (String segment : segments) {
+      if (idx == 0) {
+        builder.append(":");
+      } else {
+        builder.append(".");
+      }
+      builder.append(wrapSegment(segment));
+      ++idx;
+    }
+    return builder.toString();
+  }
+
+  /**
+   * Assemble the FQN of the given system and segments.
+   *
+   * <p>This helper method is for internal and testing usage only.
+   */
+  @Internal
+  public static String getFqName(String system, Iterable<String> segments) {
+    return getFqName(system, null, segments);
+  }
+
+  /**
+   * Add a FQN (fully-qualified name) to Lineage. Segments will be processed 
via {@link #getFqName}.
+   */
+  public void add(String system, @Nullable String routine, Iterable<String> 
segments) {
+    metric.add(getFqName(system, routine, segments));
+  }
+
+  /**
+   * Add a FQN (fully-qualified name) to Lineage. Segments will be processed 
via {@link #getFqName}.
+   */
+  public void add(String system, Iterable<String> segments) {
+    add(system, null, segments);
+  }
+
   /** Query {@link StringSet} metrics from {@link MetricResults}. */
   public static Set<String> query(MetricResults results, Type type) {
     MetricsFilter filter =
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/LineageTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/LineageTest.java
new file mode 100644
index 00000000000..432eb396fe2
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/LineageTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Lineage}. */
+@RunWith(JUnit4.class)
+public class LineageTest {
+  @Test
+  public void testGetFqName() {
+    Map<String, String> testCases =
+        ImmutableMap.<String, String>builder()
+            .put("apache-beam", "apache-beam")
+            .put("`apache-beam`", "`apache-beam`")
+            .put("apache.beam", "`apache.beam`")
+            .put("apache:beam", "`apache:beam`")
+            .put("apache beam", "`apache beam`")
+            .put("`apache beam`", "`apache beam`")
+            .put("apache\tbeam", "`apache\tbeam`")
+            .put("apache\nbeam", "`apache\nbeam`")
+            .build();
+    testCases.forEach(
+        (key, value) ->
+            assertEquals("apache:" + value, Lineage.getFqName("apache", 
ImmutableList.of(key))));
+    testCases.forEach(
+        (key, value) ->
+            assertEquals(
+                "apache:beam:" + value,
+                Lineage.getFqName("apache", "beam", ImmutableList.of(key))));
+    testCases.forEach(
+        (key, value) ->
+            assertEquals(
+                "apache:beam:" + value + "." + value,
+                Lineage.getFqName("apache", "beam", ImmutableList.of(key, 
key))));
+  }
+}
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
index b49c434f81c..6332051c0dd 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
@@ -48,6 +48,7 @@ import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.MatchResult.Status;
 import org.apache.beam.sdk.io.fs.MoveOptions;
 import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.metrics.Metrics;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
@@ -214,6 +215,16 @@ class GcsFileSystem extends FileSystem<GcsResourceId> {
     return "gs";
   }
 
+  @Override
+  protected void reportLineage(GcsResourceId resourceId, Lineage lineage) {
+    GcsPath path = resourceId.getGcsPath();
+    if (!path.getBucket().isEmpty()) {
+      lineage.add("gcs", ImmutableList.of(path.getBucket(), path.getObject()));
+    } else {
+      LOG.warn("Report Lineage on relative path {} is unsupported", 
path.getObject());
+    }
+  }
+
   private List<MatchResult> matchGlobs(List<GcsPath> globs) {
     // TODO: Executes in parallel, address 
https://issues.apache.org/jira/browse/BEAM-1503.
     return FluentIterable.from(globs)
diff --git 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
index fa442374e3c..7ed56efa44b 100644
--- 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
+++ 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
@@ -65,6 +65,7 @@ import org.apache.beam.sdk.io.aws.options.S3Options;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.util.MoreFutures;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
@@ -624,6 +625,11 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
     return S3ResourceId.fromUri(singleResourceSpec);
   }
 
+  @Override
+  protected void reportLineage(S3ResourceId resourceId, Lineage lineage) {
+    lineage.add("s3", ImmutableList.of(resourceId.getBucket(), 
resourceId.getKey()));
+  }
+
   /**
    * Invokes tasks in a thread pool, then unwraps the resulting {@link Future 
Futures}.
    *
diff --git 
a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java
 
b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java
index 5f08600758f..384c8c627ee 100644
--- 
a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java
+++ 
b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java
@@ -46,6 +46,7 @@ import org.apache.beam.sdk.io.aws2.options.S3Options;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.util.MoreFutures;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
@@ -655,6 +656,11 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
     return S3ResourceId.fromUri(singleResourceSpec);
   }
 
+  @Override
+  protected void reportLineage(S3ResourceId resourceId, Lineage lineage) {
+    lineage.add("s3", ImmutableList.of(resourceId.getBucket(), 
resourceId.getKey()));
+  }
+
   /**
    * Invokes tasks in a thread pool, then unwraps the resulting {@link Future 
Futures}.
    *
diff --git 
a/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
 
b/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
index 3b6e79b4ef7..5137eaf9bb2 100644
--- 
a/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
+++ 
b/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
@@ -53,6 +53,7 @@ import org.apache.beam.sdk.io.azure.options.BlobstoreOptions;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.util.InstanceBuilder;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
@@ -449,4 +450,16 @@ class AzureBlobStoreFileSystem extends 
FileSystem<AzfsResourceId> {
     }
     return AzfsResourceId.fromUri(singleResourceSpec);
   }
+
+  @Override
+  protected void reportLineage(AzfsResourceId resourceId, Lineage lineage) {
+    if (!Strings.isNullOrEmpty(resourceId.getBlob())) {
+      lineage.add(
+          "abs",
+          ImmutableList.of(
+              resourceId.getAccount(), resourceId.getContainer(), 
resourceId.getBlob()));
+    } else {
+      lineage.add("abs", ImmutableList.of(resourceId.getAccount(), 
resourceId.getContainer()));
+    }
+  }
 }
diff --git 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
index b852020c9bb..859c03ed775 100644
--- 
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++ 
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.text;
 import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment;
 import static 
org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
 import static 
org.apache.beam.sdk.io.common.FileBasedIOITHelper.readFileBasedIOITPipelineOptions;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
 import java.time.Instant;
@@ -36,6 +37,7 @@ import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
 import org.apache.beam.sdk.io.common.FileBasedIOITHelper.DeleteFileFn;
 import org.apache.beam.sdk.io.common.FileBasedIOTestPipelineOptions;
 import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testutils.NamedTestResult;
@@ -152,6 +154,9 @@ public class TextIOIT {
 
     PipelineResult result = pipeline.run();
     PipelineResult.State pipelineState = result.waitUntilFinish();
+    assertEquals(
+        Lineage.query(result.metrics(), Lineage.Type.SOURCE),
+        Lineage.query(result.metrics(), Lineage.Type.SINK));
 
     collectAndPublishMetrics(result);
     // Fail the test if pipeline failed.
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 61bed66a336..129c8314fc8 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -55,6 +55,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.FluentBackoff;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -413,7 +414,7 @@ public class BigQueryHelpers {
     return sb.toString();
   }
 
-  public static String dataCatalogName(TableReference ref, BigQueryOptions 
options) {
+  public static List<String> dataCatalogSegments(TableReference ref, 
BigQueryOptions options) {
     String tableIdBase;
     int ix = ref.getTableId().indexOf('$');
     if (ix == -1) {
@@ -429,7 +430,7 @@ public class BigQueryHelpers {
     } else {
       projectId = options.getProject();
     }
-    return String.format("bigquery:%s.%s.%s", projectId, ref.getDatasetId(), 
tableIdBase);
+    return ImmutableList.of(projectId, ref.getDatasetId(), tableIdBase);
   }
 
   static <K, V> List<V> getOrCreateMapListValue(Map<K, List<V>> map, K key) {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 998c82ab8d8..a8985775cbe 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -121,7 +121,8 @@ abstract class BigQuerySourceBase<T> extends 
BoundedSource<T> {
                 BigQueryHelpers.toTableSpec(tableToExtract)));
       }
       // emit this table ID as a lineage source
-      Lineage.getSources().add(BigQueryHelpers.dataCatalogName(tableToExtract, 
bqOptions));
+      Lineage.getSources()
+          .add("bigquery", BigQueryHelpers.dataCatalogSegments(tableToExtract, 
bqOptions));
 
       TableSchema schema = table.getSchema();
       JobService jobService = bqServices.getJobService(bqOptions);
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
index 3852d18ec12..51a5a8f391a 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
@@ -35,7 +35,6 @@ import 
org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
 import org.apache.beam.sdk.metrics.Lineage;
-import org.apache.beam.sdk.metrics.StringSet;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -109,12 +108,12 @@ abstract class BigQueryStorageSourceBase<T> extends 
BoundedSource<T> {
     @Nullable Table targetTable = getTargetTable(bqOptions);
 
     ReadSession.Builder readSessionBuilder = ReadSession.newBuilder();
-    StringSet lineageSources = Lineage.getSources();
+    Lineage lineage = Lineage.getSources();
     if (targetTable != null) {
       TableReference tableReference = targetTable.getTableReference();
       
readSessionBuilder.setTable(BigQueryHelpers.toTableResourceName(tableReference));
       // register the table as lineage source
-      lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference, 
bqOptions));
+      lineage.add("bigquery", 
BigQueryHelpers.dataCatalogSegments(tableReference, bqOptions));
     } else {
       // If the table does not exist targetTable will be null.
       // Construct the table id if we can generate it. For error 
recording/logging.
@@ -123,7 +122,7 @@ abstract class BigQueryStorageSourceBase<T> extends 
BoundedSource<T> {
         readSessionBuilder.setTable(tableReferenceId);
         // register the table as lineage source
         TableReference tableReference = 
BigQueryHelpers.parseTableUrn(tableReferenceId);
-        lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference, 
bqOptions));
+        lineage.add("bigquery", 
BigQueryHelpers.dataCatalogSegments(tableReference, bqOptions));
       }
     }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
index a55bcc3fe02..1bbd4e75608 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
@@ -121,7 +121,8 @@ public class CreateTables<DestinationT, ElementT>
                 BigQueryOptions bqOptions = 
context.getPipelineOptions().as(BigQueryOptions.class);
                 Lineage.getSinks()
                     .add(
-                        BigQueryHelpers.dataCatalogName(
+                        "bigquery",
+                        BigQueryHelpers.dataCatalogSegments(
                             tableDestination1.getTableReference(), bqOptions));
                 return CreateTableHelpers.possiblyCreateTable(
                     bqOptions,
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 369bb2d7863..13a22cbfe90 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -1132,7 +1132,8 @@ public class 
StorageApiWriteUnshardedRecords<DestinationT, ElementT>
                           pipelineOptions.as(BigQueryOptions.class)));
       Lineage.getSinks()
           .add(
-              BigQueryHelpers.dataCatalogName(
+              "bigquery",
+              BigQueryHelpers.dataCatalogSegments(
                   state.getTableDestination().getTableReference(),
                   pipelineOptions.as(BigQueryOptions.class)));
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index f3f512110b5..1ee001d9890 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -477,7 +477,8 @@ public class StorageApiWritesShardedRecords<DestinationT 
extends @NonNull Object
 
       Lineage.getSinks()
           .add(
-              BigQueryHelpers.dataCatalogName(
+              "bigquery",
+              BigQueryHelpers.dataCatalogSegments(
                   tableDestination.getTableReference(), bigQueryOptions));
 
       Coder<DestinationT> destinationCoder = 
dynamicDestinations.getDestinationCoder();
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index 1a6a6a4db70..061e66024e2 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -210,7 +210,8 @@ class WriteRename
         if (!entry.getValue().isEmpty()) {
           Lineage.getSinks()
               .add(
-                  BigQueryHelpers.dataCatalogName(
+                  "bigquery",
+                  BigQueryHelpers.dataCatalogSegments(
                       entry.getKey().getTableReference(),
                       c.getPipelineOptions().as(BigQueryOptions.class)));
           pendingJobs.add(startWriteRename(entry.getKey(), entry.getValue(), 
c, window));
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index ace0bc5a74c..e374d459af4 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -263,7 +263,8 @@ class WriteTables<DestinationT extends @NonNull Object>
       } else {
         Lineage.getSinks()
             .add(
-                BigQueryHelpers.dataCatalogName(
+                "bigquery",
+                BigQueryHelpers.dataCatalogSegments(
                     tableReference, 
c.getPipelineOptions().as(BigQueryOptions.class)));
       }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 6fdf67722ba..1af9ae4f932 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -216,7 +216,7 @@ class BigtableServiceImpl implements BigtableService {
 
     @Override
     public void reportLineage() {
-      Lineage.getSources().add(String.format("bigtable:%s.%s.%s", projectId, 
instanceId, tableId));
+      Lineage.getSources().add("bigtable", ImmutableList.of(projectId, 
instanceId, tableId));
     }
   }
 
@@ -327,7 +327,7 @@ class BigtableServiceImpl implements BigtableService {
 
     @Override
     public void reportLineage() {
-      Lineage.getSources().add(String.format("bigtable:%s.%s.%s", projectId, 
instanceId, tableId));
+      Lineage.getSources().add("bigtable", ImmutableList.of(projectId, 
instanceId, tableId));
     }
 
     @Override
@@ -597,7 +597,7 @@ class BigtableServiceImpl implements BigtableService {
 
     @Override
     public void reportLineage() {
-      Lineage.getSinks().add(String.format("bigtable:%s.%s.%s", projectId, 
instanceId, tableId));
+      Lineage.getSinks().add("bigtable", ImmutableList.of(projectId, 
instanceId, tableId));
     }
 
     private ServiceCallMetric createServiceCallMetric() {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
index f66ee6e1d84..2964a29dbb6 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
@@ -260,8 +261,8 @@ public abstract class PubsubClient implements Closeable {
       return String.format("/subscriptions/%s/%s", projectId, 
subscriptionName);
     }
 
-    public String getDataCatalogName() {
-      return String.format("pubsub:subscription:%s.%s", projectId, 
subscriptionName);
+    public List<String> getDataCatalogSegments() {
+      return ImmutableList.of(projectId, subscriptionName);
     }
 
     @Override
@@ -319,14 +320,14 @@ public abstract class PubsubClient implements Closeable {
     }
 
     /**
-     * Returns the data catalog name. Format "pubsub:topic:`project`.`topic`" 
This method is
-     * fail-safe. If topic path is malformed, it returns an empty string.
+     * Returns the data catalog segments. This method is fail-safe. If topic 
path is malformed, it
+     * returns an empty list.
      */
-    public String getDataCatalogName() {
+    public List<String> getDataCatalogSegments() {
       List<String> splits = Splitter.on('/').splitToList(path);
       if (splits.size() == 4) {
         // well-formed path
-        return String.format("pubsub:topic:%s.%s", splits.get(1), 
splits.get(3));
+        return ImmutableList.of(splits.get(1), splits.get(3));
       } else {
         // Mal-formed path. It is either a test fixture or user error and will 
fail on publish.
         // We do not throw an exception; instead we return an empty list here.
@@ -334,7 +335,7 @@ public abstract class PubsubClient implements Closeable {
             "Cannot get data catalog name for malformed topic path {}. 
Expected format: "
                 + "projects/<project>/topics/<topic>",
             path);
-        return "";
+        return ImmutableList.of();
       }
     }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 0fd4e9207d8..8b582c1054f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -85,6 +85,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
@@ -513,8 +514,8 @@ public class PubsubIO {
       }
     }
 
-    public String dataCatalogName() {
-      return String.format("pubsub:topic:%s.%s", project, topic);
+    public List<String> dataCatalogSegments() {
+      return ImmutableList.of(project, topic);
     }
 
     @Override
@@ -1624,7 +1625,7 @@ public class PubsubIO {
         }
         // Report lineage for all topics seen
         for (PubsubTopic topic : output.keySet()) {
-          Lineage.getSinks().add(topic.dataCatalogName());
+          Lineage.getSinks().add("pubsub", "topic", 
topic.dataCatalogSegments());
         }
         output = null;
         pubsubClient.close();
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index defea87e835..38d77aa3aac 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -70,7 +70,6 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
@@ -297,9 +296,9 @@ public class PubsubUnboundedSink extends 
PTransform<PCollection<PubsubMessage>,
       byteCounter.inc(bytes);
       // Report Lineage only once for the same topic
       if (!topicPath.equals(reportedLineage)) {
-        String name = topicPath.getDataCatalogName();
-        if (!Strings.isNullOrEmpty(name)) {
-          Lineage.getSinks().add(topicPath.getDataCatalogName());
+        List<String> segments = topicPath.getDataCatalogSegments();
+        if (segments.size() != 0) {
+          Lineage.getSinks().add("pubsub", "topic", segments);
         }
         reportedLineage = topicPath;
       }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index b131b521c06..95fa5c22341 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -1045,14 +1045,14 @@ public class PubsubUnboundedSource extends 
PTransform<PBegin, PCollection<Pubsub
         TopicPath topic = outer.getTopic();
         if (topic != null) {
           // is initial split on Read.fromTopic, report Lineage based on topic
-          Lineage.getSources().add(topic.getDataCatalogName());
+          Lineage.getSources().add("pubsub", "topic", 
topic.getDataCatalogSegments());
         }
       } else {
         if (subscriptionPath.equals(outer.getSubscriptionProvider())) {
           SubscriptionPath sub = subscriptionPath.get();
           if (sub != null) {
             // is a split on Read.fromSubscription
-            Lineage.getSources().add(sub.getDataCatalogName());
+            Lineage.getSources().add("pubsub", "subscription", 
sub.getDataCatalogSegments());
           }
         }
       }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
index 4ce9ad10b2c..4faff5ad6bd 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -154,7 +155,9 @@ public class BigtableReadIT {
     if (options.getRunner().getName().contains("DirectRunner")) {
       assertThat(
           Lineage.query(r.metrics(), Lineage.Type.SOURCE),
-          hasItem(String.format("bigtable:%s.%s.%s", project, 
options.getInstanceId(), tableId)));
+          hasItem(
+              Lineage.getFqName(
+                  "bigtable", ImmutableList.of(project, 
options.getInstanceId(), tableId))));
     }
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
index 46bb3df836e..44f3e5a1992 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
@@ -423,7 +423,9 @@ public class BigtableWriteIT implements Serializable {
     if (options.getRunner().getName().contains("DirectRunner")) {
       assertThat(
           Lineage.query(r.metrics(), Lineage.Type.SINK),
-          hasItem(String.format("bigtable:%s.%s.%s", project, 
options.getInstanceId(), tableId)));
+          hasItem(
+              Lineage.getFqName(
+                  "bigtable", ImmutableList.of(project, 
options.getInstanceId(), tableId))));
     }
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
index fb007d1171d..9d7bc65f595 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
@@ -27,6 +27,7 @@ import 
org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SchemaPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.joda.time.Instant;
 import org.junit.Rule;
@@ -171,7 +172,7 @@ public class PubsubClientTest {
     SubscriptionPath path = PubsubClient.subscriptionPathFromName("test", 
"something");
     assertEquals("projects/test/subscriptions/something", path.getPath());
     assertEquals("/subscriptions/test/something", path.getFullPath());
-    assertEquals("pubsub:subscription:test.something", 
path.getDataCatalogName());
+    assertEquals(ImmutableList.of("test", "something"), 
path.getDataCatalogSegments());
   }
 
   @Test
@@ -179,7 +180,7 @@ public class PubsubClientTest {
     TopicPath path = PubsubClient.topicPathFromName("test", "something");
     assertEquals("projects/test/topics/something", path.getPath());
     assertEquals("/topics/test/something", path.getFullPath());
-    assertEquals("pubsub:topic:test.something", path.getDataCatalogName());
+    assertEquals(ImmutableList.of("test", "something"), 
path.getDataCatalogSegments());
   }
 
   @Test
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 74a98f0b8b4..d4effbae40a 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -238,8 +238,8 @@ public class PubsubIOTest {
     assertThat(pubsubRead.getTopicProvider().isAccessible(), is(true));
     assertThat(pubsubRead.getTopicProvider().get().asPath(), 
equalTo(provider.get()));
     assertThat(
-        pubsubRead.getTopicProvider().get().dataCatalogName(),
-        equalTo("pubsub:topic:project.topic"));
+        pubsubRead.getTopicProvider().get().dataCatalogSegments(),
+        equalTo(ImmutableList.of("project", "topic")));
   }
 
   @Test


Reply via email to