This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 20aa916931f Properly close Storage API batch connections (#31710)
20aa916931f is described below
commit 20aa916931f70d3bc75c5a22bc70bed0099c549e
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Fri Jun 28 15:52:06 2024 -0400
Properly close Storage API batch connections (#31710)
* properly close connections; add active connection counter
* only invalidate stream at teardown for PENDING type
* cleanup
---
.../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 19 +++++++++++++------
.../gcp/bigquery/StorageApiWriteUnshardedRecords.java | 5 +++++
2 files changed, 18 insertions(+), 6 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
index 211027c12b0..5a12e81ea79 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
@@ -30,6 +30,8 @@ import com.google.protobuf.Message;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
/**
* Container class used by {@link StorageApiWritesShardedRecords} and {@link
@@ -38,6 +40,9 @@ import javax.annotation.Nullable;
*/
@AutoValue
abstract class AppendClientInfo {
+ private final Counter activeConnections =
+ Metrics.counter(AppendClientInfo.class, "activeConnections");
+
abstract @Nullable BigQueryServices.StreamAppendClient
getStreamAppendClient();
abstract TableSchema getTableSchema();
@@ -114,12 +119,13 @@ abstract class AppendClientInfo {
return this;
} else {
String streamName = getStreamName.get();
- return toBuilder()
- .setStreamName(streamName)
- .setStreamAppendClient(
- writeStreamService.getStreamAppendClient(
- streamName, getDescriptor(), useConnectionPool,
missingValueInterpretation))
- .build();
+ BigQueryServices.StreamAppendClient client =
+ writeStreamService.getStreamAppendClient(
+ streamName, getDescriptor(), useConnectionPool,
missingValueInterpretation);
+
+ activeConnections.inc();
+
+ return
toBuilder().setStreamName(streamName).setStreamAppendClient(client).build();
}
}
@@ -127,6 +133,7 @@ abstract class AppendClientInfo {
BigQueryServices.StreamAppendClient client = getStreamAppendClient();
if (client != null) {
getCloseAppendClient().accept(client);
+ activeConnections.dec();
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 8af88f8f7dc..ce5e7b4854e 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -334,6 +334,11 @@ public class StorageApiWriteUnshardedRecords<DestinationT,
ElementT>
if (client != null) {
runAsyncIgnoreFailure(closeWriterExecutor, client::unpin);
}
+ // if this is a PENDING stream, we won't be using it again after
cleaning up this
+ // destination state, so clear it from the cache
+ if (!useDefaultStream) {
+ APPEND_CLIENTS.invalidate(streamName);
+ }
appendClientInfo = null;
}
}