[ https://issues.apache.org/jira/browse/BEAM-12497?focusedWorklogId=616716&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-616716 ]

ASF GitHub Bot logged work on BEAM-12497:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Jun/21 21:29
            Start Date: 29/Jun/21 21:29
    Worklog Time Spent: 10m 
      Work Description: pabloem commented on a change in pull request #15013:
URL: https://github.com/apache/beam/pull/15013#discussion_r660922229



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
##########
@@ -191,23 +192,25 @@ private BatchedStreamingWrite(
   }
 
   @Override
-  public PCollection<ErrorT> expand(PCollection<KV<String, TableRowInfo<ElementT>>> input) {
+  public PCollectionTuple expand(PCollection<KV<String, TableRowInfo<ElementT>>> input) {
     return batchViaStateful
         ? input.apply(new ViaStateful())
         : input.apply(new ViaBundleFinalization());
   }
 
   private class ViaBundleFinalization
-      extends PTransform<PCollection<KV<String, TableRowInfo<ElementT>>>, PCollection<ErrorT>> {
+      extends PTransform<PCollection<KV<String, TableRowInfo<ElementT>>>, PCollectionTuple> {
     @Override
-    public PCollection<ErrorT> expand(PCollection<KV<String, TableRowInfo<ElementT>>> input) {
+    public PCollectionTuple expand(PCollection<KV<String, TableRowInfo<ElementT>>> input) {
       PCollectionTuple result =
           input.apply(
               ParDo.of(new BatchAndInsertElements())
-                  .withOutputTags(mainOutputTag, TupleTagList.of(failedOutputTag)));
+                  .withOutputTags(
+                      mainOutputTag, TupleTagList.of(failedOutputTag).and(SUCCESSFUL_ROWS_TAG)));
       PCollection<ErrorT> failedInserts = result.get(failedOutputTag);
       failedInserts.setCoder(failedOutputCoder);

Review comment:
       Done
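
For readers following the thread: the change above turns expand into a multi-output transform. Below is a minimal, self-contained sketch of the Beam multi-output ParDo pattern it relies on; the tags, DoFn, and element values are illustrative, not taken from the PR.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;

    public class MultiOutputSketch {
      // Anonymous subclasses so the tags capture their type parameter.
      static final TupleTag<String> MAIN = new TupleTag<String>() {};
      static final TupleTag<String> FAILED = new TupleTag<String>() {};

      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        // A DoFn with one main output and one tagged side output, analogous to
        // mainOutputTag / failedOutputTag / SUCCESSFUL_ROWS_TAG in the diff.
        PCollectionTuple outputs =
            p.apply(Create.of("ok", "bad", "ok"))
                .apply(
                    ParDo.of(
                            new DoFn<String, String>() {
                              @ProcessElement
                              public void process(@Element String row, MultiOutputReceiver out) {
                                if ("bad".equals(row)) {
                                  out.get(FAILED).output(row); // tagged side output
                                } else {
                                  out.get(MAIN).output(row); // main output
                                }
                              }
                            })
                        .withOutputTags(MAIN, TupleTagList.of(FAILED)));
        // Each output is pulled out of the tuple by its tag.
        PCollection<String> succeeded = outputs.get(MAIN);
        PCollection<String> failed = outputs.get(FAILED);
        p.run().waitUntilFinish();
      }
    }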

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
##########
@@ -191,23 +192,25 @@ private BatchedStreamingWrite(
   }
 
   @Override
-  public PCollection<ErrorT> expand(PCollection<KV<String, TableRowInfo<ElementT>>> input) {
+  public PCollectionTuple expand(PCollection<KV<String, TableRowInfo<ElementT>>> input) {
     return batchViaStateful
         ? input.apply(new ViaStateful())
         : input.apply(new ViaBundleFinalization());
   }
 
   private class ViaBundleFinalization
-      extends PTransform<PCollection<KV<String, TableRowInfo<ElementT>>>, PCollection<ErrorT>> {
+      extends PTransform<PCollection<KV<String, TableRowInfo<ElementT>>>, PCollectionTuple> {
     @Override
-    public PCollection<ErrorT> expand(PCollection<KV<String, TableRowInfo<ElementT>>> input) {
+    public PCollectionTuple expand(PCollection<KV<String, TableRowInfo<ElementT>>> input) {
       PCollectionTuple result =
           input.apply(
               ParDo.of(new BatchAndInsertElements())
-                  .withOutputTags(mainOutputTag, TupleTagList.of(failedOutputTag)));
+                  .withOutputTags(
+                      mainOutputTag, TupleTagList.of(failedOutputTag).and(SUCCESSFUL_ROWS_TAG)));

Review comment:
       Yeah, it's curious.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
##########
@@ -306,10 +311,13 @@ public void finishBundle(FinishBundleContext context) throws Exception {
                       ShardedKey.Coder.of(StringUtf8Coder.of()), IterableCoder.of(valueCoder)))
               .apply(
                   ParDo.of(new InsertBatchedElements())
-                      .withOutputTags(mainOutputTag, TupleTagList.of(failedOutputTag)));
+                      .withOutputTags(
+                          mainOutputTag,
+                          TupleTagList.of(failedOutputTag).and(SUCCESSFUL_ROWS_TAG)));
       PCollection<ErrorT> failedInserts = result.get(failedOutputTag);
       failedInserts.setCoder(failedOutputCoder);

Review comment:
       Done.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
##########
@@ -305,20 +316,22 @@ public WriteResult expand(PCollection<KV<TableDestination, ElementT>> input) {
 
       // Auto-sharding is achieved via GroupIntoBatches.WithShardedKey transform which groups and at
       // the same time batches the TableRows to be inserted to BigQuery.
-      return unshardedTagged.apply(
-          "StreamingWrite",
-          new BatchedStreamingWrite<>(
-                  bigQueryServices,
-                  retryPolicy,
-                  failedInsertsTag,
-                  coder,
-                  errorContainer,
-                  skipInvalidRows,
-                  ignoreUnknownValues,
-                  ignoreInsertIds,
-                  toTableRow,
-                  toFailsafeTableRow)
-              .viaStateful());
+      PCollectionTuple result =

Review comment:
       Removed. Thanks!
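
As an aside on the auto-sharding comment in this hunk: a minimal sketch of GroupIntoBatches.withShardedKey() in isolation, for readers unfamiliar with the transform (keys, values, and batch size here are illustrative):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.GroupIntoBatches;
    import org.apache.beam.sdk.util.ShardedKey;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    public class ShardedBatchingSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        // Batches up to 3 values per key, while letting the runner split a hot
        // key across shards instead of funneling it through one worker.
        PCollection<KV<ShardedKey<String>, Iterable<String>>> batched =
            p.apply(
                    Create.of(
                        KV.of("table-a", "row1"),
                        KV.of("table-a", "row2"),
                        KV.of("table-b", "row3")))
                .apply(GroupIntoBatches.<String, String>ofSize(3).withShardedKey());
        p.run().waitUntilFinish();
      }
    }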

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
##########
@@ -334,44 +347,47 @@ public WriteResult expand(PCollection<KV<TableDestination, ElementT>> input) {
                       ShardedKeyCoder.of(StringUtf8Coder.of()),
                       TableRowInfoCoder.of(elementCoder)));
 
-      return shardedTagged
-          .apply(Reshuffle.of())
-          // Put in the global window to ensure that DynamicDestinations side inputs are accessed
-          // correctly.
-          .apply(
-              "GlobalWindow",
-              Window.<KV<ShardedKey<String>, TableRowInfo<ElementT>>>into(new GlobalWindows())
-                  .triggering(DefaultTrigger.of())
-                  .discardingFiredPanes())
-          .apply(
-              "StripShardId",
-              MapElements.via(
-                  new SimpleFunction<
-                      KV<ShardedKey<String>, TableRowInfo<ElementT>>,
-                      KV<String, TableRowInfo<ElementT>>>() {
-                    @Override
-                    public KV<String, TableRowInfo<ElementT>> apply(
-                        KV<ShardedKey<String>, TableRowInfo<ElementT>> input) {
-                      return KV.of(input.getKey().getKey(), input.getValue());
-                    }
-                  }))
-          .setCoder(KvCoder.of(StringUtf8Coder.of(), TableRowInfoCoder.of(elementCoder)))
-          // Also batch the TableRows in a best effort manner via bundle finalization before
-          // inserting to BigQuery.
-          .apply(
-              "StreamingWrite",
-              new BatchedStreamingWrite<>(
-                      bigQueryServices,
-                      retryPolicy,
-                      failedInsertsTag,
-                      coder,
-                      errorContainer,
-                      skipInvalidRows,
-                      ignoreUnknownValues,
-                      ignoreInsertIds,
-                      toTableRow,
-                      toFailsafeTableRow)
-                  .viaDoFnFinalization());
+      PCollectionTuple result =

Review comment:
       Undone. Thanks!

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
##########
@@ -844,6 +903,7 @@ public void deleteDataset(String projectId, String datasetId)
                 + "as many elements as rowList");
       }
 
+      final Set<Integer> failedIndices = new HashSet<>();

Review comment:
       If all elements fail in a single request, we would have 500 failed 
indices by default (the user can tweak this setting, but it would usually be 
500 or fewer), so I think that's an acceptable size. WDYT?
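
For context on the sizing discussion, here is a hedged sketch of the pattern being debated; names and structure are illustrative, not the PR's exact code:

    import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
    import com.google.api.services.bigquery.model.TableRow;
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class FailedIndexSketch {
      // Collect the indices BigQuery reported as failed, then treat every other
      // row in the request as successfully inserted. The set holds at most one
      // entry per row in the request, hence the "500 or fewer" bound above.
      static List<TableRow> successfulRows(
          List<TableRow> rowList, TableDataInsertAllResponse response) {
        Set<Integer> failedIndices = new HashSet<>();
        if (response.getInsertErrors() != null) {
          for (TableDataInsertAllResponse.InsertErrors err : response.getInsertErrors()) {
            failedIndices.add(err.getIndex().intValue());
          }
        }
        List<TableRow> successful = new ArrayList<>();
        for (int i = 0; i < rowList.size(); i++) {
          if (!failedIndices.contains(i)) {
            successful.add(rowList.get(i));
          }
        }
        return successful;
      }
    }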




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 616716)
    Time Spent: 1h 10m  (was: 1h)

> BigQueryIO should return successfully inserted rows
> ---------------------------------------------------
>
>                 Key: BEAM-12497
>                 URL: https://issues.apache.org/jira/browse/BEAM-12497
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>            Reporter: Pablo Estrada
>            Assignee: Pablo Estrada
>            Priority: P2
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> As part of a CDC use case, we want to be able to receive rows AFTER they've 
> been successfully inserted.
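
To make the requested behavior concrete, a hedged sketch of how a pipeline might consume such an output; the getSuccessfulInserts() accessor is an assumption based on this PR's direction, the destination is a placeholder, and the downstream DoFn is hypothetical:

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
    import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    public class CdcConsumerSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        WriteResult result =
            p.apply(Create.of(new TableRow().set("id", 1)).withCoder(TableRowJsonCoder.of()))
                .apply(
                    BigQueryIO.writeTableRows()
                        .to("my-project:my_dataset.my_table") // placeholder destination
                        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
        // Assumed accessor from this PR: rows BigQuery actually accepted.
        PCollection<TableRow> inserted = result.getSuccessfulInserts();
        inserted.apply(
            "AfterInsert",
            ParDo.of(
                new DoFn<TableRow, Void>() {
                  @ProcessElement
                  public void process(@Element TableRow row) {
                    // Hypothetical CDC follow-up step.
                    System.out.println("inserted: " + row);
                  }
                }));
        p.run().waitUntilFinish();
      }
    }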



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
