This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4d429dde47b Add options to control number of Storage API connections
when using multiplexing (#31721)
4d429dde47b is described below
commit 4d429dde47b570ccabba6b86173eb2546f76a5f2
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Fri Jul 12 14:46:37 2024 -0400
Add options to control number of Storage API connections when using
multiplexing (#31721)
* add options to set min and max connections to connection management pool;
rename counter to be more accurate
* add multiplexing description
* add to CHANGES.md
* clarify documentation and address comments
* adjust description
* add details
---
CHANGES.md | 2 ++
.../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 8 +++----
.../beam/sdk/io/gcp/bigquery/BigQueryOptions.java | 27 ++++++++++++++++++++++
.../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 9 ++++++++
4 files changed, 42 insertions(+), 4 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 96c436d89ec..fc94877a2bb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -67,6 +67,7 @@
## New Features / Improvements
* Multiple RunInference instances can now share the same model instance by
setting the model_identifier parameter (Python)
([#31665](https://github.com/apache/beam/issues/31665)).
+* Added options to control the number of Storage API multiplexing connections
(Java) ([#31721](https://github.com/apache/beam/pull/31721)).
* [IcebergIO] All specified catalog properties are passed through to the
connector ([#31726](https://github.com/apache/beam/pull/31726))
* Removed a 3rd party LGPL dependency from the Go SDK
([#31765](https://github.com/apache/beam/issues/31765)).
* Support for MapState and SetState when using Dataflow Runner v1 with
Streaming Engine (Java)
([[#18200](https://github.com/apache/beam/issues/18200)])
@@ -82,6 +83,7 @@
## Bugfixes
+* Fixed a bug in BigQueryIO batch Storage Write API that frequently exhausted
the concurrent connections quota
([#31710](https://github.com/apache/beam/pull/31710)).
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
## Security Fixes
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
index 5a12e81ea79..7505f77fb5b 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
@@ -40,8 +40,8 @@ import org.apache.beam.sdk.metrics.Metrics;
*/
@AutoValue
abstract class AppendClientInfo {
- private final Counter activeConnections =
- Metrics.counter(AppendClientInfo.class, "activeConnections");
+ private final Counter activeStreamAppendClients =
+ Metrics.counter(AppendClientInfo.class, "activeStreamAppendClients");
abstract @Nullable BigQueryServices.StreamAppendClient
getStreamAppendClient();
@@ -123,7 +123,7 @@ abstract class AppendClientInfo {
writeStreamService.getStreamAppendClient(
streamName, getDescriptor(), useConnectionPool,
missingValueInterpretation);
- activeConnections.inc();
+ activeStreamAppendClients.inc();
return
toBuilder().setStreamName(streamName).setStreamAppendClient(client).build();
}
@@ -133,7 +133,7 @@ abstract class AppendClientInfo {
BigQueryServices.StreamAppendClient client = getStreamAppendClient();
if (client != null) {
getCloseAppendClient().accept(client);
- activeConnections.dec();
+ activeStreamAppendClients.dec();
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index cd1fc6d3842..ba76f483f77 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -109,6 +109,28 @@ public interface BigQueryOptions
void setNumStorageWriteApiStreamAppendClients(Integer value);
+ @Description(
+ "When using the STORAGE_API_AT_LEAST_ONCE write method with multiplexing
(i.e. useStorageApiConnectionPool=true), "
+ + "this option sets the minimum number of connections each pool
creates before any connections are shared. This is "
+ + "on a per worker, per region basis. Note that in practice, the
minimum number of connections created is the minimum "
+ + "of this value and (numStorageWriteApiStreamAppendClients x num
destinations). BigQuery will create this many "
+ + "connections at first and will only create more connections if the
current ones are \"overwhelmed\". Consider "
+ + "increasing this value if you are running into performance
issues.")
+ @Default.Integer(2)
+ Integer getMinConnectionPoolConnections();
+
+ void setMinConnectionPoolConnections(Integer value);
+
+ @Description(
+ "When using the STORAGE_API_AT_LEAST_ONCE write method with multiplexing
(i.e. useStorageApiConnectionPool=true), "
+ + "this option sets the maximum number of connections each pool
creates. This is on a per worker, per region basis. "
+ + "If writing to many dynamic destinations (>20) and experiencing
performance issues or seeing append operations competing "
+ + "for streams, consider increasing this value.")
+ @Default.Integer(20)
+ Integer getMaxConnectionPoolConnections();
+
+ void setMaxConnectionPoolConnections(Integer value);
+
@Description("The max number of messages inflight that we expect each
connection will retain.")
@Default.Long(1000)
Long getStorageWriteMaxInflightRequests();
@@ -122,6 +144,11 @@ public interface BigQueryOptions
void setStorageWriteMaxInflightBytes(Long value);
+ @Description(
+ "Enables multiplexing mode, where multiple tables can share the same
connection. Only available when writing with STORAGE_API_AT_LEAST_ONCE"
+ + " mode. This is recommended if your write operation is creating
20+ connections. When using multiplexing, consider tuning "
+ + "the number of connections created by the connection pool with
minConnectionPoolConnections and maxConnectionPoolConnections. "
+ + "For more information, see
https://cloud.google.com/bigquery/docs/write-api-best-practices#connection_pool_management")
@Default.Boolean(false)
Boolean getUseStorageApiConnectionPool();
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 2bdba0b053c..c6b0e17e59d 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -69,6 +69,7 @@ import
com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
+import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamRequest;
@@ -1423,6 +1424,14 @@ public class BigQueryServicesImpl implements
BigQueryServices {
bqIOMetadata.getBeamJobId() == null ? "" :
bqIOMetadata.getBeamJobId(),
bqIOMetadata.getBeamWorkerId() == null ? "" :
bqIOMetadata.getBeamWorkerId());
+ ConnectionWorkerPool.setOptions(
+ ConnectionWorkerPool.Settings.builder()
+ .setMinConnectionsPerRegion(
+
options.as(BigQueryOptions.class).getMinConnectionPoolConnections())
+ .setMaxConnectionsPerRegion(
+
options.as(BigQueryOptions.class).getMaxConnectionPoolConnections())
+ .build());
+
StreamWriter streamWriter =
StreamWriter.newBuilder(streamName, newWriteClient)
.setExecutorProvider(