stankiewicz commented on code in PR #38783:
URL: https://github.com/apache/beam/pull/38783#discussion_r3386138632


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java:
##########
@@ -379,12 +381,19 @@ public WriteResult expandUntriggered(
     PCollection<KV<DestinationT, StorageApiWritePayload>> 
successfulConvertedRows =
         convertMessagesResult.get(successfulConvertedRowsTag);
 
-    if (numShards > 0) {
+    boolean streaming = 
input.getPipeline().getOptions().as(StreamingOptions.class).isStreaming();
+    if (numShards > 0 && streaming) {
       successfulConvertedRows =
           successfulConvertedRows.apply(
               "ResdistibuteNumShards",
               Redistribute.<KV<DestinationT, 
StorageApiWritePayload>>arbitrarily()
                   .withNumBuckets(numShards));
+    } else if (numShards > 0 && !streaming) {
+      successfulConvertedRows =
+          successfulConvertedRows
+              .apply("AddKeyWithSideInputs", ParDo.of(new 
AddShardKeyFn<>(numShards)))

Review Comment:
   Tested 700 destination this with dataflow batch which have long bundles. 
Also this change is isolated for batch.
   
   Baseline - today to sw api in batch with that amount of  tables and num 
shards set you can't make it work as you will reach a concurrent connection 
quota within seconds. Workaround would have be to write without shards and 
partition data prior to incoming bq transform.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to