This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e3dbc5d  Output successful rows from BQ Streaming Inserts
     new 6e050bc  Merge pull request #16768 from Output successful rows from BQ Streaming Inserts
e3dbc5d is described below

commit e3dbc5d90cc94cb4e04573d0634927b90a07e7f6
Author: Pablo Estrada <[email protected]>
AuthorDate: Mon Feb 7 14:16:34 2022 -0800

    Output successful rows from BQ Streaming Inserts
---
 .../sdk/io/gcp/bigquery/BatchedStreamingWrite.java |  3 ++
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 39 ++++++++++++++--------
 2 files changed, 28 insertions(+), 14 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
index f5ee5ea..015a5e0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
@@ -274,6 +274,9 @@ class BatchedStreamingWrite<ErrorT, ElementT>
       for (ValueInSingleWindow<ErrorT> row : failedInserts) {
         context.output(failedOutputTag, row.getValue(), row.getTimestamp(), row.getWindow());
       }
+      for (ValueInSingleWindow<TableRow> row : successfulInserts) {
+        context.output(SUCCESSFUL_ROWS_TAG, row.getValue(), row.getTimestamp(), row.getWindow());
+      }
       reportStreamingApiLogging(options);
     }
 
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index df34790..7b2df25 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -1305,20 +1305,31 @@ public class BigQueryIOWriteTest implements Serializable {
     if (useStorageApi || !useStreaming) {
       return;
     }
-    p.apply(
-            Create.of(
-                new SchemaPojo("a", 1),
-                new SchemaPojo("b", 2),
-                new SchemaPojo("c", 3),
-                new SchemaPojo("d", 4)))
-        .apply(
-            BigQueryIO.<SchemaPojo>write()
-                .to("project-id:dataset-id.table-id")
-                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
-                .withMethod(Method.STREAMING_INSERTS)
-                .useBeamSchema()
-                .withTestServices(fakeBqServices)
-                .withoutValidation());
+    WriteResult result =
+        p.apply(
+                Create.of(
+                    new SchemaPojo("a", 1),
+                    new SchemaPojo("b", 2),
+                    new SchemaPojo("c", 3),
+                    new SchemaPojo("d", 4)))
+            .apply(
+                BigQueryIO.<SchemaPojo>write()
+                    .to("project-id:dataset-id.table-id")
+                    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+                    .withMethod(Method.STREAMING_INSERTS)
+                    .useBeamSchema()
+                    .withTestServices(fakeBqServices)
+                    .withoutValidation());
+
+    PAssert.that(result.getSuccessfulInserts())
+        .satisfies(
+            new SerializableFunction<Iterable<TableRow>, Void>() {
+              @Override
+              public Void apply(Iterable<TableRow> input) {
+                assertThat(Lists.newArrayList(input).size(), is(4));
+                return null;
+              }
+            });
     p.run();
 
     assertThat(

Reply via email to