This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 20aa916931f Properly close Storage API batch connections (#31710)
20aa916931f is described below

commit 20aa916931f70d3bc75c5a22bc70bed0099c549e
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Fri Jun 28 15:52:06 2024 -0400

    Properly close Storage API batch connections (#31710)
    
    * properly close connections; add active connection counter
    
    * only invalidate stream at teardown for PENDING type
    
    * cleanup
---
 .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java    | 19 +++++++++++++------
 .../gcp/bigquery/StorageApiWriteUnshardedRecords.java |  5 +++++
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
index 211027c12b0..5a12e81ea79 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
@@ -30,6 +30,8 @@ import com.google.protobuf.Message;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 
 /**
  * Container class used by {@link StorageApiWritesShardedRecords} and {@link
@@ -38,6 +40,9 @@ import javax.annotation.Nullable;
  */
 @AutoValue
 abstract class AppendClientInfo {
+  private final Counter activeConnections =
+      Metrics.counter(AppendClientInfo.class, "activeConnections");
+
   abstract @Nullable BigQueryServices.StreamAppendClient getStreamAppendClient();
 
   abstract TableSchema getTableSchema();
@@ -114,12 +119,13 @@ abstract class AppendClientInfo {
       return this;
     } else {
       String streamName = getStreamName.get();
-      return toBuilder()
-          .setStreamName(streamName)
-          .setStreamAppendClient(
-              writeStreamService.getStreamAppendClient(
-                  streamName, getDescriptor(), useConnectionPool, missingValueInterpretation))
-          .build();
+      BigQueryServices.StreamAppendClient client =
+          writeStreamService.getStreamAppendClient(
+              streamName, getDescriptor(), useConnectionPool, missingValueInterpretation);
+
+      activeConnections.inc();
+
+      return toBuilder().setStreamName(streamName).setStreamAppendClient(client).build();
     }
   }
 
@@ -127,6 +133,7 @@ abstract class AppendClientInfo {
     BigQueryServices.StreamAppendClient client = getStreamAppendClient();
     if (client != null) {
       getCloseAppendClient().accept(client);
+      activeConnections.dec();
     }
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 8af88f8f7dc..ce5e7b4854e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -334,6 +334,11 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
           if (client != null) {
             runAsyncIgnoreFailure(closeWriterExecutor, client::unpin);
           }
+          // if this is a PENDING stream, we won't be using it again after cleaning up this
+          // destination state, so clear it from the cache
+          if (!useDefaultStream) {
+            APPEND_CLIENTS.invalidate(streamName);
+          }
           appendClientInfo = null;
         }
       }

Reply via email to