This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b5f298  [BEAM-12504] Make CreateTransaction wait on input signal
     new 6774062  Merge pull request #15048 from [BEAM-12504] Make 
SpannerIO.ConnectTransaction wait on input signal
6b5f298 is described below

commit 6b5f298dfce8d8c7fc5def3a57b033a95016ca52
Author: Minbo Bae <[email protected]>
AuthorDate: Sun Jun 20 23:28:17 2021 -0700

    [BEAM-12504] Make CreateTransaction wait on input signal
    
    The current SpannerIO creates the transaction at the beginning of the pipeline
    run. If the pipeline runs for a long time before `ReadAll` (which takes
    `ReadOperation`s as input) executes, SpannerIO can fail with a "session not
    found" error.
    
    This change introduces delayed transaction creation, so that a pipeline can
    create the Spanner transaction when it is needed (e.g. just before `ReadAll`).
    
    ```
    SpannerConfig config = ...
    
    PCollection<ReadOperation> readOperations = ...
    
    // The transaction is created only after readOperations is ready, which
    // avoids the session expiration error.
    PCollectionView<Transaction> transaction =
        readOperations.apply(SpannerIO.createTransaction().withSpannerConfig(config));
    
    PCollection<Struct> users = readOperations
        .apply(SpannerIO.readAll().withSpannerConfig(config).withTransaction(transaction));
    ```
---
 .../sdk/io/gcp/spanner/CreateTransactionFn.java    | 12 ++++++-----
 .../apache/beam/sdk/io/gcp/spanner/SpannerIO.java  | 24 ++++++++++++++++------
 2 files changed, 25 insertions(+), 11 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java
index cff3fb0..cf26985 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.gcp.spanner;
 
 import com.google.cloud.spanner.BatchReadOnlyTransaction;
+import com.google.cloud.spanner.TimestampBound;
 import org.apache.beam.sdk.transforms.DoFn;
 
 /** Creates a batch transaction. */
@@ -25,18 +26,19 @@ import org.apache.beam.sdk.transforms.DoFn;
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
 class CreateTransactionFn extends DoFn<Object, Transaction> {
+  private final SpannerConfig config;
+  private final TimestampBound timestampBound;
 
-  private final SpannerIO.CreateTransaction config;
-
-  CreateTransactionFn(SpannerIO.CreateTransaction config) {
+  CreateTransactionFn(SpannerConfig config, TimestampBound timestampBound) {
     this.config = config;
+    this.timestampBound = timestampBound;
   }
 
   private transient SpannerAccessor spannerAccessor;
 
   @DoFn.Setup
   public void setup() throws Exception {
-    spannerAccessor = SpannerAccessor.getOrCreate(config.getSpannerConfig());
+    spannerAccessor = SpannerAccessor.getOrCreate(config);
   }
 
   @Teardown
@@ -47,7 +49,7 @@ class CreateTransactionFn extends DoFn<Object, Transaction> {
   @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
     BatchReadOnlyTransaction tx =
-        
spannerAccessor.getBatchClient().batchReadOnlyTransaction(config.getTimestampBound());
+        
spannerAccessor.getBatchClient().batchReadOnlyTransaction(timestampBound);
     c.output(Transaction.create(tx.getBatchTransactionId()));
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index b31d716..07ff216 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -79,6 +79,7 @@ import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
@@ -739,13 +740,15 @@ public class SpannerIO {
   }
 
   /**
-   * A {@link PTransform} that create a transaction.
+   * A {@link PTransform} that creates a transaction. If applied to a {@link 
PCollection}, it will
+   * create a transaction after the {@link PCollection} is closed.
    *
    * @see SpannerIO
+   * @see Wait
    */
   @AutoValue
   public abstract static class CreateTransaction
-      extends PTransform<PBegin, PCollectionView<Transaction>> {
+      extends PTransform<PInput, PCollectionView<Transaction>> {
 
     abstract SpannerConfig getSpannerConfig();
 
@@ -754,12 +757,21 @@ public class SpannerIO {
     abstract Builder toBuilder();
 
     @Override
-    public PCollectionView<Transaction> expand(PBegin input) {
+    public PCollectionView<Transaction> expand(PInput input) {
       getSpannerConfig().validate();
 
-      return input
-          .apply(Create.of(1))
-          .apply("Create transaction", ParDo.of(new CreateTransactionFn(this)))
+      PCollection<?> collection = input.getPipeline().apply(Create.of(1));
+
+      if (input instanceof PCollection) {
+        collection = collection.apply(Wait.on((PCollection<?>) input));
+      } else if (!(input instanceof PBegin)) {
+        throw new RuntimeException("input must be PBegin or PCollection");
+      }
+
+      return collection
+          .apply(
+              "Create transaction",
+              ParDo.of(new CreateTransactionFn(this.getSpannerConfig(), 
this.getTimestampBound())))
           .apply("As PCollectionView", View.asSingleton());
     }
 

Reply via email to