This is an automated email from the ASF dual-hosted git repository.
jrmccluskey pushed a commit to branch release-2.58.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.58.0 by this push:
new b95a750e862 Merge pull request #31823 Add lineage information for BigQuery sinks. (#31836)
b95a750e862 is described below
commit b95a750e862d5f9a179805528eff5dd0b605f597
Author: Robert Bradshaw <[email protected]>
AuthorDate: Wed Jul 10 13:50:55 2024 -0700
Merge pull request #31823 Add lineage information for BigQuery sinks. (#31836)
---
.../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java | 21 +++++++++++--
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 5 ++--
.../io/gcp/bigquery/BigQueryStorageSourceBase.java | 4 +--
.../beam/sdk/io/gcp/bigquery/CreateTables.java | 8 ++++-
.../bigquery/StorageApiWriteUnshardedRecords.java | 14 +++++++++
.../bigquery/StorageApiWritesShardedRecords.java | 6 ++++
.../beam/sdk/io/gcp/bigquery/WriteRename.java | 6 ++++
.../beam/sdk/io/gcp/bigquery/WriteTables.java | 6 ++++
.../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 34 +++++++++++++++++++---
9 files changed, 92 insertions(+), 12 deletions(-)
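
As an illustrative aside, a minimal sketch of the lineage name the new dataCatalogName(TableReference, BigQueryOptions) overload produces: strip any '$' partition decorator from the table id and fall back to the pipeline's BigQuery/GCP project when the TableReference carries no project. The standalone class and the plain fallbackProject parameter here are assumptions to keep the snippet self-contained; the real overload (see the BigQueryHelpers hunk below) takes BigQueryOptions.

    import com.google.api.services.bigquery.model.TableReference;

    public class LineageNameSketch {
      // Assumption: fallbackProject stands in for options.getBigQueryProject() / options.getProject().
      static String dataCatalogNameSketch(TableReference ref, String fallbackProject) {
        String tableId = ref.getTableId();
        int ix = tableId.indexOf('$');
        // Drop a partition decorator, e.g. "orders$20240710" -> "orders".
        String tableIdBase = (ix == -1) ? tableId : tableId.substring(0, ix);
        String projectId =
            (ref.getProjectId() != null && !ref.getProjectId().isEmpty())
                ? ref.getProjectId()
                : fallbackProject;
        return String.format("bigquery:%s.%s.%s", projectId, ref.getDatasetId(), tableIdBase);
      }

      public static void main(String[] args) {
        TableReference ref =
            new TableReference().setDatasetId("dataset-id").setTableId("orders$20240710");
        // Prints "bigquery:project-id.dataset-id.orders".
        System.out.println(dataCatalogNameSketch(ref, "project-id"));
      }
    }
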
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 7f5d675ccf7..61bed66a336 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -54,6 +54,7 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -412,9 +413,23 @@ public class BigQueryHelpers {
return sb.toString();
}
- public static String dataCatalogName(TableReference ref) {
- return String.format(
- "bigquery:%s.%s.%s", ref.getProjectId(), ref.getDatasetId(),
ref.getTableId());
+ public static String dataCatalogName(TableReference ref, BigQueryOptions options) {
+ String tableIdBase;
+ int ix = ref.getTableId().indexOf('$');
+ if (ix == -1) {
+ tableIdBase = ref.getTableId();
+ } else {
+ tableIdBase = ref.getTableId().substring(0, ix);
+ }
+ String projectId;
+ if (!Strings.isNullOrEmpty(ref.getProjectId())) {
+ projectId = ref.getProjectId();
+ } else if (!Strings.isNullOrEmpty(options.getBigQueryProject())) {
+ projectId = options.getBigQueryProject();
+ } else {
+ projectId = options.getProject();
+ }
+ return String.format("bigquery:%s.%s.%s", projectId, ref.getDatasetId(),
tableIdBase);
}
static <K, V> List<V> getOrCreateMapListValue(Map<K, List<V>> map, K key) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index a863c49f46a..38c0c8e43b2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -121,7 +121,7 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
BigQueryHelpers.toTableSpec(tableToExtract)));
}
// emit this table ID as a lineage source
- Lineage.getSources().add(BigQueryHelpers.dataCatalogName(tableToExtract));
+ Lineage.getSources().add(BigQueryHelpers.dataCatalogName(tableToExtract, bqOptions));
TableSchema schema = table.getSchema();
JobService jobService = bqServices.getJobService(bqOptions);
@@ -158,7 +158,8 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
if (res.extractedFiles.size() > 0) {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
// emit this table ID as a lineage source
- Lineage.getSources().add(BigQueryHelpers.dataCatalogName(getTableToExtract(bqOptions)));
+ Lineage.getSources()
+ .add(BigQueryHelpers.dataCatalogName(getTableToExtract(bqOptions), bqOptions));
final String extractDestinationDir =
resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
// Match all files in the destination directory to stat them in bulk.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
index a283ed6c260..3852d18ec12 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
@@ -114,7 +114,7 @@ abstract class BigQueryStorageSourceBase<T> extends BoundedSource<T> {
TableReference tableReference = targetTable.getTableReference();
readSessionBuilder.setTable(BigQueryHelpers.toTableResourceName(tableReference));
// register the table as lineage source
- lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference));
+ lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference, bqOptions));
} else {
// If the table does not exist targetTable will be null.
// Construct the table id if we can generate it. For error recording/logging.
@@ -123,7 +123,7 @@ abstract class BigQueryStorageSourceBase<T> extends BoundedSource<T> {
readSessionBuilder.setTable(tableReferenceId);
// register the table as lineage source
TableReference tableReference = BigQueryHelpers.parseTableUrn(tableReferenceId);
- lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference));
+ lineageSources.add(BigQueryHelpers.dataCatalogName(tableReference, bqOptions));
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
index 7e5299b7e67..a55bcc3fe02 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
@@ -24,6 +24,7 @@ import com.google.api.services.bigquery.model.TableSchema;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -117,8 +118,13 @@ public class CreateTables<DestinationT, ElementT>
Supplier<@Nullable TableConstraints> tableConstraintsSupplier =
() -> dynamicDestinations.getTableConstraints(dest);
+ BigQueryOptions bqOptions = context.getPipelineOptions().as(BigQueryOptions.class);
+ Lineage.getSinks()
+ .add(
+ BigQueryHelpers.dataCatalogName(
+ tableDestination1.getTableReference(), bqOptions));
return CreateTableHelpers.possiblyCreateTable(
- context.getPipelineOptions().as(BigQueryOptions.class),
+ bqOptions,
tableDestination1,
schemaSupplier,
tableConstraintsSupplier,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index ce5e7b4854e..21c1d961e84 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -61,6 +61,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.MessageConverter;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
@@ -267,6 +268,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
}
class DestinationState {
+ private final TableDestination tableDestination;
private final String tableUrn;
private final String shortTableUrn;
private String streamName = "";
@@ -298,6 +300,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
private final boolean includeCdcColumns;
public DestinationState(
+ TableDestination tableDestination,
String tableUrn,
String shortTableUrn,
MessageConverter<ElementT> messageConverter,
@@ -309,6 +312,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
Callable<Boolean> tryCreateTable,
boolean includeCdcColumns)
throws Exception {
+ this.tableDestination = tableDestination;
this.tableUrn = tableUrn;
this.shortTableUrn = shortTableUrn;
this.pendingMessages = Lists.newArrayList();
@@ -327,6 +331,10 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
}
}
+ public TableDestination getTableDestination() {
+ return tableDestination;
+ }
+
void teardown() {
maybeTickleCache();
if (appendClientInfo != null) {
@@ -1050,6 +1058,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
try {
messageConverter = messageConverters.get(destination, dynamicDestinations, datasetService);
return new DestinationState(
+ tableDestination1,
tableDestination1.getTableUrn(bigQueryOptions),
tableDestination1.getShortTableUrn(),
messageConverter,
@@ -1089,6 +1098,11 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
initializedDatasetService,
initializedWriteStreamService,
pipelineOptions.as(BigQueryOptions.class)));
+ Lineage.getSinks()
+ .add(
+ BigQueryHelpers.dataCatalogName(
+ state.getTableDestination().getTableReference(),
+ pipelineOptions.as(BigQueryOptions.class)));
OutputReceiver<BigQueryStorageApiInsertError> failedRowsReceiver = o.get(failedRowsTag);
@Nullable
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index 1232e1a7097..a7da19a75f8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -62,6 +62,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiFlushAndFinalizeDoFn.Operation;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
@@ -469,6 +470,11 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
final DatasetService datasetService = getDatasetService(pipelineOptions);
final WriteStreamService writeStreamService = getWriteStreamService(pipelineOptions);
+ Lineage.getSinks()
+ .add(
+ BigQueryHelpers.dataCatalogName(
+ tableDestination.getTableReference(), bigQueryOptions));
+
Coder<DestinationT> destinationCoder = dynamicDestinations.getDestinationCoder();
Callable<Boolean> tryCreateTable =
() -> {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index a7177613c60..1a6a6a4db70 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.io.gcp.bigquery.WriteTables.Result;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
@@ -207,6 +208,11 @@ class WriteRename
// Process each destination table.
// Do not copy if no temp tables are provided.
if (!entry.getValue().isEmpty()) {
+ Lineage.getSinks()
+ .add(
+ BigQueryHelpers.dataCatalogName(
+ entry.getKey().getTableReference(),
+ c.getPipelineOptions().as(BigQueryOptions.class)));
pendingJobs.add(startWriteRename(entry.getKey(), entry.getValue(), c, window));
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index c6a7d32e248..ace0bc5a74c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.SchemaUpdateOption;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
+import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
@@ -259,6 +260,11 @@ class WriteTables<DestinationT extends @NonNull Object>
}
// This is a temp table. Create a new one for each partition and each pane.
tableReference.setTableId(jobIdPrefix);
+ } else {
+ Lineage.getSinks()
+ .add(
+ BigQueryHelpers.dataCatalogName(
+ tableReference, c.getPipelineOptions().as(BigQueryOptions.class)));
}
WriteDisposition writeDisposition = firstPaneWriteDisposition;
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index d303948fe44..bc90d4c8bae 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -28,6 +28,7 @@ import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -93,6 +94,7 @@ import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
@@ -115,6 +117,10 @@ import org.apache.beam.sdk.io.gcp.bigquery.WriteTables.Result;
import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
+import org.apache.beam.sdk.metrics.Lineage;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
@@ -278,6 +284,20 @@ public class BigQueryIOWriteTest implements Serializable {
.withDatasetService(fakeDatasetService)
.withJobService(fakeJobService);
+ private void checkLineageSinkMetric(PipelineResult pipelineResult, String tableName) {
+ MetricQueryResults lineageMetrics =
+ pipelineResult
+ .metrics()
+ .queryMetrics(
+ MetricsFilter.builder()
+ .addNameFilter(
+ MetricNameFilter.named(Lineage.LINEAGE_NAMESPACE, Lineage.SINK_METRIC_NAME))
+ .build());
+ assertThat(
+ lineageMetrics.getStringSets().iterator().next().getCommitted().getStringSet(),
+ hasItem("bigquery:" + tableName.replace(':', '.')));
+ }
+
@Before
public void setUp() throws ExecutionException, IOException, InterruptedException {
FakeDatasetService.setUp();
@@ -488,7 +508,7 @@ public class BigQueryIOWriteTest implements Serializable {
.containsInAnyOrder(expectedTables);
}
- p.run();
+ PipelineResult pipelineResult = p.run();
Map<Long, List<TableRow>> expectedTableRows = Maps.newHashMap();
for (String anUserList : userList) {
@@ -505,6 +525,7 @@ public class BigQueryIOWriteTest implements Serializable {
assertThat(
fakeDatasetService.getAllRows("project-id", "dataset-id", "userid-"
+ entry.getKey()),
containsInAnyOrder(Iterables.toArray(entry.getValue(), TableRow.class)));
+ checkLineageSinkMetric(pipelineResult, "project-id.dataset-id.userid-" + entry.getKey());
}
}
@@ -680,7 +701,7 @@ public class BigQueryIOWriteTest implements Serializable {
}
p.apply(testStream).apply(writeTransform);
- p.run();
+ PipelineResult pipelineResult = p.run();
final int projectIdSplitter = tableRef.indexOf(':');
final String projectId =
@@ -689,6 +710,9 @@ public class BigQueryIOWriteTest implements Serializable {
assertThat(
fakeDatasetService.getAllRows(projectId, "dataset-id", "table-id"),
containsInAnyOrder(Iterables.toArray(elements, TableRow.class)));
+
+ checkLineageSinkMetric(
+ pipelineResult, tableRef.contains(projectId) ? tableRef : projectId + ":" + tableRef);
}
public void runStreamingFileLoads(String tableRef) throws Exception {
@@ -828,11 +852,12 @@ public class BigQueryIOWriteTest implements Serializable {
PAssert.that(result.getSuccessfulTableLoads())
.containsInAnyOrder(new TableDestination("project-id:dataset-id.table-id", null));
- p.run();
+ PipelineResult pipelineResult = p.run();
assertThat(
fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
containsInAnyOrder(Iterables.toArray(elements, TableRow.class)));
+ checkLineageSinkMetric(pipelineResult, "project-id.dataset-id.table-id");
}
@Test
@@ -861,11 +886,12 @@ public class BigQueryIOWriteTest implements Serializable {
PAssert.that(result.getSuccessfulTableLoads())
.containsInAnyOrder(new TableDestination("project-id:dataset-id.table-id", null));
- p.run();
+ PipelineResult pipelineResult = p.run();
assertThat(
fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
containsInAnyOrder(Iterables.toArray(elements, TableRow.class)));
+ checkLineageSinkMetric(pipelineResult, "project-id.dataset-id.table-id");
}
@Test