This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch release-2.58.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.58.0 by this push:
     new b95a750e862 Merge pull request #31823 Add lineage information for BigQuery sinks. (#31836)
b95a750e862 is described below

commit b95a750e862d5f9a179805528eff5dd0b605f597
Author: Robert Bradshaw <[email protected]>
AuthorDate: Wed Jul 10 13:50:55 2024 -0700

    Merge pull request #31823 Add lineage information for BigQuery sinks. (#31836)
---
 .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java  | 21 +++++++++++--
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java    |  5 ++--
 .../io/gcp/bigquery/BigQueryStorageSourceBase.java |  4 +--
 .../beam/sdk/io/gcp/bigquery/CreateTables.java     |  8 ++++-
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 14 +++++++++
 .../bigquery/StorageApiWritesShardedRecords.java   |  6 ++++
 .../beam/sdk/io/gcp/bigquery/WriteRename.java      |  6 ++++
 .../beam/sdk/io/gcp/bigquery/WriteTables.java      |  6 ++++
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 34 +++++++++++++++++++---
 9 files changed, 92 insertions(+), 12 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 7f5d675ccf7..61bed66a336 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -54,6 +54,7 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -412,9 +413,23 @@ public class BigQueryHelpers {
     return sb.toString();
   }
 
-  public static String dataCatalogName(TableReference ref) {
-    return String.format(
-        "bigquery:%s.%s.%s", ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
+  public static String dataCatalogName(TableReference ref, BigQueryOptions options) {
+    String tableIdBase;
+    int ix = ref.getTableId().indexOf('$');
+    if (ix == -1) {
+      tableIdBase = ref.getTableId();
+    } else {
+      tableIdBase = ref.getTableId().substring(0, ix);
+    }
+    String projectId;
+    if (!Strings.isNullOrEmpty(ref.getProjectId())) {
+      projectId = ref.getProjectId();
+    } else if (!Strings.isNullOrEmpty(options.getBigQueryProject())) {
+      projectId = options.getBigQueryProject();
+    } else {
+      projectId = options.getProject();
+    }
+    return String.format("bigquery:%s.%s.%s", projectId, ref.getDatasetId(), tableIdBase);
   }
 
   static <K, V> List<V> getOrCreateMapListValue(Map<K, List<V>> map, K key) {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index a863c49f46a..38c0c8e43b2 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -121,7 +121,7 @@ abstract class BigQuerySourceBase<T> extends 
BoundedSource<T> {
                 BigQueryHelpers.toTableSpec(tableToExtract)));
       }
       // emit this table ID as a lineage source
-      
Lineage.getSources().add(BigQueryHelpers.dataCatalogName(tableToExtract));
+      Lineage.getSources().add(BigQueryHelpers.dataCatalogName(tableToExtract, 
bqOptions));
 
       TableSchema schema = table.getSchema();
       JobService jobService = bqServices.getJobService(bqOptions);
@@ -158,7 +158,8 @@ abstract class BigQuerySourceBase<T> extends 
BoundedSource<T> {
       if (res.extractedFiles.size() > 0) {
         BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
         // emit this table ID as a lineage source
-        
Lineage.getSources().add(BigQueryHelpers.dataCatalogName(getTableToExtract(bqOptions)));
+        Lineage.getSources()
+            .add(BigQueryHelpers.dataCatalogName(getTableToExtract(bqOptions), 
bqOptions));
         final String extractDestinationDir =
             resolveTempLocation(bqOptions.getTempLocation(), 
"BigQueryExtractTemp", stepUuid);
         // Match all files in the destination directory to stat them in bulk.
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
index a283ed6c260..3852d18ec12 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
@@ -114,7 +114,7 @@ abstract class BigQueryStorageSourceBase<T> extends 
BoundedSource<T> {
       TableReference tableReference = targetTable.getTableReference();
       
readSessionBuilder.setTable(BigQueryHelpers.toTableResourceName(tableReference));
       // register the table as lineage source
-      lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference));
+      lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference, 
bqOptions));
     } else {
       // If the table does not exist targetTable will be null.
       // Construct the table id if we can generate it. For error 
recording/logging.
@@ -123,7 +123,7 @@ abstract class BigQueryStorageSourceBase<T> extends 
BoundedSource<T> {
         readSessionBuilder.setTable(tableReferenceId);
         // register the table as lineage source
         TableReference tableReference = 
BigQueryHelpers.parseTableUrn(tableReferenceId);
-        lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference));
+        lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference, 
bqOptions));
       }
     }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
index 7e5299b7e67..a55bcc3fe02 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
@@ -24,6 +24,7 @@ import com.google.api.services.bigquery.model.TableSchema;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -117,8 +118,13 @@ public class CreateTables<DestinationT, ElementT>
                 Supplier<@Nullable TableConstraints> tableConstraintsSupplier =
                     () -> dynamicDestinations.getTableConstraints(dest);
 
+                BigQueryOptions bqOptions = 
context.getPipelineOptions().as(BigQueryOptions.class);
+                Lineage.getSinks()
+                    .add(
+                        BigQueryHelpers.dataCatalogName(
+                            tableDestination1.getTableReference(), bqOptions));
                 return CreateTableHelpers.possiblyCreateTable(
-                    context.getPipelineOptions().as(BigQueryOptions.class),
+                    bqOptions,
                     tableDestination1,
                     schemaSupplier,
                     tableConstraintsSupplier,
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index ce5e7b4854e..21c1d961e84 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -61,6 +61,7 @@ import 
org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType;
 import 
org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.MessageConverter;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -267,6 +268,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, 
ElementT>
     }
 
     class DestinationState {
+      private final TableDestination tableDestination;
       private final String tableUrn;
       private final String shortTableUrn;
       private String streamName = "";
@@ -298,6 +300,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, 
ElementT>
       private final boolean includeCdcColumns;
 
       public DestinationState(
+          TableDestination tableDestination,
           String tableUrn,
           String shortTableUrn,
           MessageConverter<ElementT> messageConverter,
@@ -309,6 +312,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, 
ElementT>
           Callable<Boolean> tryCreateTable,
           boolean includeCdcColumns)
           throws Exception {
+        this.tableDestination = tableDestination;
         this.tableUrn = tableUrn;
         this.shortTableUrn = shortTableUrn;
         this.pendingMessages = Lists.newArrayList();
@@ -327,6 +331,10 @@ public class StorageApiWriteUnshardedRecords<DestinationT, 
ElementT>
         }
       }
 
+      public TableDestination getTableDestination() {
+        return tableDestination;
+      }
+
       void teardown() {
         maybeTickleCache();
         if (appendClientInfo != null) {
@@ -1050,6 +1058,7 @@ public class 
StorageApiWriteUnshardedRecords<DestinationT, ElementT>
       try {
         messageConverter = messageConverters.get(destination, 
dynamicDestinations, datasetService);
         return new DestinationState(
+            tableDestination1,
             tableDestination1.getTableUrn(bigQueryOptions),
             tableDestination1.getShortTableUrn(),
             messageConverter,
@@ -1089,6 +1098,11 @@ public class 
StorageApiWriteUnshardedRecords<DestinationT, ElementT>
                           initializedDatasetService,
                           initializedWriteStreamService,
                           pipelineOptions.as(BigQueryOptions.class)));
+      Lineage.getSinks()
+          .add(
+              BigQueryHelpers.dataCatalogName(
+                  state.getTableDestination().getTableReference(),
+                  pipelineOptions.as(BigQueryOptions.class)));
 
       OutputReceiver<BigQueryStorageApiInsertError> failedRowsReceiver = 
o.get(failedRowsTag);
       @Nullable
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index 1232e1a7097..a7da19a75f8 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -62,6 +62,7 @@ import 
org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType;
 import 
org.apache.beam.sdk.io.gcp.bigquery.StorageApiFlushAndFinalizeDoFn.Operation;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
@@ -469,6 +470,11 @@ public class StorageApiWritesShardedRecords<DestinationT 
extends @NonNull Object
       final DatasetService datasetService = getDatasetService(pipelineOptions);
       final WriteStreamService writeStreamService = 
getWriteStreamService(pipelineOptions);
 
+      Lineage.getSinks()
+          .add(
+              BigQueryHelpers.dataCatalogName(
+                  tableDestination.getTableReference(), bigQueryOptions));
+
       Coder<DestinationT> destinationCoder = 
dynamicDestinations.getDestinationCoder();
       Callable<Boolean> tryCreateTable =
           () -> {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index a7177613c60..1a6a6a4db70 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -34,6 +34,7 @@ import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteTables.Result;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -207,6 +208,11 @@ class WriteRename
         // Process each destination table.
         // Do not copy if no temp tables are provided.
         if (!entry.getValue().isEmpty()) {
+          Lineage.getSinks()
+              .add(
+                  BigQueryHelpers.dataCatalogName(
+                      entry.getKey().getTableReference(),
+                      c.getPipelineOptions().as(BigQueryOptions.class)));
           pendingJobs.add(startWriteRename(entry.getKey(), entry.getValue(), 
c, window));
         }
       }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index c6a7d32e248..ace0bc5a74c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -49,6 +49,7 @@ import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.SchemaUpdateOption;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -259,6 +260,11 @@ class WriteTables<DestinationT extends @NonNull Object>
         }
         // This is a temp table. Create a new one for each partition and each 
pane.
         tableReference.setTableId(jobIdPrefix);
+      } else {
+        Lineage.getSinks()
+            .add(
+                BigQueryHelpers.dataCatalogName(
+                    tableReference, 
c.getPipelineOptions().as(BigQueryOptions.class)));
       }
 
       WriteDisposition writeDisposition = firstPaneWriteDisposition;
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index d303948fe44..bc90d4c8bae 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -28,6 +28,7 @@ import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -93,6 +94,7 @@ import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.Encoder;
 import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -115,6 +117,10 @@ import 
org.apache.beam.sdk.io.gcp.bigquery.WriteTables.Result;
 import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
 import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
 import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
+import org.apache.beam.sdk.metrics.Lineage;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
@@ -278,6 +284,20 @@ public class BigQueryIOWriteTest implements Serializable {
           .withDatasetService(fakeDatasetService)
           .withJobService(fakeJobService);
 
+  private void checkLineageSinkMetric(PipelineResult pipelineResult, String tableName) {
+    MetricQueryResults lineageMetrics =
+        pipelineResult
+            .metrics()
+            .queryMetrics(
+                MetricsFilter.builder()
+                    .addNameFilter(
+                        MetricNameFilter.named(Lineage.LINEAGE_NAMESPACE, Lineage.SINK_METRIC_NAME))
+                    .build());
+    assertThat(
+        lineageMetrics.getStringSets().iterator().next().getCommitted().getStringSet(),
+        hasItem("bigquery:" + tableName.replace(':', '.')));
+  }
+
   @Before
   public void setUp() throws ExecutionException, IOException, 
InterruptedException {
     FakeDatasetService.setUp();
@@ -488,7 +508,7 @@ public class BigQueryIOWriteTest implements Serializable {
           .containsInAnyOrder(expectedTables);
     }
 
-    p.run();
+    PipelineResult pipelineResult = p.run();
 
     Map<Long, List<TableRow>> expectedTableRows = Maps.newHashMap();
     for (String anUserList : userList) {
@@ -505,6 +525,7 @@ public class BigQueryIOWriteTest implements Serializable {
       assertThat(
           fakeDatasetService.getAllRows("project-id", "dataset-id", "userid-" 
+ entry.getKey()),
           containsInAnyOrder(Iterables.toArray(entry.getValue(), 
TableRow.class)));
+      checkLineageSinkMetric(pipelineResult, "project-id.dataset-id.userid-" + 
entry.getKey());
     }
   }
 
@@ -680,7 +701,7 @@ public class BigQueryIOWriteTest implements Serializable {
     }
 
     p.apply(testStream).apply(writeTransform);
-    p.run();
+    PipelineResult pipelineResult = p.run();
 
     final int projectIdSplitter = tableRef.indexOf(':');
     final String projectId =
@@ -689,6 +710,9 @@ public class BigQueryIOWriteTest implements Serializable {
     assertThat(
         fakeDatasetService.getAllRows(projectId, "dataset-id", "table-id"),
         containsInAnyOrder(Iterables.toArray(elements, TableRow.class)));
+
+    checkLineageSinkMetric(
+        pipelineResult, tableRef.contains(projectId) ? tableRef : projectId + 
":" + tableRef);
   }
 
   public void runStreamingFileLoads(String tableRef) throws Exception {
@@ -828,11 +852,12 @@ public class BigQueryIOWriteTest implements Serializable {
 
     PAssert.that(result.getSuccessfulTableLoads())
         .containsInAnyOrder(new 
TableDestination("project-id:dataset-id.table-id", null));
-    p.run();
+    PipelineResult pipelineResult = p.run();
 
     assertThat(
         fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
         containsInAnyOrder(Iterables.toArray(elements, TableRow.class)));
+    checkLineageSinkMetric(pipelineResult, "project-id.dataset-id.table-id");
   }
 
   @Test
@@ -861,11 +886,12 @@ public class BigQueryIOWriteTest implements Serializable {
 
     PAssert.that(result.getSuccessfulTableLoads())
         .containsInAnyOrder(new 
TableDestination("project-id:dataset-id.table-id", null));
-    p.run();
+    PipelineResult pipelineResult = p.run();
 
     assertThat(
         fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
         containsInAnyOrder(Iterables.toArray(elements, TableRow.class)));
+    checkLineageSinkMetric(pipelineResult, "project-id.dataset-id.table-id");
   }
 
   @Test

Reply via email to