This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ae6d624c2b6 Add a configuration to wait for session creation (#37625)
ae6d624c2b6 is described below
commit ae6d624c2b6d1fed15e4e96bf40e707808ad652b
Author: Sakthivel Subramanian <[email protected]>
AuthorDate: Wed Mar 18 19:57:57 2026 +0530
Add a configuration to wait for session creation (#37625)
* Add a configuration to wait for session creation
* Address spotless comments
* Add session wait time to tests
* Fix checkstyle issue
* Add a description
* Set wait time to 0
* Increase wait time
* Increase wait time
* Reduce time increment to 5 seconds
* Reduce time increment to 2 seconds
---
.../beam/sdk/io/gcp/spanner/SpannerAccessor.java | 14 ++++++++++++
.../beam/sdk/io/gcp/spanner/SpannerConfig.java | 25 ++++++++++++++++++++++
.../beam/sdk/io/gcp/spanner/SpannerReadIT.java | 5 +++++
.../beam/sdk/io/gcp/spanner/SpannerWriteIT.java | 5 +++++
.../changestreams/it/IntegrationTestEnv.java | 5 +++++
5 files changed, 54 insertions(+)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
index a3777099bfc..eb1860902c6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
@@ -30,6 +30,7 @@ import com.google.cloud.spanner.BatchClient;
import com.google.cloud.spanner.DatabaseAdminClient;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
+import com.google.cloud.spanner.SessionPoolOptions;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
@@ -38,6 +39,7 @@ import com.google.spanner.v1.CommitResponse;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.PartialResultSet;
import java.util.HashSet;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.sdk.options.ValueProvider;
@@ -61,6 +63,9 @@ public class SpannerAccessor implements AutoCloseable {
*/
private static final String USER_AGENT_PREFIX = "Apache_Beam_Java";
+ // Default wait time for session creation
+ static final java.time.Duration DEFAULT_SESSION_WAIT_DURATION = java.time.Duration.ofMinutes(5);
+
/** Instance ID to use when connecting to an experimental host. */
public static final String EXPERIMENTAL_HOST_INSTANCE_ID = "default";
@@ -270,6 +275,15 @@ public class SpannerAccessor implements AutoCloseable {
builder.setCredentials(credentials.get());
}
+ ValueProvider<java.time.Duration> waitForSessionCreationDuration =
+ spannerConfig.getWaitForSessionCreationDuration();
+ java.time.Duration waitDuration =
+ Optional.ofNullable(waitForSessionCreationDuration)
+ .map(ValueProvider::get)
+ .orElse(DEFAULT_SESSION_WAIT_DURATION);
+ builder.setSessionPoolOption(
+ SessionPoolOptions.newBuilder().setWaitForMinSessionsDuration(waitDuration).build());
+
return builder.build();
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
index f52b8378cb6..a17c851f38a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
@@ -104,6 +104,8 @@ public abstract class SpannerConfig implements Serializable {
public abstract @Nullable ValueProvider<Credentials> getCredentials();
+ public abstract @Nullable ValueProvider<java.time.Duration> getWaitForSessionCreationDuration();
+
abstract Builder toBuilder();
public static SpannerConfig create() {
@@ -189,6 +191,9 @@ public abstract class SpannerConfig implements Serializable {
abstract Builder setPlainText(ValueProvider<Boolean> plainText);
+ abstract Builder setWaitForSessionCreationDuration(
+ ValueProvider<java.time.Duration> waitForSessionCreationDuration);
+
public abstract SpannerConfig build();
}
@@ -389,4 +394,24 @@ public abstract class SpannerConfig implements Serializable {
public SpannerConfig withUsingPlainTextChannel(boolean plainText) {
return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
}
+
+ /**
+ * Sets the wait time for a multiplexed session to be available when creating a database client.
+ *
+ * <p>Setting this will block the {@link com.google.cloud.spanner.DatabaseClient} creation.
+ *
+ * @param waitForSessionCreationDuration The duration to wait. Defaults to {@link
+ * SpannerAccessor#DEFAULT_SESSION_WAIT_DURATION}.
+ * @return {@link SpannerConfig}
+ */
+ public SpannerConfig withWaitForSessionCreationDuration(
+ ValueProvider<java.time.Duration> waitForSessionCreationDuration) {
+ return toBuilder().setWaitForSessionCreationDuration(waitForSessionCreationDuration).build();
+ }
+
+ public SpannerConfig withWaitForSessionCreationDuration(
+ java.time.Duration waitForSessionCreationDuration) {
+ return withWaitForSessionCreationDuration(
+ ValueProvider.StaticValueProvider.of(waitForSessionCreationDuration));
+ }
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java
index 512fd5998dd..34c839d3e1e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java
@@ -26,6 +26,7 @@ import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.SessionPoolOptions;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Struct;
@@ -118,6 +119,10 @@ public class SpannerReadIT {
SpannerOptions.newBuilder()
.setProjectId(project)
.disableGrpcGcpExtension()
+ .setSessionPoolOption(
+ SessionPoolOptions.newBuilder()
+ .setWaitForMinSessionsDuration(java.time.Duration.ofMinutes(5))
+ .build())
.build()
.getService();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java
index 91fe3473be0..df23435d82a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java
@@ -28,6 +28,7 @@ import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.SessionPoolOptions;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
@@ -125,6 +126,10 @@ public class SpannerWriteIT {
SpannerOptions.newBuilder()
.setProjectId(project)
.disableGrpcGcpExtension()
+ .setSessionPoolOption(
+ SessionPoolOptions.newBuilder()
+ .setWaitForMinSessionsDuration(java.time.Duration.ofMinutes(5))
+ .build())
.build()
.getService();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java
index 581b7f3cc2f..b0ce19b9f00 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java
@@ -21,6 +21,7 @@ import com.google.cloud.spanner.DatabaseAdminClient;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Dialect;
+import com.google.cloud.spanner.SessionPoolOptions;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import java.util.ArrayList;
@@ -81,6 +82,10 @@ public class IntegrationTestEnv extends ExternalResource {
.setProjectId(projectId)
.setHost(host)
.disableGrpcGcpExtension()
+ .setSessionPoolOption(
+ SessionPoolOptions.newBuilder()
+ .setWaitForMinSessionsDuration(java.time.Duration.ofMinutes(5))
+ .build())
.build()
.getService();
databaseAdminClient = spanner.getDatabaseAdminClient();