Repository: beam
Updated Branches:
  refs/heads/master 0b19fb414 -> e5929bd13


Pre read api refactoring. Extract `SpannerConfig` and `AbstractSpannerFn`


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/454f1c42
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/454f1c42
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/454f1c42

Branch: refs/heads/master
Commit: 454f1c427353feeb858cdc62185ea3fced8d8a1f
Parents: 80c9263
Author: Mairbek Khadikov <mair...@google.com>
Authored: Mon Jun 19 13:01:20 2017 -0700
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Tue Jun 27 18:36:01 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/spanner/AbstractSpannerFn.java   |  41 ++++
 .../beam/sdk/io/gcp/spanner/SpannerConfig.java  | 118 ++++++++++
 .../beam/sdk/io/gcp/spanner/SpannerIO.java      | 227 ++++---------------
 .../sdk/io/gcp/spanner/SpannerWriteGroupFn.java | 108 +++++++++
 .../beam/sdk/io/gcp/spanner/SpannerIOTest.java  |   8 +-
 5 files changed, 321 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/454f1c42/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java
new file mode 100644
index 0000000..08f7fa9
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java
@@ -0,0 +1,41 @@
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.DatabaseId;
+import com.google.cloud.spanner.Spanner;
+import com.google.cloud.spanner.SpannerOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * Abstract {@link DoFn} that manages {@link Spanner} lifecycle. Use {@link
+ * AbstractSpannerFn#databaseClient} to access the Cloud Spanner database 
client.
+ */
+abstract class AbstractSpannerFn<InputT, OutputT> extends DoFn<InputT, 
OutputT> {
+  private transient Spanner spanner;
+  private transient DatabaseClient databaseClient;
+
+  abstract SpannerConfig getSpannerConfig();
+
+  @Setup
+  public void setup() throws Exception {
+    SpannerConfig spannerConfig = getSpannerConfig();
+    SpannerOptions options = spannerConfig.buildSpannerOptions();
+    spanner = options.getService();
+    databaseClient = spanner.getDatabaseClient(DatabaseId
+        .of(options.getProjectId(), spannerConfig.getInstanceId().get(),
+            spannerConfig.getDatabaseId().get()));
+  }
+
+  @Teardown
+  public void teardown() throws Exception {
+    if (spanner == null) {
+      return;
+    }
+    spanner.close();
+    spanner = null;
+  }
+
+  protected DatabaseClient databaseClient() {
+    return databaseClient;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/454f1c42/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
new file mode 100644
index 0000000..4cb8aa2
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
@@ -0,0 +1,118 @@
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.ServiceFactory;
+import com.google.cloud.spanner.Spanner;
+import com.google.cloud.spanner.SpannerOptions;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
+/** Configuration for a Cloud Spanner client. */
+@AutoValue
+public abstract class SpannerConfig implements Serializable {
+
+  private static final long serialVersionUID = -5680874609304170301L;
+
+  @Nullable
+  abstract ValueProvider<String> getProjectId();
+
+  @Nullable
+  abstract ValueProvider<String> getInstanceId();
+
+  @Nullable
+  abstract ValueProvider<String> getDatabaseId();
+
+  @Nullable
+  @VisibleForTesting
+  abstract ServiceFactory<Spanner, SpannerOptions> getServiceFactory();
+
+  abstract Builder toBuilder();
+
+  SpannerOptions buildSpannerOptions() {
+    SpannerOptions.Builder builder = SpannerOptions.newBuilder();
+    if (getProjectId() != null) {
+      builder.setProjectId(getProjectId().get());
+    }
+    if (getServiceFactory() != null) {
+      builder.setServiceFactory(getServiceFactory());
+    }
+    return builder.build();
+  }
+
+  public static SpannerConfig create() {
+    return builder().build();
+  }
+
+  public static Builder builder() {
+    return new AutoValue_SpannerConfig.Builder();
+  }
+
+  public void validate(PipelineOptions options) {
+    checkNotNull(
+        getInstanceId(),
+        "SpannerIO.read() requires instance id to be set with withInstanceId 
method");
+    checkNotNull(
+        getDatabaseId(),
+        "SpannerIO.read() requires database id to be set with withDatabaseId 
method");
+  }
+
+  public void populateDisplayData(DisplayData.Builder builder) {
+    builder
+        .addIfNotNull(DisplayData.item("projectId", 
getProjectId()).withLabel("Output Project"))
+        .addIfNotNull(DisplayData.item("instanceId", 
getInstanceId()).withLabel("Output Instance"))
+        .addIfNotNull(DisplayData.item("databaseId", 
getDatabaseId()).withLabel("Output Database"));
+
+    if (getServiceFactory() != null) {
+      builder.addIfNotNull(
+          DisplayData.item("serviceFactory", 
getServiceFactory().getClass().getName())
+              .withLabel("Service Factory"));
+    }
+  }
+
+  /** Builder for {@link SpannerConfig}. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+
+    abstract Builder setProjectId(ValueProvider<String> projectId);
+
+    abstract Builder setInstanceId(ValueProvider<String> instanceId);
+
+    abstract Builder setDatabaseId(ValueProvider<String> databaseId);
+
+
+    abstract Builder setServiceFactory(ServiceFactory<Spanner, SpannerOptions> 
serviceFactory);
+
+    public abstract SpannerConfig build();
+  }
+
+  public SpannerConfig withProjectId(ValueProvider<String> projectId) {
+    return toBuilder().setProjectId(projectId).build();
+  }
+
+  public SpannerConfig withProjectId(String projectId) {
+    return withProjectId(ValueProvider.StaticValueProvider.of(projectId));
+  }
+
+  public SpannerConfig withInstanceId(ValueProvider<String> instanceId) {
+    return toBuilder().setInstanceId(instanceId).build();
+  }
+
+  public SpannerConfig withInstanceId(String instanceId) {
+    return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId));
+  }
+
+  public SpannerConfig withDatabaseId(ValueProvider<String> databaseId) {
+    return toBuilder().setDatabaseId(databaseId).build();
+  }
+
+  public SpannerConfig withDatabaseId(String databaseId) {
+    return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/454f1c42/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index 32bf1d0..791c7e7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -17,22 +17,13 @@
  */
 package org.apache.beam.sdk.io.gcp.spanner;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 import com.google.auto.value.AutoValue;
 import com.google.cloud.ServiceFactory;
-import com.google.cloud.ServiceOptions;
-import com.google.cloud.spanner.AbortedException;
-import com.google.cloud.spanner.DatabaseClient;
-import com.google.cloud.spanner.DatabaseId;
 import com.google.cloud.spanner.Mutation;
 import com.google.cloud.spanner.Spanner;
 import com.google.cloud.spanner.SpannerOptions;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+
 import javax.annotation.Nullable;
 
 import org.apache.beam.sdk.annotations.Experimental;
@@ -42,16 +33,8 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
-import org.apache.beam.sdk.util.BackOff;
-import org.apache.beam.sdk.util.BackOffUtils;
-import org.apache.beam.sdk.util.FluentBackoff;
-import org.apache.beam.sdk.util.Sleeper;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Experimental {@link PTransform Transforms} for reading from and writing to 
<a
@@ -123,37 +106,23 @@ public class SpannerIO {
   @AutoValue
   public abstract static class Write extends PTransform<PCollection<Mutation>, 
PDone> {
 
-    @Nullable
-    abstract ValueProvider<String> getProjectId();
+    private static final long serialVersionUID = 1920175411827980145L;
 
-    @Nullable
-    abstract ValueProvider<String> getInstanceId();
-
-    @Nullable
-    abstract ValueProvider<String> getDatabaseId();
+    abstract SpannerConfig getSpannerConfig();
 
     abstract long getBatchSizeBytes();
 
-    @Nullable
-    @VisibleForTesting
-    abstract ServiceFactory<Spanner, SpannerOptions> getServiceFactory();
-
     abstract Builder toBuilder();
 
     @AutoValue.Builder
     abstract static class Builder {
 
-      abstract Builder setProjectId(ValueProvider<String> projectId);
-
-      abstract Builder setInstanceId(ValueProvider<String> instanceId);
+      abstract Builder setSpannerConfig(SpannerConfig spannerConfig);
 
-      abstract Builder setDatabaseId(ValueProvider<String> databaseId);
+      abstract SpannerConfig.Builder spannerConfigBuilder();
 
       abstract Builder setBatchSizeBytes(long batchSizeBytes);
 
-      @VisibleForTesting
-      abstract Builder setServiceFactory(ServiceFactory<Spanner, 
SpannerOptions> serviceFactory);
-
       abstract Write build();
     }
 
@@ -166,8 +135,15 @@ public class SpannerIO {
       return withProjectId(ValueProvider.StaticValueProvider.of(projectId));
     }
 
+    /**
+     * Returns a new {@link SpannerIO.Write} that will write to the specified 
Cloud Spanner project.
+     *
+     * <p>Does not modify this object.
+     */
     public Write withProjectId(ValueProvider<String> projectId) {
-      return toBuilder().setProjectId(projectId).build();
+      Write.Builder builder = toBuilder();
+      builder.spannerConfigBuilder().setProjectId(projectId);
+      return builder.build();
     }
 
     /**
@@ -180,11 +156,30 @@ public class SpannerIO {
       return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId));
     }
 
+    /**
+     * Returns a new {@link SpannerIO.Write} that will write to the specified 
Cloud Spanner
+     * instance.
+     *
+     * <p>Does not modify this object.
+     */
     public Write withInstanceId(ValueProvider<String> instanceId) {
-      return toBuilder().setInstanceId(instanceId).build();
+      Write.Builder builder = toBuilder();
+      builder.spannerConfigBuilder().setInstanceId(instanceId);
+      return builder.build();
     }
 
     /**
+     * Returns a new {@link SpannerIO.Write} that will write to the specified 
Cloud Spanner
+     * config.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withSpannerConfig(SpannerConfig spannerConfig) {
+      return toBuilder().setSpannerConfig(spannerConfig).build();
+    }
+
+
+    /**
      * Returns a new {@link SpannerIO.Write} with a new batch size limit.
      *
      * <p>Does not modify this object.
@@ -203,8 +198,16 @@ public class SpannerIO {
       return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId));
     }
 
+    /**
+     * Returns a new {@link SpannerIO.Write} that will write to the specified 
Cloud Spanner
+     * database.
+     *
+     * <p>Does not modify this object.
+     */
     public Write withDatabaseId(ValueProvider<String> databaseId) {
-      return toBuilder().setDatabaseId(databaseId).build();
+      Write.Builder builder = toBuilder();
+      builder.spannerConfigBuilder().setDatabaseId(databaseId);
+      return builder.build();
     }
 
     /**
@@ -216,17 +219,14 @@ public class SpannerIO {
 
     @VisibleForTesting
     Write withServiceFactory(ServiceFactory<Spanner, SpannerOptions> 
serviceFactory) {
-      return toBuilder().setServiceFactory(serviceFactory).build();
+      Write.Builder builder = toBuilder();
+      builder.spannerConfigBuilder().setServiceFactory(serviceFactory);
+      return builder.build();
     }
 
     @Override
     public void validate(PipelineOptions options) {
-      checkNotNull(
-          getInstanceId(),
-          "SpannerIO.write() requires instance id to be set with 
withInstanceId method");
-      checkNotNull(
-          getDatabaseId(),
-          "SpannerIO.write() requires database id to be set with 
withDatabaseId method");
+      getSpannerConfig().validate(options);
     }
 
     @Override
@@ -237,22 +237,13 @@ public class SpannerIO {
       return PDone.in(input.getPipeline());
     }
 
+
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      builder
-          .addIfNotNull(DisplayData.item("projectId", 
getProjectId()).withLabel("Output Project"))
-          .addIfNotNull(
-              DisplayData.item("instanceId", 
getInstanceId()).withLabel("Output Instance"))
-          .addIfNotNull(
-              DisplayData.item("databaseId", 
getDatabaseId()).withLabel("Output Database"))
-          .add(DisplayData.item("batchSizeBytes", getBatchSizeBytes())
-              .withLabel("Batch Size in Bytes"));
-      if (getServiceFactory() != null) {
-        builder.addIfNotNull(
-            DisplayData.item("serviceFactory", 
getServiceFactory().getClass().getName())
-                .withLabel("Service Factory"));
-      }
+      getSpannerConfig().populateDisplayData(builder);
+      builder.add(
+          DisplayData.item("batchSizeBytes", 
getBatchSizeBytes()).withLabel("Batch Size in Bytes"));
     }
   }
 
@@ -278,123 +269,5 @@ public class SpannerIO {
     }
   }
 
-  /** Batches together and writes mutations to Google Cloud Spanner. */
-  @VisibleForTesting
-  static class SpannerWriteGroupFn extends DoFn<MutationGroup, Void> {
-    private static final Logger LOG = 
LoggerFactory.getLogger(SpannerWriteGroupFn.class);
-    private final Write spec;
-    private transient Spanner spanner;
-    private transient DatabaseClient dbClient;
-    // Current batch of mutations to be written.
-    private List<MutationGroup> mutations;
-    private long batchSizeBytes = 0;
-
-    private static final int MAX_RETRIES = 5;
-    private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
-        FluentBackoff.DEFAULT
-            .withMaxRetries(MAX_RETRIES)
-            .withInitialBackoff(Duration.standardSeconds(5));
-
-    @VisibleForTesting SpannerWriteGroupFn(Write spec) {
-      this.spec = spec;
-    }
-
-    @Setup
-    public void setup() throws Exception {
-      SpannerOptions spannerOptions = getSpannerOptions();
-      spanner = spannerOptions.getService();
-      dbClient = spanner.getDatabaseClient(
-          DatabaseId.of(projectId(), spec.getInstanceId().get(), 
spec.getDatabaseId().get()));
-      mutations = new ArrayList<>();
-      batchSizeBytes = 0;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
-      MutationGroup m = c.element();
-      mutations.add(m);
-      batchSizeBytes += MutationSizeEstimator.sizeOf(m);
-      if (batchSizeBytes >= spec.getBatchSizeBytes()) {
-        flushBatch();
-      }
-    }
-
-    private String projectId() {
-      return spec.getProjectId() == null
-          ? ServiceOptions.getDefaultProjectId()
-          : spec.getProjectId().get();
-    }
-
-    @FinishBundle
-    public void finishBundle() throws Exception {
-      if (!mutations.isEmpty()) {
-        flushBatch();
-      }
-    }
-
-    @Teardown
-    public void teardown() throws Exception {
-      if (spanner == null) {
-        return;
-      }
-      spanner.close();
-      spanner = null;
-    }
-
-    private SpannerOptions getSpannerOptions() {
-      SpannerOptions.Builder spannerOptionsBuider = 
SpannerOptions.newBuilder();
-      if (spec.getServiceFactory() != null) {
-        spannerOptionsBuider.setServiceFactory(spec.getServiceFactory());
-      }
-      if (spec.getProjectId() != null) {
-        spannerOptionsBuider.setProjectId(spec.getProjectId().get());
-      }
-      return spannerOptionsBuider.build();
-    }
-
-    /**
-     * Writes a batch of mutations to Cloud Spanner.
-     *
-     * <p>If a commit fails, it will be retried up to {@link #MAX_RETRIES} 
times. If the retry limit
-     * is exceeded, the last exception from Cloud Spanner will be thrown.
-     *
-     * @throws AbortedException if the commit fails or IOException or 
InterruptedException if
-     *     backing off between retries fails.
-     */
-    private void flushBatch() throws AbortedException, IOException, 
InterruptedException {
-      LOG.debug("Writing batch of {} mutations", mutations.size());
-      Sleeper sleeper = Sleeper.DEFAULT;
-      BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();
-
-      while (true) {
-        // Batch upsert rows.
-        try {
-          dbClient.writeAtLeastOnce(Iterables.concat(mutations));
-
-          // Break if the commit threw no exception.
-          break;
-        } catch (AbortedException exception) {
-          // Only log the code and message for potentially-transient errors. 
The entire exception
-          // will be propagated upon the last retry.
-          LOG.error(
-              "Error writing to Spanner ({}): {}", exception.getCode(), 
exception.getMessage());
-          if (!BackOffUtils.next(sleeper, backoff)) {
-            LOG.error("Aborting after {} retries.", MAX_RETRIES);
-            throw exception;
-          }
-        }
-      }
-      LOG.debug("Successfully wrote {} mutations", mutations.size());
-      mutations = new ArrayList<>();
-      batchSizeBytes = 0;
-    }
-
-    @Override
-    public void populateDisplayData(Builder builder) {
-      super.populateDisplayData(builder);
-      spec.populateDisplayData(builder);
-    }
-  }
-
   private SpannerIO() {} // Prevent construction.
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/454f1c42/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java
new file mode 100644
index 0000000..aed4832
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java
@@ -0,0 +1,108 @@
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.cloud.spanner.AbortedException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Batches together and writes mutations to Google Cloud Spanner. */
+@VisibleForTesting class SpannerWriteGroupFn extends 
AbstractSpannerFn<MutationGroup, Void> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SpannerWriteGroupFn.class);
+  private final SpannerIO.Write spec;
+  // Current batch of mutations to be written.
+  private List<MutationGroup> mutations;
+  private long batchSizeBytes = 0;
+
+  private static final int MAX_RETRIES = 5;
+  private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
+      FluentBackoff.DEFAULT
+          .withMaxRetries(MAX_RETRIES)
+          .withInitialBackoff(Duration.standardSeconds(5));
+
+  @VisibleForTesting SpannerWriteGroupFn(SpannerIO.Write spec) {
+    this.spec = spec;
+  }
+
+  @Override SpannerConfig getSpannerConfig() {
+    return spec.getSpannerConfig();
+  }
+
+  @Setup
+  public void setup() throws Exception {
+    super.setup();
+    mutations = new ArrayList<>();
+    batchSizeBytes = 0;
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext c) throws Exception {
+    MutationGroup m = c.element();
+    mutations.add(m);
+    batchSizeBytes += MutationSizeEstimator.sizeOf(m);
+    if (batchSizeBytes >= spec.getBatchSizeBytes()) {
+      flushBatch();
+    }
+  }
+
+  @FinishBundle
+  public void finishBundle() throws Exception {
+    if (!mutations.isEmpty()) {
+      flushBatch();
+    }
+  }
+
+  /**
+   * Writes a batch of mutations to Cloud Spanner.
+   *
+   * <p>If a commit fails, it will be retried up to {@link #MAX_RETRIES} 
times. If the retry limit
+   * is exceeded, the last exception from Cloud Spanner will be thrown.
+   *
+   * @throws AbortedException if the commit fails or IOException or 
InterruptedException if
+   *     backing off between retries fails.
+   */
+  private void flushBatch() throws AbortedException, IOException, 
InterruptedException {
+    LOG.debug("Writing batch of {} mutations", mutations.size());
+    Sleeper sleeper = Sleeper.DEFAULT;
+    BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();
+
+    while (true) {
+      // Batch upsert rows.
+      try {
+        databaseClient().writeAtLeastOnce(Iterables.concat(mutations));
+
+        // Break if the commit threw no exception.
+        break;
+      } catch (AbortedException exception) {
+        // Only log the code and message for potentially-transient errors. The 
entire exception
+        // will be propagated upon the last retry.
+        LOG.error(
+            "Error writing to Spanner ({}): {}", exception.getCode(), 
exception.getMessage());
+        if (!BackOffUtils.next(sleeper, backoff)) {
+          LOG.error("Aborting after {} retries.", MAX_RETRIES);
+          throw exception;
+        }
+      }
+    }
+    LOG.debug("Successfully wrote {} mutations", mutations.size());
+    mutations = new ArrayList<>();
+    batchSizeBytes = 0;
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+    spec.populateDisplayData(builder);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/454f1c42/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java
index 0cc08bf..abeac0a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java
@@ -149,7 +149,7 @@ public class SpannerIOTest implements Serializable {
             .withDatabaseId("test-database")
             .withBatchSizeBytes(1000000000)
             .withServiceFactory(serviceFactory);
-    SpannerIO.SpannerWriteGroupFn writerFn = new 
SpannerIO.SpannerWriteGroupFn(write);
+    SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write);
     DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn);
     fnTester.processBundle(Arrays.asList(one, two));
 
@@ -175,7 +175,7 @@ public class SpannerIOTest implements Serializable {
             .withDatabaseId("test-database")
             .withBatchSizeBytes(batchSize)
             .withServiceFactory(serviceFactory);
-    SpannerIO.SpannerWriteGroupFn writerFn = new 
SpannerIO.SpannerWriteGroupFn(write);
+    SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write);
     DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn);
     fnTester.processBundle(Arrays.asList(one, two, three));
 
@@ -198,7 +198,7 @@ public class SpannerIOTest implements Serializable {
             .withDatabaseId("test-database")
             .withBatchSizeBytes(0) // turn off batching.
             .withServiceFactory(serviceFactory);
-    SpannerIO.SpannerWriteGroupFn writerFn = new 
SpannerIO.SpannerWriteGroupFn(write);
+    SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write);
     DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn);
     fnTester.processBundle(Arrays.asList(one, two));
 
@@ -224,7 +224,7 @@ public class SpannerIOTest implements Serializable {
             .withDatabaseId("test-database")
             .withBatchSizeBytes(batchSize)
             .withServiceFactory(serviceFactory);
-    SpannerIO.SpannerWriteGroupFn writerFn = new 
SpannerIO.SpannerWriteGroupFn(write);
+    SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write);
     DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn);
     fnTester.processBundle(Arrays.asList(g(one, two, three)));
 

Reply via email to