This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fa3cd387a0 SpannerIO: parameterizing partitionQuery timeout (#25236)
7fa3cd387a0 is described below

commit 7fa3cd387a09b10ab4159239825830f8683d44cc
Author: darshan-sj <[email protected]>
AuthorDate: Wed Feb 15 22:19:25 2023 +0530

    SpannerIO: parameterizing partitionQuery timeout (#25236)
    
    * SpannerIO: parameterizing partitionQuery timeout
    
    * SpannerIO: parameterizing partitionRead timeout
    
    * Removing default values of timeouts
    
    * formatting changes
---
 .../beam/sdk/io/gcp/spanner/SpannerAccessor.java   | 22 ++++++
 .../beam/sdk/io/gcp/spanner/SpannerConfig.java     | 28 ++++++++
 .../beam/sdk/io/gcp/spanner/SpannerReadIT.java     | 80 ++++++++++++++++++++++
 3 files changed, 130 insertions(+)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
index 2a277722cc1..619b198bbdd 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java
@@ -30,6 +30,7 @@ import com.google.cloud.spanner.DatabaseClient;
 import com.google.cloud.spanner.DatabaseId;
 import com.google.cloud.spanner.Spanner;
 import com.google.cloud.spanner.SpannerOptions;
+import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
 import com.google.spanner.v1.CommitRequest;
 import com.google.spanner.v1.CommitResponse;
 import com.google.spanner.v1.ExecuteSqlRequest;
@@ -162,6 +163,27 @@ public class SpannerAccessor implements AutoCloseable {
               .build());
     }
 
+    SpannerStubSettings.Builder spannerStubSettingsBuilder =
+        builder.getSpannerStubSettingsBuilder();
+    ValueProvider<Duration> partitionQueryTimeout = 
spannerConfig.getPartitionQueryTimeout();
+    if (partitionQueryTimeout != null
+        && partitionQueryTimeout.get() != null
+        && partitionQueryTimeout.get().getMillis() > 0) {
+      spannerStubSettingsBuilder
+          .partitionQuerySettings()
+          .setSimpleTimeoutNoRetries(
+              
org.threeten.bp.Duration.ofMillis(partitionQueryTimeout.get().getMillis()));
+    }
+    ValueProvider<Duration> partitionReadTimeout = 
spannerConfig.getPartitionReadTimeout();
+    if (partitionReadTimeout != null
+        && partitionReadTimeout.get() != null
+        && partitionReadTimeout.get().getMillis() > 0) {
+      spannerStubSettingsBuilder
+          .partitionReadSettings()
+          .setSimpleTimeoutNoRetries(
+              
org.threeten.bp.Duration.ofMillis(partitionReadTimeout.get().getMillis()));
+    }
+
     ValueProvider<String> projectId = spannerConfig.getProjectId();
     if (projectId != null) {
       builder.setProjectId(projectId.get());
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
index 8bf6cbb6143..c31f5aaf834 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
@@ -75,6 +75,10 @@ public abstract class SpannerConfig implements Serializable {
 
   public abstract @Nullable ValueProvider<String> getDatabaseRole();
 
+  public abstract @Nullable ValueProvider<Duration> getPartitionQueryTimeout();
+
+  public abstract @Nullable ValueProvider<Duration> getPartitionReadTimeout();
+
   @VisibleForTesting
   abstract @Nullable ServiceFactory<Spanner, SpannerOptions> 
getServiceFactory();
 
@@ -149,6 +153,10 @@ public abstract class SpannerConfig implements 
Serializable {
 
     abstract Builder setDatabaseRole(ValueProvider<String> databaseRole);
 
+    abstract Builder setPartitionQueryTimeout(ValueProvider<Duration> 
partitionQueryTimeout);
+
+    abstract Builder setPartitionReadTimeout(ValueProvider<Duration> 
partitionReadTimeout);
+
     public abstract SpannerConfig build();
   }
 
@@ -265,4 +273,24 @@ public abstract class SpannerConfig implements 
Serializable {
   public SpannerConfig withDatabaseRole(ValueProvider<String> databaseRole) {
     return toBuilder().setDatabaseRole(databaseRole).build();
   }
+
+  /** Specifies the PartitionQuery timeout. */
+  public SpannerConfig withPartitionQueryTimeout(Duration 
partitionQueryTimeout) {
+    return 
withPartitionQueryTimeout(ValueProvider.StaticValueProvider.of(partitionQueryTimeout));
+  }
+
+  /** Specifies the PartitionQuery timeout. */
+  public SpannerConfig withPartitionQueryTimeout(ValueProvider<Duration> 
partitionQueryTimeout) {
+    return toBuilder().setPartitionQueryTimeout(partitionQueryTimeout).build();
+  }
+
+  /** Specifies the PartitionRead timeout. */
+  public SpannerConfig withPartitionReadTimeout(Duration partitionReadTimeout) 
{
+    return 
withPartitionReadTimeout(ValueProvider.StaticValueProvider.of(partitionReadTimeout));
+  }
+
+  /** Specifies the PartitionRead timeout. */
+  public SpannerConfig withPartitionReadTimeout(ValueProvider<Duration> 
partitionReadTimeout) {
+    return toBuilder().setPartitionReadTimeout(partitionReadTimeout).build();
+  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java
index e1ba6a43c0c..7bab745b5b3 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
@@ -51,6 +52,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -292,6 +294,84 @@ public class SpannerReadIT {
     p.run();
   }
 
+  @Test
+  public void testQueryWithTimeoutError() throws Exception {
+    thrown.expect(new 
SpannerWriteIT.StackTraceContainsString("SpannerException"));
+    thrown.expect(new 
SpannerWriteIT.StackTraceContainsString("DEADLINE_EXCEEDED"));
+
+    SpannerConfig spannerConfig = createSpannerConfig();
+    spannerConfig =
+        
spannerConfig.withPartitionQueryTimeout(StaticValueProvider.of(Duration.millis(1)));
+
+    PCollectionView<Transaction> tx =
+        p.apply(
+            "Create tx",
+            SpannerIO.createTransaction()
+                .withSpannerConfig(spannerConfig)
+                .withTimestampBound(TimestampBound.strong()));
+
+    p.apply(
+        "Read db",
+        SpannerIO.read()
+            .withSpannerConfig(spannerConfig)
+            .withQuery("SELECT * FROM " + options.getTable())
+            .withTransaction(tx));
+
+    p.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testQueryWithTimeoutErrorPG() throws Exception {
+    thrown.expect(new 
SpannerWriteIT.StackTraceContainsString("SpannerException"));
+    thrown.expect(new 
SpannerWriteIT.StackTraceContainsString("DEADLINE_EXCEEDED"));
+
+    SpannerConfig pgSpannerConfig = createPgSpannerConfig();
+    pgSpannerConfig =
+        
pgSpannerConfig.withPartitionQueryTimeout(StaticValueProvider.of(Duration.millis(1)));
+
+    PCollectionView<Transaction> pgTx =
+        p.apply(
+            "Create PG tx",
+            SpannerIO.createTransaction()
+                .withSpannerConfig(pgSpannerConfig)
+                .withTimestampBound(TimestampBound.strong()));
+
+    p.apply(
+        "Read PG db",
+        SpannerIO.read()
+            .withSpannerConfig(pgSpannerConfig)
+            .withQuery("SELECT * FROM " + options.getTable())
+            .withTransaction(pgTx));
+
+    p.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testReadWithTimeoutError() throws Exception {
+    thrown.expect(new 
SpannerWriteIT.StackTraceContainsString("SpannerException"));
+    thrown.expect(new 
SpannerWriteIT.StackTraceContainsString("DEADLINE_EXCEEDED"));
+
+    SpannerConfig spannerConfig = createSpannerConfig();
+    spannerConfig = spannerConfig.withPartitionReadTimeout(Duration.millis(1));
+
+    PCollectionView<Transaction> tx =
+        p.apply(
+            "Create tx",
+            SpannerIO.createTransaction()
+                .withSpannerConfig(spannerConfig)
+                .withTimestampBound(TimestampBound.strong()));
+
+    p.apply(
+        "read db",
+        SpannerIO.read()
+            .withSpannerConfig(spannerConfig)
+            .withTable(options.getTable())
+            .withColumns("Key", "Value")
+            .withTransaction(tx));
+
+    p.run().waitUntilFinish();
+  }
+
   @Test
   public void testReadAllRecordsInDb() throws Exception {
     SpannerConfig spannerConfig = createSpannerConfig();

Reply via email to