This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5ab908b984d Add Lineage metrics for BigtableIO (#32068)
5ab908b984d is described below
commit 5ab908b984d4144b5cbe584d7ed4ed7a4e226993
Author: Yi Hu <[email protected]>
AuthorDate: Tue Aug 6 15:03:57 2024 -0400
Add Lineage metrics for BigtableIO (#32068)
* Add Lineage metrics for BigtableIO
* add tests
* simplify metrics query logic; exclude a test that was already failing
* Address comments, fix typo
---
.../java/org/apache/beam/sdk/metrics/Lineage.java | 43 +++++++++++++++++++---
sdks/java/io/google-cloud-platform/build.gradle | 4 ++
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 14 +++++++
.../beam/sdk/io/gcp/bigtable/BigtableService.java | 6 +++
.../sdk/io/gcp/bigtable/BigtableServiceImpl.java | 22 +++++++++++
.../sdk/io/gcp/bigquery/BigQueryIOReadTest.java | 23 ++----------
.../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 13 +------
.../beam/sdk/io/gcp/bigtable/BigtableReadIT.java | 21 ++++++++++-
.../beam/sdk/io/gcp/bigtable/BigtableWriteIT.java | 18 ++++++++-
9 files changed, 123 insertions(+), 41 deletions(-)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
index 7890a9f74b9..8b69b0ef552 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Lineage.java
@@ -17,17 +17,17 @@
*/
package org.apache.beam.sdk.metrics;
+import java.util.HashSet;
+import java.util.Set;
+
/**
* Standard collection of metrics used to record source and sinks information
for lineage tracking.
*/
public class Lineage {
-
public static final String LINEAGE_NAMESPACE = "lineage";
- public static final String SOURCE_METRIC_NAME = "sources";
- public static final String SINK_METRIC_NAME = "sinks";
-
- private static final StringSet SOURCES =
Metrics.stringSet(LINEAGE_NAMESPACE, SOURCE_METRIC_NAME);
- private static final StringSet SINKS = Metrics.stringSet(LINEAGE_NAMESPACE,
SINK_METRIC_NAME);
+ private static final StringSet SOURCES =
+ Metrics.stringSet(LINEAGE_NAMESPACE, Type.SOURCE.toString());
+ private static final StringSet SINKS = Metrics.stringSet(LINEAGE_NAMESPACE,
Type.SINK.toString());
/** {@link StringSet} representing sources and optionally side inputs. */
public static StringSet getSources() {
@@ -38,4 +38,35 @@ public class Lineage {
public static StringSet getSinks() {
return SINKS;
}
+
+ /** Query {@link StringSet} metrics from {@link MetricResults}. */
+ public static Set<String> query(MetricResults results, Type type) {
+ MetricsFilter filter =
+ MetricsFilter.builder()
+ .addNameFilter(MetricNameFilter.named(LINEAGE_NAMESPACE,
type.toString()))
+ .build();
+ Set<String> result = new HashSet<>();
+ for (MetricResult<StringSetResult> metrics :
results.queryMetrics(filter).getStringSets()) {
+ result.addAll(metrics.getCommitted().getStringSet());
+ result.addAll(metrics.getAttempted().getStringSet());
+ }
+ return result;
+ }
+
+ /** Lineage metrics resource types. */
+ public enum Type {
+ SOURCE("source"),
+ SINK("sink");
+
+ private final String name;
+
+ Type(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+ }
}
diff --git a/sdks/java/io/google-cloud-platform/build.gradle
b/sdks/java/io/google-cloud-platform/build.gradle
index e499bae6fc6..23c56f13a94 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -218,6 +218,10 @@ task integrationTest(type: Test, dependsOn:
processTestResources) {
useJUnit {
excludeCategories "org.apache.beam.sdk.testing.UsesKms"
+ filter {
+ // https://github.com/apache/beam/issues/32071
+ excludeTestsMatching
'org.apache.beam.sdk.io.gcp.bigtable.BigtableReadIT.testE2EBigtableSegmentRead'
+ }
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index d78ae2cb6c5..6d20109e947 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -1337,6 +1337,7 @@ public class BigtableIO {
private transient Set<KV<BigtableWriteException, BoundedWindow>>
badRecords = null;
// Due to callback thread not supporting Beam metrics, Record pending
metrics and report later.
private transient long pendingThrottlingMsecs;
+ private transient boolean reportedLineage;
// Assign serviceEntry in startBundle and clear it in tearDown.
@Nullable private BigtableServiceEntry serviceEntry;
@@ -1480,6 +1481,10 @@ public class BigtableIO {
throttlingMsecs.inc(excessTime);
}
}
+ if (!reportedLineage) {
+ bigtableWriter.reportLineage();
+ reportedLineage = true;
+ }
bigtableWriter = null;
}
@@ -1612,6 +1617,7 @@ public class BigtableIO {
private final BigtableConfig config;
private final BigtableReadOptions readOptions;
private @Nullable Long estimatedSizeBytes;
+ private transient boolean reportedLineage;
private final BigtableServiceFactory.ConfigId configId;
@@ -1989,6 +1995,13 @@ public class BigtableIO {
public ValueProvider<String> getTableId() {
return readOptions.getTableId();
}
+
+ void reportLineageOnce(BigtableService.Reader reader) {
+ if (!reportedLineage) {
+ reader.reportLineage();
+ reportedLineage = true;
+ }
+ }
}
private static class BigtableReader extends BoundedReader<Row> {
@@ -2019,6 +2032,7 @@ public class BigtableIO {
|| rangeTracker.markDone();
if (hasRecord) {
++recordsReturned;
+ source.reportLineageOnce(reader);
}
return hasRecord;
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
index 261cc3ac081..50d8126999c 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
@@ -57,6 +57,9 @@ interface BigtableService extends Serializable {
* @throws IOException if there is an error closing the writer
*/
void close() throws IOException;
+
+ /** Report Lineage metrics to runner. */
+ default void reportLineage() {}
}
/** The interface of a class that reads from Cloud Bigtable. */
@@ -77,6 +80,9 @@ interface BigtableService extends Serializable {
Row getCurrentRow() throws NoSuchElementException;
void close();
+
+ /** Report Lineage metrics to runner. */
+ default void reportLineage() {}
}
/** Returns a {@link Reader} that will read from the specified source. */
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index f06a4a12768..6fdf67722ba 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -71,6 +71,7 @@ import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.values.KV;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
@@ -212,6 +213,11 @@ class BigtableServiceImpl implements BigtableService {
exhausted = true;
}
}
+
+ @Override
+ public void reportLineage() {
+ Lineage.getSources().add(String.format("bigtable:%s.%s.%s", projectId,
instanceId, tableId));
+ }
}
@VisibleForTesting
@@ -225,6 +231,9 @@ class BigtableServiceImpl implements BigtableService {
private final int refillSegmentWaterMark;
private final long maxSegmentByteSize;
private ServiceCallMetric serviceCallMetric;
+ private final String projectId;
+ private final String instanceId;
+ private final String tableId;
private static class UpstreamResults {
private final List<Row> rows;
@@ -308,11 +317,19 @@ class BigtableServiceImpl implements BigtableService {
// Asynchronously refill buffer when there is 10% of the elements are
left
this.refillSegmentWaterMark =
Math.max(1, (int) (request.getRowsLimit() * WATERMARK_PERCENTAGE));
+ this.projectId = projectId;
+ this.instanceId = instanceId;
+ this.tableId = tableId;
}
@Override
public void close() {}
+ @Override
+ public void reportLineage() {
+ Lineage.getSources().add(String.format("bigtable:%s.%s.%s", projectId,
instanceId, tableId));
+ }
+
@Override
public boolean start() throws IOException {
future = fetchNextSegment();
@@ -578,6 +595,11 @@ class BigtableServiceImpl implements BigtableService {
}
}
+ @Override
+ public void reportLineage() {
+ Lineage.getSinks().add(String.format("bigtable:%s.%s.%s", projectId,
instanceId, tableId));
+ }
+
private ServiceCallMetric createServiceCallMetric() {
// Populate metrics
HashMap<String, String> baseLabels = new HashMap<>();
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
index 5c43666e79e..a8aca7570b3 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
@@ -43,6 +43,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;
@@ -61,9 +62,6 @@ import
org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
import org.apache.beam.sdk.metrics.Lineage;
-import org.apache.beam.sdk.metrics.MetricNameFilter;
-import org.apache.beam.sdk.metrics.MetricQueryResults;
-import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
@@ -351,18 +349,8 @@ public class BigQueryIOReadTest implements Serializable {
}
private void checkLineageSourceMetric(PipelineResult pipelineResult, String
tableName) {
- MetricQueryResults lineageMetrics =
- pipelineResult
- .metrics()
- .queryMetrics(
- MetricsFilter.builder()
- .addNameFilter(
- MetricNameFilter.named(
- Lineage.LINEAGE_NAMESPACE,
Lineage.SOURCE_METRIC_NAME))
- .build());
- assertThat(
-
lineageMetrics.getStringSets().iterator().next().getCommitted().getStringSet(),
- contains("bigquery:" + tableName.replace(':', '.')));
+ Set<String> result = Lineage.query(pipelineResult.metrics(),
Lineage.Type.SOURCE);
+ assertThat(result, contains("bigquery:" + tableName.replace(':', '.')));
}
@Before
@@ -600,10 +588,7 @@ public class BigQueryIOReadTest implements Serializable {
new MyData("b", 2L, bd1, bd2),
new MyData("c", 3L, bd1, bd2)));
PipelineResult result = p.run();
- // Skip when direct runner splits outside of a counters context.
- if (useTemplateCompatibility) {
- checkLineageSourceMetric(result,
"non-executing-project:somedataset.sometable");
- }
+ checkLineageSourceMetric(result,
"non-executing-project:somedataset.sometable");
}
@Test
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index bc90d4c8bae..c5af8045bfe 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -118,9 +118,6 @@ import
org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
import org.apache.beam.sdk.metrics.Lineage;
-import org.apache.beam.sdk.metrics.MetricNameFilter;
-import org.apache.beam.sdk.metrics.MetricQueryResults;
-import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
@@ -285,16 +282,8 @@ public class BigQueryIOWriteTest implements Serializable {
.withJobService(fakeJobService);
private void checkLineageSinkMetric(PipelineResult pipelineResult, String
tableName) {
- MetricQueryResults lineageMetrics =
- pipelineResult
- .metrics()
- .queryMetrics(
- MetricsFilter.builder()
- .addNameFilter(
- MetricNameFilter.named(Lineage.LINEAGE_NAMESPACE,
Lineage.SINK_METRIC_NAME))
- .build());
assertThat(
-
lineageMetrics.getStringSets().iterator().next().getCommitted().getStringSet(),
+ Lineage.query(pipelineResult.metrics(), Lineage.Type.SINK),
hasItem("bigquery:" + tableName.replace(':', '.')));
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
index bc88858ebc3..4ce9ad10b2c 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.io.gcp.bigtable;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasItem;
+
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
@@ -28,7 +31,9 @@ import java.io.IOException;
import java.util.Date;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -110,7 +115,8 @@ public class BigtableReadIT {
p.apply(BigtableIO.read().withBigtableOptions(bigtableOptionsBuilder).withTableId(tableId))
.apply(Count.globally());
PAssert.thatSingleton(count).isEqualTo(numRows);
- p.run();
+ PipelineResult r = p.run();
+ checkLineageSourceMetric(r, tableId);
}
@Test
@@ -138,6 +144,17 @@ public class BigtableReadIT {
.withMaxBufferElementCount(10))
.apply(Count.globally());
PAssert.thatSingleton(count).isEqualTo(numRows);
- p.run();
+ PipelineResult r = p.run();
+ checkLineageSourceMetric(r, tableId);
+ }
+
+ private void checkLineageSourceMetric(PipelineResult r, String tableId) {
+ // TODO(https://github.com/apache/beam/issues/32071): this test is malformed;
+ // when pipeline.run() is non-blocking, the metrics are not yet available at
the time of the query
+ if (options.getRunner().getName().contains("DirectRunner")) {
+ assertThat(
+ Lineage.query(r.metrics(), Lineage.Type.SOURCE),
+ hasItem(String.format("bigtable:%s.%s.%s", project,
options.getInstanceId(), tableId)));
+ }
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
index bf9f7d991fa..46bb3df836e 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.gcp.bigtable;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import com.google.api.gax.rpc.ServerStream;
@@ -39,8 +40,10 @@ import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.PAssert;
@@ -142,7 +145,7 @@ public class BigtableWriteIT implements Serializable {
.withProjectId(project)
.withInstanceId(options.getInstanceId())
.withTableId(tableId));
- p.run();
+ PipelineResult r = p.run();
// Test number of column families and column family name equality
Table table = getTable(tableId);
@@ -154,6 +157,7 @@ public class BigtableWriteIT implements Serializable {
// Test table data equality
List<KV<ByteString, ByteString>> tableData = getTableData(tableId);
assertThat(tableData, Matchers.containsInAnyOrder(testData.toArray()));
+ checkLineageSinkMetric(r, tableId);
}
@Test
@@ -340,7 +344,7 @@ public class BigtableWriteIT implements Serializable {
errorHandler.close();
PAssert.thatSingleton(Objects.requireNonNull(errorHandler.getOutput())).isEqualTo(2L);
- p.run();
+ PipelineResult r = p.run();
// Test number of column families and column family name equality
Table table = getTable(tableId);
@@ -352,6 +356,7 @@ public class BigtableWriteIT implements Serializable {
// Test table data equality
List<KV<ByteString, ByteString>> tableData = getTableData(tableId);
assertEquals(998, tableData.size());
+ checkLineageSinkMetric(r, tableId);
}
@After
@@ -412,4 +417,13 @@ public class BigtableWriteIT implements Serializable {
tableAdminClient.deleteTable(tableId);
}
}
+
+ private void checkLineageSinkMetric(PipelineResult r, String tableId) {
+ // Only check lineage metrics on the direct runner until Dataflow runner v2
supports reporting them back
+ if (options.getRunner().getName().contains("DirectRunner")) {
+ assertThat(
+ Lineage.query(r.metrics(), Lineage.Type.SINK),
+ hasItem(String.format("bigtable:%s.%s.%s", project,
options.getInstanceId(), tableId)));
+ }
+ }
}