This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ab908b984d Add Lineage metrics for BigtableIO (#32068)
5ab908b984d is described below

commit 5ab908b984d4144b5cbe584d7ed4ed7a4e226993
Author: Yi Hu <[email protected]>
AuthorDate: Tue Aug 6 15:03:57 2024 -0400

    Add Lineage metrics for BigtableIO (#32068)
    
    * Add Lineage metrics for BigtableIO
    
    * add tests
    
    * simplify metrics query logic; exclude a test that was already failing
    
    * Address comments, fix typo
---
 .../java/org/apache/beam/sdk/metrics/Lineage.java  | 43 +++++++++++++++++++---
 sdks/java/io/google-cloud-platform/build.gradle    |  4 ++
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java       | 14 +++++++
 .../beam/sdk/io/gcp/bigtable/BigtableService.java  |  6 +++
 .../sdk/io/gcp/bigtable/BigtableServiceImpl.java   | 22 +++++++++++
 .../sdk/io/gcp/bigquery/BigQueryIOReadTest.java    | 23 ++----------
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 13 +------
 .../beam/sdk/io/gcp/bigtable/BigtableReadIT.java   | 21 ++++++++++-
 .../beam/sdk/io/gcp/bigtable/BigtableWriteIT.java  | 18 ++++++++-
 9 files changed, 123 insertions(+), 41 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
index 7890a9f74b9..8b69b0ef552 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
@@ -17,17 +17,17 @@
  */
 package org.apache.beam.sdk.metrics;
 
+import java.util.HashSet;
+import java.util.Set;
+
 /**
  * Standard collection of metrics used to record source and sinks information 
for lineage tracking.
  */
 public class Lineage {
-
   public static final String LINEAGE_NAMESPACE = "lineage";
-  public static final String SOURCE_METRIC_NAME = "sources";
-  public static final String SINK_METRIC_NAME = "sinks";
-
-  private static final StringSet SOURCES = 
Metrics.stringSet(LINEAGE_NAMESPACE, SOURCE_METRIC_NAME);
-  private static final StringSet SINKS = Metrics.stringSet(LINEAGE_NAMESPACE, 
SINK_METRIC_NAME);
+  private static final StringSet SOURCES =
+      Metrics.stringSet(LINEAGE_NAMESPACE, Type.SOURCE.toString());
+  private static final StringSet SINKS = Metrics.stringSet(LINEAGE_NAMESPACE, 
Type.SINK.toString());
 
   /** {@link StringSet} representing sources and optionally side inputs. */
   public static StringSet getSources() {
@@ -38,4 +38,35 @@ public class Lineage {
   public static StringSet getSinks() {
     return SINKS;
   }
+
+  /** Query {@link StringSet} metrics from {@link MetricResults}. */
+  public static Set<String> query(MetricResults results, Type type) {
+    MetricsFilter filter =
+        MetricsFilter.builder()
+            .addNameFilter(MetricNameFilter.named(LINEAGE_NAMESPACE, 
type.toString()))
+            .build();
+    Set<String> result = new HashSet<>();
+    for (MetricResult<StringSetResult> metrics : 
results.queryMetrics(filter).getStringSets()) {
+      result.addAll(metrics.getCommitted().getStringSet());
+      result.addAll(metrics.getAttempted().getStringSet());
+    }
+    return result;
+  }
+
+  /** Lineage metrics resource types. */
+  public enum Type {
+    SOURCE("source"),
+    SINK("sink");
+
+    private final String name;
+
+    Type(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public String toString() {
+      return name;
+    }
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/build.gradle 
b/sdks/java/io/google-cloud-platform/build.gradle
index e499bae6fc6..23c56f13a94 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -218,6 +218,10 @@ task integrationTest(type: Test, dependsOn: 
processTestResources) {
 
   useJUnit {
     excludeCategories "org.apache.beam.sdk.testing.UsesKms"
+    filter {
+      // https://github.com/apache/beam/issues/32071
+      excludeTestsMatching 
'org.apache.beam.sdk.io.gcp.bigtable.BigtableReadIT.testE2EBigtableSegmentRead'
+    }
   }
 }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index d78ae2cb6c5..6d20109e947 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -1337,6 +1337,7 @@ public class BigtableIO {
     private transient Set<KV<BigtableWriteException, BoundedWindow>> 
badRecords = null;
     // Due to callback thread not supporting Beam metrics, Record pending 
metrics and report later.
     private transient long pendingThrottlingMsecs;
+    private transient boolean reportedLineage;
 
     // Assign serviceEntry in startBundle and clear it in tearDown.
     @Nullable private BigtableServiceEntry serviceEntry;
@@ -1480,6 +1481,10 @@ public class BigtableIO {
               throttlingMsecs.inc(excessTime);
             }
           }
+          if (!reportedLineage) {
+            bigtableWriter.reportLineage();
+            reportedLineage = true;
+          }
           bigtableWriter = null;
         }
 
@@ -1612,6 +1617,7 @@ public class BigtableIO {
     private final BigtableConfig config;
     private final BigtableReadOptions readOptions;
     private @Nullable Long estimatedSizeBytes;
+    private transient boolean reportedLineage;
 
     private final BigtableServiceFactory.ConfigId configId;
 
@@ -1989,6 +1995,13 @@ public class BigtableIO {
     public ValueProvider<String> getTableId() {
       return readOptions.getTableId();
     }
+
+    void reportLineageOnce(BigtableService.Reader reader) {
+      if (!reportedLineage) {
+        reader.reportLineage();
+        reportedLineage = true;
+      }
+    }
   }
 
   private static class BigtableReader extends BoundedReader<Row> {
@@ -2019,6 +2032,7 @@ public class BigtableIO {
               || rangeTracker.markDone();
       if (hasRecord) {
         ++recordsReturned;
+        source.reportLineageOnce(reader);
       }
       return hasRecord;
     }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
index 261cc3ac081..50d8126999c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
@@ -57,6 +57,9 @@ interface BigtableService extends Serializable {
      * @throws IOException if there is an error closing the writer
      */
     void close() throws IOException;
+
+    /** Report Lineage metrics to runner. */
+    default void reportLineage() {}
   }
 
   /** The interface of a class that reads from Cloud Bigtable. */
@@ -77,6 +80,9 @@ interface BigtableService extends Serializable {
     Row getCurrentRow() throws NoSuchElementException;
 
     void close();
+
+    /** Report Lineage metrics to runner. */
+    default void reportLineage() {}
   }
 
   /** Returns a {@link Reader} that will read from the specified source. */
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index f06a4a12768..6fdf67722ba 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -71,6 +71,7 @@ import org.apache.beam.runners.core.metrics.ServiceCallMetric;
 import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
 import org.apache.beam.sdk.io.range.ByteKeyRange;
 import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.values.KV;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
@@ -212,6 +213,11 @@ class BigtableServiceImpl implements BigtableService {
         exhausted = true;
       }
     }
+
+    @Override
+    public void reportLineage() {
+      Lineage.getSources().add(String.format("bigtable:%s.%s.%s", projectId, 
instanceId, tableId));
+    }
   }
 
   @VisibleForTesting
@@ -225,6 +231,9 @@ class BigtableServiceImpl implements BigtableService {
     private final int refillSegmentWaterMark;
     private final long maxSegmentByteSize;
     private ServiceCallMetric serviceCallMetric;
+    private final String projectId;
+    private final String instanceId;
+    private final String tableId;
 
     private static class UpstreamResults {
       private final List<Row> rows;
@@ -308,11 +317,19 @@ class BigtableServiceImpl implements BigtableService {
       // Asynchronously refill buffer when there is 10% of the elements are 
left
       this.refillSegmentWaterMark =
           Math.max(1, (int) (request.getRowsLimit() * WATERMARK_PERCENTAGE));
+      this.projectId = projectId;
+      this.instanceId = instanceId;
+      this.tableId = tableId;
     }
 
     @Override
     public void close() {}
 
+    @Override
+    public void reportLineage() {
+      Lineage.getSources().add(String.format("bigtable:%s.%s.%s", projectId, 
instanceId, tableId));
+    }
+
     @Override
     public boolean start() throws IOException {
       future = fetchNextSegment();
@@ -578,6 +595,11 @@ class BigtableServiceImpl implements BigtableService {
       }
     }
 
+    @Override
+    public void reportLineage() {
+      Lineage.getSinks().add(String.format("bigtable:%s.%s.%s", projectId, 
instanceId, tableId));
+    }
+
     private ServiceCallMetric createServiceCallMetric() {
       // Populate metrics
       HashMap<String, String> baseLabels = new HashMap<>();
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
index 5c43666e79e..a8aca7570b3 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
@@ -43,6 +43,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecordBase;
@@ -61,9 +62,6 @@ import 
org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
 import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
 import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
 import org.apache.beam.sdk.metrics.Lineage;
-import org.apache.beam.sdk.metrics.MetricNameFilter;
-import org.apache.beam.sdk.metrics.MetricQueryResults;
-import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -351,18 +349,8 @@ public class BigQueryIOReadTest implements Serializable {
   }
 
   private void checkLineageSourceMetric(PipelineResult pipelineResult, String 
tableName) {
-    MetricQueryResults lineageMetrics =
-        pipelineResult
-            .metrics()
-            .queryMetrics(
-                MetricsFilter.builder()
-                    .addNameFilter(
-                        MetricNameFilter.named(
-                            Lineage.LINEAGE_NAMESPACE, 
Lineage.SOURCE_METRIC_NAME))
-                    .build());
-    assertThat(
-        
lineageMetrics.getStringSets().iterator().next().getCommitted().getStringSet(),
-        contains("bigquery:" + tableName.replace(':', '.')));
+    Set<String> result = Lineage.query(pipelineResult.metrics(), 
Lineage.Type.SOURCE);
+    assertThat(result, contains("bigquery:" + tableName.replace(':', '.')));
   }
 
   @Before
@@ -600,10 +588,7 @@ public class BigQueryIOReadTest implements Serializable {
                 new MyData("b", 2L, bd1, bd2),
                 new MyData("c", 3L, bd1, bd2)));
     PipelineResult result = p.run();
-    // Skip when direct runner splits outside of a counters context.
-    if (useTemplateCompatibility) {
-      checkLineageSourceMetric(result, 
"non-executing-project:somedataset.sometable");
-    }
+    checkLineageSourceMetric(result, 
"non-executing-project:somedataset.sometable");
   }
 
   @Test
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index bc90d4c8bae..c5af8045bfe 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -118,9 +118,6 @@ import 
org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
 import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
 import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
 import org.apache.beam.sdk.metrics.Lineage;
-import org.apache.beam.sdk.metrics.MetricNameFilter;
-import org.apache.beam.sdk.metrics.MetricQueryResults;
-import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
@@ -285,16 +282,8 @@ public class BigQueryIOWriteTest implements Serializable {
           .withJobService(fakeJobService);
 
   private void checkLineageSinkMetric(PipelineResult pipelineResult, String 
tableName) {
-    MetricQueryResults lineageMetrics =
-        pipelineResult
-            .metrics()
-            .queryMetrics(
-                MetricsFilter.builder()
-                    .addNameFilter(
-                        MetricNameFilter.named(Lineage.LINEAGE_NAMESPACE, 
Lineage.SINK_METRIC_NAME))
-                    .build());
     assertThat(
-        
lineageMetrics.getStringSets().iterator().next().getCommitted().getStringSet(),
+        Lineage.query(pipelineResult.metrics(), Lineage.Type.SINK),
         hasItem("bigquery:" + tableName.replace(':', '.')));
   }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
index bc88858ebc3..4ce9ad10b2c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.io.gcp.bigtable;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasItem;
+
 import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
 import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
 import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
@@ -28,7 +31,9 @@ import java.io.IOException;
 import java.util.Date;
 import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -110,7 +115,8 @@ public class BigtableReadIT {
         
p.apply(BigtableIO.read().withBigtableOptions(bigtableOptionsBuilder).withTableId(tableId))
             .apply(Count.globally());
     PAssert.thatSingleton(count).isEqualTo(numRows);
-    p.run();
+    PipelineResult r = p.run();
+    checkLineageSourceMetric(r, tableId);
   }
 
   @Test
@@ -138,6 +144,17 @@ public class BigtableReadIT {
                     .withMaxBufferElementCount(10))
             .apply(Count.globally());
     PAssert.thatSingleton(count).isEqualTo(numRows);
-    p.run();
+    PipelineResult r = p.run();
+    checkLineageSourceMetric(r, tableId);
+  }
+
+  private void checkLineageSourceMetric(PipelineResult r, String tableId) {
+    // TODO(https://github.com/apache/beam/issues/32071) test malformed,
+    //   when pipeline.run() is non-blocking, the metrics are not available by 
the time of query
+    if (options.getRunner().getName().contains("DirectRunner")) {
+      assertThat(
+          Lineage.query(r.metrics(), Lineage.Type.SOURCE),
+          hasItem(String.format("bigtable:%s.%s.%s", project, 
options.getInstanceId(), tableId)));
+    }
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
index bf9f7d991fa..46bb3df836e 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.gcp.bigtable;
 
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 
 import com.google.api.gax.rpc.ServerStream;
@@ -39,8 +40,10 @@ import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.testing.PAssert;
@@ -142,7 +145,7 @@ public class BigtableWriteIT implements Serializable {
                 .withProjectId(project)
                 .withInstanceId(options.getInstanceId())
                 .withTableId(tableId));
-    p.run();
+    PipelineResult r = p.run();
 
     // Test number of column families and column family name equality
     Table table = getTable(tableId);
@@ -154,6 +157,7 @@ public class BigtableWriteIT implements Serializable {
     // Test table data equality
     List<KV<ByteString, ByteString>> tableData = getTableData(tableId);
     assertThat(tableData, Matchers.containsInAnyOrder(testData.toArray()));
+    checkLineageSinkMetric(r, tableId);
   }
 
   @Test
@@ -340,7 +344,7 @@ public class BigtableWriteIT implements Serializable {
     errorHandler.close();
     
PAssert.thatSingleton(Objects.requireNonNull(errorHandler.getOutput())).isEqualTo(2L);
 
-    p.run();
+    PipelineResult r = p.run();
 
     // Test number of column families and column family name equality
     Table table = getTable(tableId);
@@ -352,6 +356,7 @@ public class BigtableWriteIT implements Serializable {
     // Test table data equality
     List<KV<ByteString, ByteString>> tableData = getTableData(tableId);
     assertEquals(998, tableData.size());
+    checkLineageSinkMetric(r, tableId);
   }
 
   @After
@@ -412,4 +417,13 @@ public class BigtableWriteIT implements Serializable {
       tableAdminClient.deleteTable(tableId);
     }
   }
+
+  private void checkLineageSinkMetric(PipelineResult r, String tableId) {
+    // Only check lineage metrics on direct runner until Dataflow runner v2 
supported report back
+    if (options.getRunner().getName().contains("DirectRunner")) {
+      assertThat(
+          Lineage.query(r.metrics(), Lineage.Type.SINK),
+          hasItem(String.format("bigtable:%s.%s.%s", project, 
options.getInstanceId(), tableId)));
+    }
+  }
 }

Reply via email to