This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e3dbc5d Output successful rows from BQ Streaming Inserts
new 6e050bc Merge pull request #16768 from Output successful rows from BQ Streaming Inserts
e3dbc5d is described below
commit e3dbc5d90cc94cb4e04573d0634927b90a07e7f6
Author: Pablo Estrada <[email protected]>
AuthorDate: Mon Feb 7 14:16:34 2022 -0800
Output successful rows from BQ Streaming Inserts
---
.../sdk/io/gcp/bigquery/BatchedStreamingWrite.java | 3 ++
.../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 39 ++++++++++++++--------
2 files changed, 28 insertions(+), 14 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
index f5ee5ea..015a5e0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
@@ -274,6 +274,9 @@ class BatchedStreamingWrite<ErrorT, ElementT>
for (ValueInSingleWindow<ErrorT> row : failedInserts) {
context.output(failedOutputTag, row.getValue(), row.getTimestamp(), row.getWindow());
}
+ for (ValueInSingleWindow<TableRow> row : successfulInserts) {
+ context.output(SUCCESSFUL_ROWS_TAG, row.getValue(), row.getTimestamp(), row.getWindow());
+ }
reportStreamingApiLogging(options);
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index df34790..7b2df25 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -1305,20 +1305,31 @@ public class BigQueryIOWriteTest implements Serializable {
if (useStorageApi || !useStreaming) {
return;
}
- p.apply(
- Create.of(
- new SchemaPojo("a", 1),
- new SchemaPojo("b", 2),
- new SchemaPojo("c", 3),
- new SchemaPojo("d", 4)))
- .apply(
- BigQueryIO.<SchemaPojo>write()
- .to("project-id:dataset-id.table-id")
- .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
- .withMethod(Method.STREAMING_INSERTS)
- .useBeamSchema()
- .withTestServices(fakeBqServices)
- .withoutValidation());
+ WriteResult result =
+ p.apply(
+ Create.of(
+ new SchemaPojo("a", 1),
+ new SchemaPojo("b", 2),
+ new SchemaPojo("c", 3),
+ new SchemaPojo("d", 4)))
+ .apply(
+ BigQueryIO.<SchemaPojo>write()
+ .to("project-id:dataset-id.table-id")
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withMethod(Method.STREAMING_INSERTS)
+ .useBeamSchema()
+ .withTestServices(fakeBqServices)
+ .withoutValidation());
+
+ PAssert.that(result.getSuccessfulInserts())
+ .satisfies(
+ new SerializableFunction<Iterable<TableRow>, Void>() {
+ @Override
+ public Void apply(Iterable<TableRow> input) {
+ assertThat(Lists.newArrayList(input).size(), is(4));
+ return null;
+ }
+ });
p.run();
assertThat(