This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7fa3cd387a0 SpannerIO: parameterizing partitionQuery timeout (#25236)
7fa3cd387a0 is described below
commit 7fa3cd387a09b10ab4159239825830f8683d44cc
Author: darshan-sj <[email protected]>
AuthorDate: Wed Feb 15 22:19:25 2023 +0530
SpannerIO: parameterizing partitionQuery timeout (#25236)
* SpannerIO: parameterizing partitionQuery timeout
* SpannerIO: parameterizing partitionRead timeout
* Removing default values of timeouts
* formatting changes
---
.../beam/sdk/io/gcp/spanner/SpannerAccessor.java | 22 ++++++
.../beam/sdk/io/gcp/spanner/SpannerConfig.java | 28 ++++++++
.../beam/sdk/io/gcp/spanner/SpannerReadIT.java | 80 ++++++++++++++++++++++
3 files changed, 130 insertions(+)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
index 2a277722cc1..619b198bbdd 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
@@ -30,6 +30,7 @@ import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
+import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
import com.google.spanner.v1.ExecuteSqlRequest;
@@ -162,6 +163,27 @@ public class SpannerAccessor implements AutoCloseable {
.build());
}
+ SpannerStubSettings.Builder spannerStubSettingsBuilder =
+ builder.getSpannerStubSettingsBuilder();
+ ValueProvider<Duration> partitionQueryTimeout =
spannerConfig.getPartitionQueryTimeout();
+ if (partitionQueryTimeout != null
+ && partitionQueryTimeout.get() != null
+ && partitionQueryTimeout.get().getMillis() > 0) {
+ spannerStubSettingsBuilder
+ .partitionQuerySettings()
+ .setSimpleTimeoutNoRetries(
+
org.threeten.bp.Duration.ofMillis(partitionQueryTimeout.get().getMillis()));
+ }
+ ValueProvider<Duration> partitionReadTimeout =
spannerConfig.getPartitionReadTimeout();
+ if (partitionReadTimeout != null
+ && partitionReadTimeout.get() != null
+ && partitionReadTimeout.get().getMillis() > 0) {
+ spannerStubSettingsBuilder
+ .partitionReadSettings()
+ .setSimpleTimeoutNoRetries(
+
org.threeten.bp.Duration.ofMillis(partitionReadTimeout.get().getMillis()));
+ }
+
ValueProvider<String> projectId = spannerConfig.getProjectId();
if (projectId != null) {
builder.setProjectId(projectId.get());
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
index 8bf6cbb6143..c31f5aaf834 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
@@ -75,6 +75,10 @@ public abstract class SpannerConfig implements Serializable {
public abstract @Nullable ValueProvider<String> getDatabaseRole();
+ public abstract @Nullable ValueProvider<Duration> getPartitionQueryTimeout();
+
+ public abstract @Nullable ValueProvider<Duration> getPartitionReadTimeout();
+
@VisibleForTesting
abstract @Nullable ServiceFactory<Spanner, SpannerOptions>
getServiceFactory();
@@ -149,6 +153,10 @@ public abstract class SpannerConfig implements
Serializable {
abstract Builder setDatabaseRole(ValueProvider<String> databaseRole);
+ abstract Builder setPartitionQueryTimeout(ValueProvider<Duration>
partitionQueryTimeout);
+
+ abstract Builder setPartitionReadTimeout(ValueProvider<Duration>
partitionReadTimeout);
+
public abstract SpannerConfig build();
}
@@ -265,4 +273,24 @@ public abstract class SpannerConfig implements
Serializable {
public SpannerConfig withDatabaseRole(ValueProvider<String> databaseRole) {
return toBuilder().setDatabaseRole(databaseRole).build();
}
+
+ /** Sets a fixed PartitionQuery timeout by wrapping it in a {@code StaticValueProvider}. */
+ public SpannerConfig withPartitionQueryTimeout(Duration
partitionQueryTimeout) {
+ return
withPartitionQueryTimeout(ValueProvider.StaticValueProvider.of(partitionQueryTimeout));
+ }
+
+ /** Returns a config rebuilt via {@code toBuilder()} with the PartitionQuery timeout provider set. */
+ public SpannerConfig withPartitionQueryTimeout(ValueProvider<Duration>
partitionQueryTimeout) {
+ return toBuilder().setPartitionQueryTimeout(partitionQueryTimeout).build();
+ }
+
+ /** Sets a fixed PartitionRead timeout by wrapping it in a {@code StaticValueProvider}. */
+ public SpannerConfig withPartitionReadTimeout(Duration partitionReadTimeout)
{
+ return
withPartitionReadTimeout(ValueProvider.StaticValueProvider.of(partitionReadTimeout));
+ }
+
+ /** Returns a config rebuilt via {@code toBuilder()} with the PartitionRead timeout provider set. */
+ public SpannerConfig withPartitionReadTimeout(ValueProvider<Duration>
partitionReadTimeout) {
+ return toBuilder().setPartitionReadTimeout(partitionReadTimeout).build();
+ }
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java
index e1ba6a43c0c..7bab745b5b3 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
@@ -51,6 +52,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -292,6 +294,84 @@ public class SpannerReadIT {
p.run();
}
+ @Test
+ public void testQueryWithTimeoutError() throws Exception { // Query read with a 1 ms PartitionQuery timeout; a DEADLINE_EXCEEDED SpannerException is expected.
+ thrown.expect(new
SpannerWriteIT.StackTraceContainsString("SpannerException"));
+ thrown.expect(new
SpannerWriteIT.StackTraceContainsString("DEADLINE_EXCEEDED"));
+
+ SpannerConfig spannerConfig = createSpannerConfig();
+ spannerConfig =
+
spannerConfig.withPartitionQueryTimeout(StaticValueProvider.of(Duration.millis(1))); // ValueProvider overload; 1 ms is presumably too short for the RPC to complete
+
+ PCollectionView<Transaction> tx =
+ p.apply(
+ "Create tx",
+ SpannerIO.createTransaction()
+ .withSpannerConfig(spannerConfig)
+ .withTimestampBound(TimestampBound.strong()));
+
+ p.apply(
+ "Read db",
+ SpannerIO.read()
+ .withSpannerConfig(spannerConfig)
+ .withQuery("SELECT * FROM " + options.getTable())
+ .withTransaction(tx));
+
+ p.run().waitUntilFinish(); // NOTE(review): presumably the expected exception surfaces from this call - confirm
+ }
+
+ @Test
+ public void testQueryWithTimeoutErrorPG() throws Exception { // Same scenario as testQueryWithTimeoutError, using the config from createPgSpannerConfig().
+ thrown.expect(new
SpannerWriteIT.StackTraceContainsString("SpannerException"));
+ thrown.expect(new
SpannerWriteIT.StackTraceContainsString("DEADLINE_EXCEEDED"));
+
+ SpannerConfig pgSpannerConfig = createPgSpannerConfig(); // NOTE(review): presumably targets a PostgreSQL-dialect database - confirm against the helper
+ pgSpannerConfig =
+
pgSpannerConfig.withPartitionQueryTimeout(StaticValueProvider.of(Duration.millis(1))); // 1 ms PartitionQuery timeout
+
+ PCollectionView<Transaction> pgTx =
+ p.apply(
+ "Create PG tx",
+ SpannerIO.createTransaction()
+ .withSpannerConfig(pgSpannerConfig)
+ .withTimestampBound(TimestampBound.strong()));
+
+ p.apply(
+ "Read PG db",
+ SpannerIO.read()
+ .withSpannerConfig(pgSpannerConfig)
+ .withQuery("SELECT * FROM " + options.getTable())
+ .withTransaction(pgTx));
+
+ p.run().waitUntilFinish(); // NOTE(review): presumably the expected exception surfaces from this call - confirm
+ }
+
+ @Test
+ public void testReadWithTimeoutError() throws Exception { // Table read with a 1 ms PartitionRead timeout; a DEADLINE_EXCEEDED SpannerException is expected.
+ thrown.expect(new
SpannerWriteIT.StackTraceContainsString("SpannerException"));
+ thrown.expect(new
SpannerWriteIT.StackTraceContainsString("DEADLINE_EXCEEDED"));
+
+ SpannerConfig spannerConfig = createSpannerConfig();
+ spannerConfig = spannerConfig.withPartitionReadTimeout(Duration.millis(1)); // plain-Duration overload, unlike the query tests
+
+ PCollectionView<Transaction> tx =
+ p.apply(
+ "Create tx",
+ SpannerIO.createTransaction()
+ .withSpannerConfig(spannerConfig)
+ .withTimestampBound(TimestampBound.strong()));
+
+ p.apply(
+ "read db",
+ SpannerIO.read()
+ .withSpannerConfig(spannerConfig)
+ .withTable(options.getTable()) // table + columns read rather than a SQL query
+ .withColumns("Key", "Value")
+ .withTransaction(tx));
+
+ p.run().waitUntilFinish(); // NOTE(review): presumably the expected exception surfaces from this call - confirm
+ }
+
@Test
public void testReadAllRecordsInDb() throws Exception {
SpannerConfig spannerConfig = createSpannerConfig();