reuvenlax commented on code in PR #17550:
URL: https://github.com/apache/beam/pull/17550#discussion_r876229787


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -455,21 +506,26 @@ public void process(
     @FinishBundle
     public void finishBundle(FinishBundleContext context) throws Exception {
       flushAll();
-      for (DestinationState state : destinations.values()) {
-        if (!useDefaultStream) {
+      if (!useDefaultStream) {

Review Comment:
   Why this change? Won't this prevent us from unpinning the client?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -409,9 +447,18 @@ private void initializeDatasetService(PipelineOptions 
pipelineOptions) {
       }
     }
 
+    @Setup
+    public void setup() {
+      if (useDefaultStream) {
+        destinations = Maps.newHashMap();
+      }
+    }
+
     @StartBundle
     public void startBundle() throws IOException {
-      destinations = Maps.newHashMap();
+      if (!useDefaultStream) {

Review Comment:
   Why add this if?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -455,21 +506,26 @@ public void process(
     @FinishBundle
     public void finishBundle(FinishBundleContext context) throws Exception {
       flushAll();
-      for (DestinationState state : destinations.values()) {
-        if (!useDefaultStream) {
+      if (!useDefaultStream) {
+        for (DestinationState state : destinations.values()) {
           context.output(
               KV.of(state.tableUrn, state.streamName),
               BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1)),
               GlobalWindow.INSTANCE);
+          state.teardown();
         }
-        state.teardown();
+        destinations.clear();
+        destinations = null;
       }
-      destinations.clear();
-      destinations = null;
     }
 
     @Teardown
     public void teardown() {
+      if (destinations != null) {
+        for (DestinationState state : destinations.values()) {
+          state.teardown();
+        }

Review Comment:
   We shouldn't be doing this in teardown



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java:
##########
@@ -93,12 +93,22 @@
   void setUseStorageWriteApiAtLeastOnce(Boolean value);
 
   @Description(
-      "If set, then BigQueryIO.Write will default to using this number of 
Storage Write API streams.")
+      "If set, then BigQueryIO.Write will default to using this number of 
Storage Write API streams. ")
   @Default.Integer(0)
   Integer getNumStorageWriteApiStreams();
 
   void setNumStorageWriteApiStreams(Integer value);
 
+  @Description(
+      "When using the \"_default\" table stream, this option sets the number 
of stream append clients that will be allocated "

Review Comment:
   reference at-least once writes. Users don't know about default streams, as 
that's an implementation detail



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -409,9 +447,18 @@ private void initializeDatasetService(PipelineOptions 
pipelineOptions) {
       }
     }
 
+    @Setup
+    public void setup() {

Review Comment:
   I'm not sure what this is adding?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to