reuvenlax commented on code in PR #38264:
URL: https://github.com/apache/beam/pull/38264#discussion_r3155988295


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java:
##########
@@ -134,7 +143,18 @@ public AppendClientInfo withAppendClient(
   public void close() {
     BigQueryServices.StreamAppendClient client = getStreamAppendClient();
     if (client != null) {
-      getCloseAppendClient().accept(client);
+      try {
+        getCloseAppendClient().accept(client);
+      } catch (Exception e) {
+        // We ignore errors when closing clients.
+        String msg =
+            e
+                + "\n"
+                + Arrays.stream(e.getStackTrace())
+                    .map(StackTraceElement::toString)
+                    .collect(Collectors.joining("\n"));
+        LOG.warn("Caught exception whilw trying to close append client. 
Ignoring {}", msg);

Review Comment:
   done



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -671,475 +880,223 @@ public void process(
                 AppendClientInfo.of(
                         Preconditions.checkStateNotNull(tableSchema),
                         descriptor,
-                        // Make sure that the client is always closed in a 
different thread
-                        // to
-                        // avoid blocking.
-                        client ->
-                            runAsyncIgnoreFailure(
-                                closeWriterExecutor,
-                                () -> {
-                                  // Remove the pin that is "owned" by the 
cache.
-                                  client.unpin();
-                                  client.close();
-                                }))
+                        AutoCloseable::close)
                     .withAppendClient(
                         writeStreamService,
                         getOrCreateStream,
                         false,
                         defaultMissingValueInterpretation);
-            // This pin is "owned" by the cache.
-            
Preconditions.checkStateNotNull(info.getStreamAppendClient()).pin();
             return info;
           };
 
-      AtomicReference<AppendClientInfo> appendClientInfo =
-          new AtomicReference<>(
-              APPEND_CLIENTS.get(
-                  messageConverters.getAppendClientKey(element.getKey()), 
getAppendClientInfo));
-      String currentStream = getOrCreateStream.get();
-      if (!currentStream.equals(appendClientInfo.get().getStreamName())) {
-        // Cached append client is inconsistent with persisted state. Throw 
away cached item and
-        // force it to be
-        // recreated.
-        
APPEND_CLIENTS.invalidate(messageConverters.getAppendClientKey(element.getKey()));
-        appendClientInfo.set(
-            APPEND_CLIENTS.get(
-                messageConverters.getAppendClientKey(element.getKey()), 
getAppendClientInfo));
-      }
-
-      TableSchema updatedSchemaValue = updatedSchema.read();
-      if (autoUpdateSchema && updatedSchemaValue != null) {
-        if (appendClientInfo.get().hasSchemaChanged(updatedSchemaValue)) {
-          appendClientInfo.set(
-              AppendClientInfo.of(
-                  updatedSchemaValue, 
appendClientInfo.get().getCloseAppendClient(), false));
-          
APPEND_CLIENTS.invalidate(messageConverters.getAppendClientKey(element.getKey()));
-          APPEND_CLIENTS.put(
-              messageConverters.getAppendClientKey(element.getKey()), 
appendClientInfo.get());
+      // The StreamWriter has two pins on it. The static cache holds a pin, as 
it continues to cache
+      // values after this
+      // method exits, so must hold the pin. The local AppendClientHolder also 
holds a pin, as the
+      // cache could in
+      // theory evict the object during execution and we want a pin held 
throughout the execution of
+      // this function.
+      try (AppendClientHolder appendClientHolder =
+          new AppendClientHolder(element.getKey(), getAppendClientInfo)) {
+        String currentStream = getOrCreateStream.get();
+        if (!currentStream.equals(appendClientHolder.get().getStreamName())) {
+          // Cached append client is inconsistent with persisted state. Throw 
away cached item and
+          // force it to be recreated.
+          appendClientHolder.invalidateAndReset();
         }
-      }
 
-      // Initialize stream names and offsets for all contexts. This will be 
called initially, but
-      // will also be called if we roll over to a new stream on a retry.
-      BiConsumer<Iterable<AppendRowsContext<DestinationT>>, Boolean> 
initializeContexts =
-          (contexts, isFailure) -> {
-            try {
-              if (isFailure) {
-                // Clear the stream name, forcing a new one to be created.
-                streamName.write("");
-              }
-              appendClientInfo.set(
-                  appendClientInfo
-                      .get()
-                      .withAppendClient(
-                          writeStreamService,
-                          getOrCreateStream,
-                          false,
-                          defaultMissingValueInterpretation));
-              StreamAppendClient streamAppendClient =
-                  Preconditions.checkArgumentNotNull(
-                      appendClientInfo.get().getStreamAppendClient());
-              String streamNameRead = 
Preconditions.checkArgumentNotNull(streamName.read());
-              long currentOffset = 
Preconditions.checkArgumentNotNull(streamOffset.read());
-              for (AppendRowsContext<DestinationT> context : contexts) {
-                context.streamName = streamNameRead;
-                streamAppendClient.pin();
-                context.client = 
appendClientInfo.get().getStreamAppendClient();
-                context.offset = currentOffset;
-                ++context.tryIteration;
-                currentOffset = context.offset + 
context.protoRows.getSerializedRowsCount();
-              }
-              streamOffset.write(currentOffset);
-            } catch (Exception e) {
-              throw new RuntimeException(e);
-            }
-          };
-
-      Consumer<Iterable<AppendRowsContext<DestinationT>>> clearClients =
-          (contexts) -> {
-            
APPEND_CLIENTS.invalidate(messageConverters.getAppendClientKey(element.getKey()));
-            appendClientInfo.set(appendClientInfo.get().withNoAppendClient());
-            APPEND_CLIENTS.put(
-                messageConverters.getAppendClientKey(element.getKey()), 
appendClientInfo.get());
-            for (AppendRowsContext<DestinationT> context : contexts) {
-              if (context.client != null) {
-                // Unpin in a different thread, as it may execute a blocking 
close.
-                runAsyncIgnoreFailure(closeWriterExecutor, 
context.client::unpin);
-                context.client = null;
-              }
-            }
-          };
-
-      Function<AppendRowsContext<DestinationT>, ApiFuture<AppendRowsResponse>> 
runOperation =
-          context -> {
-            if (context.protoRows.getSerializedRowsCount() == 0) {
-              // This might happen if all rows in a batch failed and were sent 
to the failed-rows
-              // PCollection.
-              return 
ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build());
-            }
-            try {
-              appendClientInfo.set(
-                  appendClientInfo
-                      .get()
-                      .withAppendClient(
-                          writeStreamService,
-                          getOrCreateStream,
-                          false,
-                          defaultMissingValueInterpretation));
-              return 
Preconditions.checkStateNotNull(appendClientInfo.get().getStreamAppendClient())
-                  .appendRows(context.offset, context.protoRows);
-            } catch (Exception e) {
-              throw new RuntimeException(e);
-            }
-          };
+        TableSchema updatedSchemaValue = autoUpdateSchema ? 
updatedSchema.read() : null;
+        if (autoUpdateSchema && updatedSchemaValue != null) {
+          if (appendClientHolder.get().hasSchemaChanged(updatedSchemaValue)) {
+            appendClientHolder.invalidateAndReset();
+          }
+        }

Review Comment:
   done



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -487,6 +446,46 @@ public void onTeardown() {
       }
     }
 
+    // Holder for an AppendClientHolder. Maintains a pin on the client as long 
as it's active.
+    private class AppendClientHolder implements AutoCloseable {

Review Comment:
   Correct. The holder keeps one pin on the client for the duration of `process`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to