This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2c492434773 Add Lineage metrics to FileSystems (#32090)
2c492434773 is described below
commit 2c4924347732473df4dedbd3a93db9c8326fb477
Author: Yi Hu <[email protected]>
AuthorDate: Wed Aug 14 12:50:12 2024 -0400
Add Lineage metrics to FileSystems (#32090)
* Add Lineage metrics to FileSystems
* Handle reserved characters (colon, dot, white space) in Lineage FQN
* address comments - align up
---
.../java/org/apache/beam/sdk/io/FileBasedSink.java | 1 +
.../org/apache/beam/sdk/io/FileBasedSource.java | 1 +
.../java/org/apache/beam/sdk/io/FileSystem.java | 8 ++
.../java/org/apache/beam/sdk/io/FileSystems.java | 11 +++
.../sdk/io/ReadAllViaFileBasedSourceTransform.java | 5 +-
.../java/org/apache/beam/sdk/metrics/Lineage.java | 97 ++++++++++++++++++++--
.../org/apache/beam/sdk/metrics/LineageTest.java | 59 +++++++++++++
.../sdk/extensions/gcp/storage/GcsFileSystem.java | 11 +++
.../apache/beam/sdk/io/aws/s3/S3FileSystem.java | 6 ++
.../apache/beam/sdk/io/aws2/s3/S3FileSystem.java | 6 ++
.../azure/blobstore/AzureBlobStoreFileSystem.java | 13 +++
.../java/org/apache/beam/sdk/io/text/TextIOIT.java | 5 ++
.../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java | 5 +-
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 3 +-
.../io/gcp/bigquery/BigQueryStorageSourceBase.java | 7 +-
.../beam/sdk/io/gcp/bigquery/CreateTables.java | 3 +-
.../bigquery/StorageApiWriteUnshardedRecords.java | 3 +-
.../bigquery/StorageApiWritesShardedRecords.java | 3 +-
.../beam/sdk/io/gcp/bigquery/WriteRename.java | 3 +-
.../beam/sdk/io/gcp/bigquery/WriteTables.java | 3 +-
.../sdk/io/gcp/bigtable/BigtableServiceImpl.java | 6 +-
.../beam/sdk/io/gcp/pubsub/PubsubClient.java | 15 ++--
.../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 7 +-
.../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 7 +-
.../sdk/io/gcp/pubsub/PubsubUnboundedSource.java | 4 +-
.../beam/sdk/io/gcp/bigtable/BigtableReadIT.java | 5 +-
.../beam/sdk/io/gcp/bigtable/BigtableWriteIT.java | 4 +-
.../beam/sdk/io/gcp/pubsub/PubsubClientTest.java | 5 +-
.../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 4 +-
29 files changed, 265 insertions(+), 45 deletions(-)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 34b4246a708..b7523ee12b5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -687,6 +687,7 @@ public abstract class FileBasedSink<UserT, DestinationT,
OutputT>
distinctFilenames.get(finalFilename));
distinctFilenames.put(finalFilename, result);
outputFilenames.add(KV.of(result, finalFilename));
+ FileSystems.reportSinkLineage(finalFilename);
}
return outputFilenames;
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index d68338eceaf..7ddfde441ae 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -299,6 +299,7 @@ public abstract class FileBasedSource<T> extends
OffsetBasedSource<T> {
splitResults.size());
return splitResults;
} else {
+ FileSystems.reportSourceLineage(getSingleFileMetadata().resourceId());
if (isSplittable()) {
@SuppressWarnings("unchecked")
List<FileBasedSource<T>> splits =
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
index 501cd72dadd..11314a318b2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.metrics.Lineage;
/**
* File system interface in Beam.
@@ -155,4 +156,11 @@ public abstract class FileSystem<ResourceIdT extends
ResourceId> {
* @see <a href="https://www.ietf.org/rfc/rfc2396.txt">RFC 2396</a>
*/
protected abstract String getScheme();
+
+ /**
+ * Report {@link Lineage} metrics for resource id.
+ *
+ * <p>Unless overridden by a FileSystem implementation, this is a no-op.
+ */
+ protected void reportLineage(ResourceIdT unusedId, Lineage unusedLineage) {}
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index c58ce9ad830..a4ca9b80dce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.KV;
@@ -395,6 +396,16 @@ public class FileSystems {
.delete(resourceIdsToDelete);
}
+ /** Report source {@link Lineage} metrics for resource id. */
+ public static void reportSourceLineage(ResourceId resourceId) {
+ getFileSystemInternal(resourceId.getScheme()).reportLineage(resourceId,
Lineage.getSources());
+ }
+
+ /** Report sink {@link Lineage} metrics for resource id. */
+ public static void reportSinkLineage(ResourceId resourceId) {
+ getFileSystemInternal(resourceId.getScheme()).reportLineage(resourceId,
Lineage.getSinks());
+ }
+
private static class FilterResult {
public List<ResourceId> resultSources = new ArrayList();
public List<ResourceId> resultDestinations = new ArrayList();
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java
index 74680ab7086..bbac337f2d0 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSourceTransform.java
@@ -21,6 +21,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -123,8 +124,9 @@ public abstract class
ReadAllViaFileBasedSourceTransform<InT, T>
public void process(ProcessContext c) throws IOException {
FileIO.ReadableFile file = c.element().getKey();
OffsetRange range = c.element().getValue();
+ ResourceId resourceId = file.getMetadata().resourceId();
FileBasedSource<InT> source =
-
CompressedSource.from(createSource.apply(file.getMetadata().resourceId().toString()))
+ CompressedSource.from(createSource.apply(resourceId.toString()))
.withCompression(file.getCompression());
try (BoundedSource.BoundedReader<InT> reader =
source
@@ -138,6 +140,7 @@ public abstract class
ReadAllViaFileBasedSourceTransform<InT, T>
throw e;
}
}
+ FileSystems.reportSourceLineage(resourceId);
}
}
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
index 6166a562bf2..302ae4f2fef 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
@@ -19,26 +19,109 @@ package org.apache.beam.sdk.metrics;
import java.util.HashSet;
import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.checkerframework.checker.nullness.qual.Nullable;
/**
* Standard collection of metrics used to record source and sinks information
for lineage tracking.
*/
public class Lineage {
+
public static final String LINEAGE_NAMESPACE = "lineage";
- private static final StringSet SOURCES =
- Metrics.stringSet(LINEAGE_NAMESPACE, Type.SOURCE.toString());
- private static final StringSet SINKS = Metrics.stringSet(LINEAGE_NAMESPACE,
Type.SINK.toString());
+ private static final Lineage SOURCES = new Lineage(Type.SOURCE);
+ private static final Lineage SINKS = new Lineage(Type.SINK);
+ private static final Pattern RESERVED_CHARS = Pattern.compile("[:\\s.]");
+
+ private final StringSet metric;
- /** {@link StringSet} representing sources and optionally side inputs. */
- public static StringSet getSources() {
+ private Lineage(Type type) {
+ this.metric = Metrics.stringSet(LINEAGE_NAMESPACE, type.toString());
+ }
+
+ /** {@link Lineage} representing sources and optionally side inputs. */
+ public static Lineage getSources() {
return SOURCES;
}
- /** {@link StringSet} representing sinks. */
- public static StringSet getSinks() {
+ /** {@link Lineage} representing sinks. */
+ public static Lineage getSinks() {
return SINKS;
}
+ /**
+ * Wrap segment to valid segment name.
+ *
+ * <p>Specifically, if there are reserved chars (colon, whitespace, dot),
wrap the segment in backticks. If
+ * the segment is already wrapped, return the original.
+ */
+ private static String wrapSegment(String value) {
+ if (value.startsWith("`") && value.endsWith("`")) {
+ return value;
+ }
+ if (RESERVED_CHARS.matcher(value).find()) {
+ return String.format("`%s`", value);
+ }
+ return value;
+ }
+
+ /**
+ * Assemble fully qualified name (<a
+ *
href="https://cloud.google.com/data-catalog/docs/fully-qualified-names">FQN</a>).
Format:
+ *
+ * <ul>
+ * <li>{@code system:segment1.segment2}
+ * <li>{@code system:routine:segment1.segment2}
+ * <li>{@code system:`segment1.with.dots:colons`.segment2}
+ * </ul>
+ *
+ * <p>This helper method is for internal and testing usage only.
+ */
+ @Internal
+ public static String getFqName(
+ String system, @Nullable String routine, Iterable<String> segments) {
+ StringBuilder builder = new StringBuilder(system);
+ if (!Strings.isNullOrEmpty(routine)) {
+ builder.append(":").append(routine);
+ }
+ int idx = 0;
+ for (String segment : segments) {
+ if (idx == 0) {
+ builder.append(":");
+ } else {
+ builder.append(".");
+ }
+ builder.append(wrapSegment(segment));
+ ++idx;
+ }
+ return builder.toString();
+ }
+
+ /**
+ * Assemble the FQN of given system, and segments.
+ *
+ * <p>This helper method is for internal and testing usage only.
+ */
+ @Internal
+ public static String getFqName(String system, Iterable<String> segments) {
+ return getFqName(system, null, segments);
+ }
+
+ /**
+ * Add a FQN (fully-qualified name) to Lineage. Segments will be processed
via {@link #getFqName}.
+ */
+ public void add(String system, @Nullable String routine, Iterable<String>
segments) {
+ metric.add(getFqName(system, routine, segments));
+ }
+
+ /**
+ * Add a FQN (fully-qualified name) to Lineage. Segments will be processed
via {@link #getFqName}.
+ */
+ public void add(String system, Iterable<String> segments) {
+ add(system, null, segments);
+ }
+
/** Query {@link StringSet} metrics from {@link MetricResults}. */
public static Set<String> query(MetricResults results, Type type) {
MetricsFilter filter =
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/LineageTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/LineageTest.java
new file mode 100644
index 00000000000..432eb396fe2
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/LineageTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Lineage}. */
+@RunWith(JUnit4.class)
+public class LineageTest {
+ @Test
+ public void testGetFqName() {
+ Map<String, String> testCases =
+ ImmutableMap.<String, String>builder()
+ .put("apache-beam", "apache-beam")
+ .put("`apache-beam`", "`apache-beam`")
+ .put("apache.beam", "`apache.beam`")
+ .put("apache:beam", "`apache:beam`")
+ .put("apache beam", "`apache beam`")
+ .put("`apache beam`", "`apache beam`")
+ .put("apache\tbeam", "`apache\tbeam`")
+ .put("apache\nbeam", "`apache\nbeam`")
+ .build();
+ testCases.forEach(
+ (key, value) ->
+ assertEquals("apache:" + value, Lineage.getFqName("apache",
ImmutableList.of(key))));
+ testCases.forEach(
+ (key, value) ->
+ assertEquals(
+ "apache:beam:" + value,
+ Lineage.getFqName("apache", "beam", ImmutableList.of(key))));
+ testCases.forEach(
+ (key, value) ->
+ assertEquals(
+ "apache:beam:" + value + "." + value,
+ Lineage.getFqName("apache", "beam", ImmutableList.of(key,
key))));
+ }
+}
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
index b49c434f81c..6332051c0dd 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
@@ -48,6 +48,7 @@ import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.MatchResult.Status;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.Metrics;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
@@ -214,6 +215,16 @@ class GcsFileSystem extends FileSystem<GcsResourceId> {
return "gs";
}
+ @Override
+ protected void reportLineage(GcsResourceId resourceId, Lineage lineage) {
+ GcsPath path = resourceId.getGcsPath();
+ if (!path.getBucket().isEmpty()) {
+ lineage.add("gcs", ImmutableList.of(path.getBucket(), path.getObject()));
+ } else {
+ LOG.warn("Report Lineage on relative path {} is unsupported",
path.getObject());
+ }
+ }
+
private List<MatchResult> matchGlobs(List<GcsPath> globs) {
// TODO: Executes in parallel, address
https://issues.apache.org/jira/browse/BEAM-1503.
return FluentIterable.from(globs)
diff --git
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
index fa442374e3c..7ed56efa44b 100644
---
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
+++
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
@@ -65,6 +65,7 @@ import org.apache.beam.sdk.io.aws.options.S3Options;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.util.MoreFutures;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
@@ -624,6 +625,11 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
return S3ResourceId.fromUri(singleResourceSpec);
}
+ @Override
+ protected void reportLineage(S3ResourceId resourceId, Lineage lineage) {
+ lineage.add("s3", ImmutableList.of(resourceId.getBucket(),
resourceId.getKey()));
+ }
+
/**
* Invokes tasks in a thread pool, then unwraps the resulting {@link Future
Futures}.
*
diff --git
a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java
b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java
index 5f08600758f..384c8c627ee 100644
---
a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java
+++
b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java
@@ -46,6 +46,7 @@ import org.apache.beam.sdk.io.aws2.options.S3Options;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.util.MoreFutures;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
@@ -655,6 +656,11 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
return S3ResourceId.fromUri(singleResourceSpec);
}
+ @Override
+ protected void reportLineage(S3ResourceId resourceId, Lineage lineage) {
+ lineage.add("s3", ImmutableList.of(resourceId.getBucket(),
resourceId.getKey()));
+ }
+
/**
* Invokes tasks in a thread pool, then unwraps the resulting {@link Future
Futures}.
*
diff --git
a/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
b/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
index 3b6e79b4ef7..5137eaf9bb2 100644
---
a/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
+++
b/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
@@ -53,6 +53,7 @@ import org.apache.beam.sdk.io.azure.options.BlobstoreOptions;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.util.InstanceBuilder;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
@@ -449,4 +450,16 @@ class AzureBlobStoreFileSystem extends
FileSystem<AzfsResourceId> {
}
return AzfsResourceId.fromUri(singleResourceSpec);
}
+
+ @Override
+ protected void reportLineage(AzfsResourceId resourceId, Lineage lineage) {
+ if (!Strings.isNullOrEmpty(resourceId.getBlob())) {
+ lineage.add(
+ "abs",
+ ImmutableList.of(
+ resourceId.getAccount(), resourceId.getContainer(),
resourceId.getBlob()));
+ } else {
+ lineage.add("abs", ImmutableList.of(resourceId.getAccount(),
resourceId.getContainer()));
+ }
+ }
}
diff --git
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
index b852020c9bb..859c03ed775 100644
---
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.text;
import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment;
import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.readFileBasedIOITPipelineOptions;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import java.time.Instant;
@@ -36,6 +37,7 @@ import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
import org.apache.beam.sdk.io.common.FileBasedIOITHelper.DeleteFileFn;
import org.apache.beam.sdk.io.common.FileBasedIOTestPipelineOptions;
import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testutils.NamedTestResult;
@@ -152,6 +154,9 @@ public class TextIOIT {
PipelineResult result = pipeline.run();
PipelineResult.State pipelineState = result.waitUntilFinish();
+ assertEquals(
+ Lineage.query(result.metrics(), Lineage.Type.SOURCE),
+ Lineage.query(result.metrics(), Lineage.Type.SINK));
collectAndPublishMetrics(result);
// Fail the test if pipeline failed.
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 61bed66a336..129c8314fc8 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -55,6 +55,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.FluentBackoff;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -413,7 +414,7 @@ public class BigQueryHelpers {
return sb.toString();
}
- public static String dataCatalogName(TableReference ref, BigQueryOptions
options) {
+ public static List<String> dataCatalogSegments(TableReference ref,
BigQueryOptions options) {
String tableIdBase;
int ix = ref.getTableId().indexOf('$');
if (ix == -1) {
@@ -429,7 +430,7 @@ public class BigQueryHelpers {
} else {
projectId = options.getProject();
}
- return String.format("bigquery:%s.%s.%s", projectId, ref.getDatasetId(),
tableIdBase);
+ return ImmutableList.of(projectId, ref.getDatasetId(), tableIdBase);
}
static <K, V> List<V> getOrCreateMapListValue(Map<K, List<V>> map, K key) {
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 998c82ab8d8..a8985775cbe 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -121,7 +121,8 @@ abstract class BigQuerySourceBase<T> extends
BoundedSource<T> {
BigQueryHelpers.toTableSpec(tableToExtract)));
}
// emit this table ID as a lineage source
- Lineage.getSources().add(BigQueryHelpers.dataCatalogName(tableToExtract,
bqOptions));
+ Lineage.getSources()
+ .add("bigquery", BigQueryHelpers.dataCatalogSegments(tableToExtract,
bqOptions));
TableSchema schema = table.getSchema();
JobService jobService = bqServices.getJobService(bqOptions);
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
index 3852d18ec12..51a5a8f391a 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
@@ -35,7 +35,6 @@ import
org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
import org.apache.beam.sdk.metrics.Lineage;
-import org.apache.beam.sdk.metrics.StringSet;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -109,12 +108,12 @@ abstract class BigQueryStorageSourceBase<T> extends
BoundedSource<T> {
@Nullable Table targetTable = getTargetTable(bqOptions);
ReadSession.Builder readSessionBuilder = ReadSession.newBuilder();
- StringSet lineageSources = Lineage.getSources();
+ Lineage lineage = Lineage.getSources();
if (targetTable != null) {
TableReference tableReference = targetTable.getTableReference();
readSessionBuilder.setTable(BigQueryHelpers.toTableResourceName(tableReference));
// register the table as lineage source
- lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference,
bqOptions));
+ lineage.add("bigquery",
BigQueryHelpers.dataCatalogSegments(tableReference, bqOptions));
} else {
// If the table does not exist targetTable will be null.
// Construct the table id if we can generate it. For error
recording/logging.
@@ -123,7 +122,7 @@ abstract class BigQueryStorageSourceBase<T> extends
BoundedSource<T> {
readSessionBuilder.setTable(tableReferenceId);
// register the table as lineage source
TableReference tableReference =
BigQueryHelpers.parseTableUrn(tableReferenceId);
- lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference,
bqOptions));
+ lineage.add("bigquery",
BigQueryHelpers.dataCatalogSegments(tableReference, bqOptions));
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
index a55bcc3fe02..1bbd4e75608 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
@@ -121,7 +121,8 @@ public class CreateTables<DestinationT, ElementT>
BigQueryOptions bqOptions =
context.getPipelineOptions().as(BigQueryOptions.class);
Lineage.getSinks()
.add(
- BigQueryHelpers.dataCatalogName(
+ "bigquery",
+ BigQueryHelpers.dataCatalogSegments(
tableDestination1.getTableReference(), bqOptions));
return CreateTableHelpers.possiblyCreateTable(
bqOptions,
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 369bb2d7863..13a22cbfe90 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -1132,7 +1132,8 @@ public class
StorageApiWriteUnshardedRecords<DestinationT, ElementT>
pipelineOptions.as(BigQueryOptions.class)));
Lineage.getSinks()
.add(
- BigQueryHelpers.dataCatalogName(
+ "bigquery",
+ BigQueryHelpers.dataCatalogSegments(
state.getTableDestination().getTableReference(),
pipelineOptions.as(BigQueryOptions.class)));
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index f3f512110b5..1ee001d9890 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -477,7 +477,8 @@ public class StorageApiWritesShardedRecords<DestinationT
extends @NonNull Object
Lineage.getSinks()
.add(
- BigQueryHelpers.dataCatalogName(
+ "bigquery",
+ BigQueryHelpers.dataCatalogSegments(
tableDestination.getTableReference(), bigQueryOptions));
Coder<DestinationT> destinationCoder =
dynamicDestinations.getDestinationCoder();
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index 1a6a6a4db70..061e66024e2 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -210,7 +210,8 @@ class WriteRename
if (!entry.getValue().isEmpty()) {
Lineage.getSinks()
.add(
- BigQueryHelpers.dataCatalogName(
+ "bigquery",
+ BigQueryHelpers.dataCatalogSegments(
entry.getKey().getTableReference(),
c.getPipelineOptions().as(BigQueryOptions.class)));
pendingJobs.add(startWriteRename(entry.getKey(), entry.getValue(),
c, window));
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index ace0bc5a74c..e374d459af4 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -263,7 +263,8 @@ class WriteTables<DestinationT extends @NonNull Object>
} else {
Lineage.getSinks()
.add(
- BigQueryHelpers.dataCatalogName(
+ "bigquery",
+ BigQueryHelpers.dataCatalogSegments(
tableReference,
c.getPipelineOptions().as(BigQueryOptions.class)));
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 6fdf67722ba..1af9ae4f932 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -216,7 +216,7 @@ class BigtableServiceImpl implements BigtableService {
@Override
public void reportLineage() {
- Lineage.getSources().add(String.format("bigtable:%s.%s.%s", projectId,
instanceId, tableId));
+ Lineage.getSources().add("bigtable", ImmutableList.of(projectId,
instanceId, tableId));
}
}
@@ -327,7 +327,7 @@ class BigtableServiceImpl implements BigtableService {
@Override
public void reportLineage() {
- Lineage.getSources().add(String.format("bigtable:%s.%s.%s", projectId,
instanceId, tableId));
+ Lineage.getSources().add("bigtable", ImmutableList.of(projectId,
instanceId, tableId));
}
@Override
@@ -597,7 +597,7 @@ class BigtableServiceImpl implements BigtableService {
@Override
public void reportLineage() {
- Lineage.getSinks().add(String.format("bigtable:%s.%s.%s", projectId,
instanceId, tableId));
+ Lineage.getSinks().add("bigtable", ImmutableList.of(projectId,
instanceId, tableId));
}
private ServiceCallMetric createServiceCallMetric() {
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
index f66ee6e1d84..2964a29dbb6 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
@@ -260,8 +261,8 @@ public abstract class PubsubClient implements Closeable {
return String.format("/subscriptions/%s/%s", projectId,
subscriptionName);
}
- public String getDataCatalogName() {
- return String.format("pubsub:subscription:%s.%s", projectId,
subscriptionName);
+ public List<String> getDataCatalogSegments() {
+ return ImmutableList.of(projectId, subscriptionName);
}
@Override
@@ -319,14 +320,14 @@ public abstract class PubsubClient implements Closeable {
}
/**
- * Returns the data catalog name. Format "pubsub:topic:`project`.`topic`"
This method is
- * fail-safe. If topic path is malformed, it returns an empty string.
+ * Returns the data catalog segments {@code [project, topic]}. This method is fail-safe. If the
topic path is malformed, it
+ * returns an empty list.
*/
- public String getDataCatalogName() {
+ public List<String> getDataCatalogSegments() {
List<String> splits = Splitter.on('/').splitToList(path);
if (splits.size() == 4) {
// well-formed path
- return String.format("pubsub:topic:%s.%s", splits.get(1),
splits.get(3));
+ return ImmutableList.of(splits.get(1), splits.get(3));
} else {
// Mal-formed path. It is either a test fixture or user error and will
fail on publish.
// We do not throw exception instead return empty string here.
@@ -334,7 +335,7 @@ public abstract class PubsubClient implements Closeable {
"Cannot get data catalog name for malformed topic path {}.
Expected format: "
+ "projects/<project>/topics/<topic>",
path);
- return "";
+ return ImmutableList.of();
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 0fd4e9207d8..8b582c1054f 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -85,6 +85,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
@@ -513,8 +514,8 @@ public class PubsubIO {
}
}
- public String dataCatalogName() {
- return String.format("pubsub:topic:%s.%s", project, topic);
+ public List<String> dataCatalogSegments() {
+ return ImmutableList.of(project, topic);
}
@Override
@@ -1624,7 +1625,7 @@ public class PubsubIO {
}
// Report lineage for all topics seen
for (PubsubTopic topic : output.keySet()) {
- Lineage.getSinks().add(topic.dataCatalogName());
+ Lineage.getSinks().add("pubsub", "topic",
topic.dataCatalogSegments());
}
output = null;
pubsubClient.close();
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index defea87e835..38d77aa3aac 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -70,7 +70,6 @@ import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
@@ -297,9 +296,9 @@ public class PubsubUnboundedSink extends
PTransform<PCollection<PubsubMessage>,
byteCounter.inc(bytes);
// Report Lineage multiple once for same topic
if (!topicPath.equals(reportedLineage)) {
- String name = topicPath.getDataCatalogName();
- if (!Strings.isNullOrEmpty(name)) {
- Lineage.getSinks().add(topicPath.getDataCatalogName());
+ List<String> segments = topicPath.getDataCatalogSegments();
+ if (!segments.isEmpty()) {
+ Lineage.getSinks().add("pubsub", "topic", segments);
}
reportedLineage = topicPath;
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index b131b521c06..95fa5c22341 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -1045,14 +1045,14 @@ public class PubsubUnboundedSource extends
PTransform<PBegin, PCollection<Pubsub
TopicPath topic = outer.getTopic();
if (topic != null) {
// is initial split on Read.fromTopic, report Lineage based on topic
- Lineage.getSources().add(topic.getDataCatalogName());
+ Lineage.getSources().add("pubsub", "topic",
topic.getDataCatalogSegments());
}
} else {
if (subscriptionPath.equals(outer.getSubscriptionProvider())) {
SubscriptionPath sub = subscriptionPath.get();
if (sub != null) {
// is a split on Read.fromSubscription
- Lineage.getSources().add(sub.getDataCatalogName());
+ Lineage.getSources().add("pubsub", "subscription",
sub.getDataCatalogSegments());
}
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
index 4ce9ad10b2c..4faff5ad6bd 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.values.PCollection;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -154,7 +155,9 @@ public class BigtableReadIT {
if (options.getRunner().getName().contains("DirectRunner")) {
assertThat(
Lineage.query(r.metrics(), Lineage.Type.SOURCE),
- hasItem(String.format("bigtable:%s.%s.%s", project,
options.getInstanceId(), tableId)));
+ hasItem(
+ Lineage.getFqName(
+ "bigtable", ImmutableList.of(project,
options.getInstanceId(), tableId))));
}
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
index 46bb3df836e..44f3e5a1992 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
@@ -423,7 +423,9 @@ public class BigtableWriteIT implements Serializable {
if (options.getRunner().getName().contains("DirectRunner")) {
assertThat(
Lineage.query(r.metrics(), Lineage.Type.SINK),
- hasItem(String.format("bigtable:%s.%s.%s", project,
options.getInstanceId(), tableId)));
+ hasItem(
+ Lineage.getFqName(
+ "bigtable", ImmutableList.of(project,
options.getInstanceId(), tableId))));
}
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
index fb007d1171d..9d7bc65f595 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java
@@ -27,6 +27,7 @@ import
org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SchemaPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Instant;
import org.junit.Rule;
@@ -171,7 +172,7 @@ public class PubsubClientTest {
SubscriptionPath path = PubsubClient.subscriptionPathFromName("test",
"something");
assertEquals("projects/test/subscriptions/something", path.getPath());
assertEquals("/subscriptions/test/something", path.getFullPath());
- assertEquals("pubsub:subscription:test.something",
path.getDataCatalogName());
+ assertEquals(ImmutableList.of("test", "something"),
path.getDataCatalogSegments());
}
@Test
@@ -179,7 +180,7 @@ public class PubsubClientTest {
TopicPath path = PubsubClient.topicPathFromName("test", "something");
assertEquals("projects/test/topics/something", path.getPath());
assertEquals("/topics/test/something", path.getFullPath());
- assertEquals("pubsub:topic:test.something", path.getDataCatalogName());
+ assertEquals(ImmutableList.of("test", "something"),
path.getDataCatalogSegments());
}
@Test
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
index 74a98f0b8b4..d4effbae40a 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
@@ -238,8 +238,8 @@ public class PubsubIOTest {
assertThat(pubsubRead.getTopicProvider().isAccessible(), is(true));
assertThat(pubsubRead.getTopicProvider().get().asPath(),
equalTo(provider.get()));
assertThat(
- pubsubRead.getTopicProvider().get().dataCatalogName(),
- equalTo("pubsub:topic:project.topic"));
+ pubsubRead.getTopicProvider().get().dataCatalogSegments(),
+ equalTo(ImmutableList.of("project", "topic")));
}
@Test