http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
new file mode 100644
index 0000000..c2b62b7
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+
+
+/**
+ * A {@link Coder} for {@link ShardedKey}, using a wrapped key {@link Coder}.
+ */
+@VisibleForTesting
+class ShardedKeyCoder<KeyT>
+    extends StructuredCoder<ShardedKey<KeyT>> {
+  public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) {
+    return new ShardedKeyCoder<>(keyCoder);
+  }
+
+  private final Coder<KeyT> keyCoder;
+  private final VarIntCoder shardNumberCoder;
+
+  protected ShardedKeyCoder(Coder<KeyT> keyCoder) {
+    this.keyCoder = keyCoder;
+    this.shardNumberCoder = VarIntCoder.of();
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return Arrays.asList(keyCoder);
+  }
+
+  @Override
+  public void encode(ShardedKey<KeyT> key, OutputStream outStream)
+      throws IOException {
+    keyCoder.encode(key.getKey(), outStream);
+    shardNumberCoder.encode(key.getShardNumber(), outStream);
+  }
+
+  @Override
+  public ShardedKey<KeyT> decode(InputStream inStream)
+      throws IOException {
+    return new ShardedKey<>(
+        keyCoder.decode(inStream),
+        shardNumberCoder.decode(inStream));
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    keyCoder.verifyDeterministic();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
index a210858..63e5bc1 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
@@ -33,7 +33,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
index fa5b3ce..18b2033 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.api.services.bigquery.model.TableRow;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -30,7 +29,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
index 51b9375..cd88222 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
@@ -26,7 +26,6 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.ShardedKey;
 
 /**
  * Fn that tags each table row with a unique id and destination table. To 
avoid calling

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index e1ed746..d68779a 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -19,7 +19,6 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -41,7 +40,6 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
index 887cb93..45dc2a8 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
@@ -22,7 +22,6 @@ import com.google.api.services.bigquery.model.TableRow;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.ShardedKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
index 451d1bd..acd1132 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -26,7 +26,6 @@ import 
org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 9ed2916..c5494d8 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -42,7 +42,6 @@ import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.ShardedKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 0a90dde..1692cda 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -175,7 +175,7 @@ import org.slf4j.LoggerFactory;
  * pipeline. Please refer to the documentation of corresponding
  * {@link PipelineRunner PipelineRunners} for more details.
  */
-@Experimental(Experimental.Kind.SOURCE_SINK)
+@Experimental
 public class BigtableIO {
   private static final Logger LOG = LoggerFactory.getLogger(BigtableIO.class);
 
@@ -211,7 +211,7 @@ public class BigtableIO {
    *
    * @see BigtableIO
    */
-  @Experimental(Experimental.Kind.SOURCE_SINK)
+  @Experimental
   @AutoValue
   public abstract static class Read extends PTransform<PBegin, 
PCollection<Row>> {
 
@@ -415,7 +415,7 @@ public class BigtableIO {
    *
    * @see BigtableIO
    */
-  @Experimental(Experimental.Kind.SOURCE_SINK)
+  @Experimental
   @AutoValue
   public abstract static class Write
       extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, 
PDone> {
@@ -1027,7 +1027,7 @@ public class BigtableIO {
             "{}: Failed to interpolate key for fraction {}.", 
rangeTracker.getRange(), fraction, e);
         return null;
       }
-      LOG.info(
+      LOG.debug(
           "Proposing to split {} at fraction {} (key {})", rangeTracker, 
fraction, splitKey);
       BigtableSource primary;
       BigtableSource residual;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 07476e2..d1a17fe 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -168,13 +168,14 @@ class BigtableServiceImpl implements BigtableService {
     private BigtableSession session;
     private AsyncExecutor executor;
     private BulkMutation bulkMutation;
-    private final String tableName;
+    private final MutateRowRequest.Builder partialBuilder;
 
     public BigtableWriterImpl(BigtableSession session, BigtableTableName 
tableName) {
       this.session = session;
       executor = session.createAsyncExecutor();
       bulkMutation = session.createBulkMutation(tableName, executor);
-      this.tableName = tableName.toString();
+
+      partialBuilder = 
MutateRowRequest.newBuilder().setTableName(tableName.toString());
     }
 
     @Override
@@ -207,8 +208,8 @@ class BigtableServiceImpl implements BigtableService {
         KV<ByteString, Iterable<Mutation>> record)
         throws IOException {
       MutateRowRequest r =
-          MutateRowRequest.newBuilder()
-              .setTableName(tableName)
+          partialBuilder
+              .clone()
               .setRowKey(record.getKey())
               .addAllMutations(record.getValue())
               .build();

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottler.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottler.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottler.java
deleted file mode 100644
index ce6ebe6..0000000
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottler.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.Random;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.MovingFunction;
-
-
-/**
- * An implementation of client-side adaptive throttling. See
- * 
https://landing.google.com/sre/book/chapters/handling-overload.html#client-side-throttling-a7sYUg
- * for a full discussion of the use case and algorithm applied.
- */
-class AdaptiveThrottler {
-  private final MovingFunction successfulRequests;
-  private final MovingFunction allRequests;
-
-  /** The target ratio between requests sent and successful requests. This is 
"K" in the formula in
-   * https://landing.google.com/sre/book/chapters/handling-overload.html */
-  private final double overloadRatio;
-
-  /** The target minimum number of requests per samplePeriodMs, even if no 
requests succeed. Must be
-   * greater than 0, else we could throttle to zero. Because every decision is 
probabilistic, there
-   * is no guarantee that the request rate in any given interval will not be 
zero. (This is the +1
-   * from the formula in 
https://landing.google.com/sre/book/chapters/handling-overload.html */
-  private static final double MIN_REQUESTS = 1;
-  private final Random random;
-
-  /**
-   * @param samplePeriodMs the time window to keep of request history to 
inform throttling
-   * decisions.
-   * @param sampleUpdateMs the length of buckets within this time window.
-   * @param overloadRatio the target ratio between requests sent and 
successful requests. You should
-   * always set this to more than 1, otherwise the client would never try to 
send more requests than
-   * succeeded in the past - so it could never recover from temporary setbacks.
-   */
-  public AdaptiveThrottler(long samplePeriodMs, long sampleUpdateMs,
-      double overloadRatio) {
-    this(samplePeriodMs, sampleUpdateMs, overloadRatio, new Random());
-  }
-
-  @VisibleForTesting
-  AdaptiveThrottler(long samplePeriodMs, long sampleUpdateMs,
-      double overloadRatio, Random random) {
-    allRequests =
-        new MovingFunction(samplePeriodMs, sampleUpdateMs,
-        1 /* numSignificantBuckets */, 1 /* numSignificantSamples */, 
Sum.ofLongs());
-    successfulRequests =
-        new MovingFunction(samplePeriodMs, sampleUpdateMs,
-        1 /* numSignificantBuckets */, 1 /* numSignificantSamples */, 
Sum.ofLongs());
-    this.overloadRatio = overloadRatio;
-    this.random = random;
-  }
-
-  @VisibleForTesting
-  double throttlingProbability(long nowMsSinceEpoch) {
-    if (!allRequests.isSignificant()) {
-      return 0;
-    }
-    long allRequestsNow = allRequests.get(nowMsSinceEpoch);
-    long successfulRequestsNow = successfulRequests.get(nowMsSinceEpoch);
-    return Math.max(0,
-        (allRequestsNow - overloadRatio * successfulRequestsNow) / 
(allRequestsNow + MIN_REQUESTS));
-  }
-
-  /**
-   * Call this before sending a request to the remote service; if this returns 
true, drop the
-   * request (treating it as a failure or trying it again at a later time).
-   */
-  public boolean throttleRequest(long nowMsSinceEpoch) {
-    double delayProbability = throttlingProbability(nowMsSinceEpoch);
-    // Note that we increment the count of all requests here, even if we 
return true - so even if we
-    // tell the client not to send a request at all, it still counts as a 
failed request.
-    allRequests.add(nowMsSinceEpoch, 1);
-
-    return (random.nextDouble() < delayProbability);
-  }
-
-  /**
-   * Call this after {@link throttleRequest} if your request was successful.
-   */
-  public void successfulRequest(long nowMsSinceEpoch) {
-    successfulRequests.add(nowMsSinceEpoch, 1);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 5f65428..b198a6f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -71,8 +71,6 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -203,31 +201,11 @@ public class DatastoreV1 {
   DatastoreV1() {}
 
   /**
-   * The number of entity updates written per RPC, initially. We buffer 
updates in the connector and
-   * write a batch to Datastore once we have collected a certain number. This 
is the initial batch
-   * size; it is adjusted at runtime based on the performance of previous 
writes (see {@link
-   * DatastoreV1.WriteBatcher}).
-   *
-   * <p>Testing has found that a batch of 200 entities will generally finish 
within the timeout even
-   * in adverse conditions.
-   */
-  @VisibleForTesting
-  static final int DATASTORE_BATCH_UPDATE_ENTITIES_START = 200;
-
-  /**
-   * When choosing the number of updates in a single RPC, never exceed the 
maximum allowed by the
-   * API.
+   * Cloud Datastore has a limit of 500 mutations per batch operation, so we 
flush
+   * changes to Datastore every 500 entities.
    */
   @VisibleForTesting
-  static final int DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT = 500;
-
-  /**
-   * When choosing the number of updates in a single RPC, do not go below this 
value.  The actual
-   * number of entities per request may be lower when we flush for the end of 
a bundle or if we hit
-   * {@link DatastoreV1.DATASTORE_BATCH_UPDATE_BYTES_LIMIT}.
-   */
-  @VisibleForTesting
-  static final int DATASTORE_BATCH_UPDATE_ENTITIES_MIN = 10;
+  static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
 
   /**
    * Cloud Datastore has a limit of 10MB per RPC, so we also flush if the 
total size of mutations
@@ -235,7 +213,7 @@ public class DatastoreV1 {
    * the mutations themselves and not the CommitRequest wrapper around them.
    */
   @VisibleForTesting
-  static final int DATASTORE_BATCH_UPDATE_BYTES_LIMIT = 9_000_000;
+  static final int DATASTORE_BATCH_UPDATE_BYTES_LIMIT = 5_000_000;
 
   /**
    * Returns an empty {@link DatastoreV1.Read} builder. Configure the source 
{@code projectId},
@@ -1129,74 +1107,18 @@ public class DatastoreV1 {
     }
   }
 
-  /** Determines batch sizes for commit RPCs. */
-  @VisibleForTesting
-  interface WriteBatcher {
-    /** Call before using this WriteBatcher. */
-    void start();
-
-    /**
-     * Reports the latency of a previous commit RPC, and the number of 
mutations that it contained.
-     */
-    void addRequestLatency(long timeSinceEpochMillis, long latencyMillis, int 
numMutations);
-
-    /** Returns the number of entities to include in the next CommitRequest. */
-    int nextBatchSize(long timeSinceEpochMillis);
-  }
-
-  /**
-   * Determines batch sizes for commit RPCs based on past performance.
-   *
-   * <p>It aims for a target response time per RPC: it uses the response times 
for previous RPCs
-   * and the number of entities contained in them, calculates a rolling 
average time-per-entity, and
-   * chooses the number of entities for future writes to hit the target time.
-   *
-   * <p>This enables us to send large batches without sending over-large 
requests in the case of
-   * expensive entity writes that may timeout before the server can apply them 
all.
-   */
-  @VisibleForTesting
-  static class WriteBatcherImpl implements WriteBatcher, Serializable {
-    /** Target time per RPC for writes. */
-    static final int DATASTORE_BATCH_TARGET_LATENCY_MS = 5000;
-
-    @Override
-    public void start() {
-      meanLatencyPerEntityMs = new MovingAverage(
-          120000 /* sample period 2 minutes */, 10000 /* sample interval 10s 
*/,
-          1 /* numSignificantBuckets */, 1 /* numSignificantSamples */);
-    }
-
-    @Override
-    public void addRequestLatency(long timeSinceEpochMillis, long 
latencyMillis, int numMutations) {
-      meanLatencyPerEntityMs.add(timeSinceEpochMillis, latencyMillis / 
numMutations);
-    }
-
-    @Override
-    public int nextBatchSize(long timeSinceEpochMillis) {
-      if (!meanLatencyPerEntityMs.hasValue(timeSinceEpochMillis)) {
-        return DATASTORE_BATCH_UPDATE_ENTITIES_START;
-      }
-      long recentMeanLatency = 
Math.max(meanLatencyPerEntityMs.get(timeSinceEpochMillis), 1);
-      return (int) Math.max(DATASTORE_BATCH_UPDATE_ENTITIES_MIN,
-          Math.min(DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT,
-            DATASTORE_BATCH_TARGET_LATENCY_MS / recentMeanLatency));
-    }
-
-    private transient MovingAverage meanLatencyPerEntityMs;
-  }
-
   /**
    * {@link DoFn} that writes {@link Mutation}s to Cloud Datastore. Mutations 
are written in
-   * batches; see {@link DatastoreV1.WriteBatcherImpl}.
+   * batches, where the maximum batch size is {@link 
DatastoreV1#DATASTORE_BATCH_UPDATE_LIMIT}.
    *
    * <p>See <a
    * href="https://cloud.google.com/datastore/docs/concepts/entities";>
    * Datastore: Entities, Properties, and Keys</a> for information about 
entity keys and mutations.
    *
    * <p>Commits are non-transactional.  If a commit fails because of a 
conflict over an entity
-   * group, the commit will be retried (up to {@link 
DatastoreV1.DatastoreWriterFn#MAX_RETRIES}
+   * group, the commit will be retried (up to {@link DatastoreV1.DatastoreWriterFn#MAX_RETRIES}
    * times). This means that the mutation operation should be idempotent. 
Thus, the writer should
-   * only be used for {@code upsert} and {@code delete} mutation operations, 
as these are the only
+   * only be used for {@code upsert} and {@code delete} mutation operations, as 
these are the only
    * two Cloud Datastore mutations that are idempotent.
    */
   @VisibleForTesting
@@ -1210,14 +1132,6 @@ public class DatastoreV1 {
     // Current batch of mutations to be written.
     private final List<Mutation> mutations = new ArrayList<>();
     private int mutationsSize = 0;  // Accumulated size of protos in mutations.
-    private WriteBatcher writeBatcher;
-    private transient AdaptiveThrottler throttler;
-    private final Counter throttledSeconds =
-      Metrics.counter(DatastoreWriterFn.class, "cumulativeThrottlingSeconds");
-    private final Counter rpcErrors =
-      Metrics.counter(DatastoreWriterFn.class, "datastoreRpcErrors");
-    private final Counter rpcSuccesses =
-      Metrics.counter(DatastoreWriterFn.class, "datastoreRpcSuccesses");
 
     private static final int MAX_RETRIES = 5;
     private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
@@ -1225,31 +1139,24 @@ public class DatastoreV1 {
             
.withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5));
 
     DatastoreWriterFn(String projectId, @Nullable String localhost) {
-      this(StaticValueProvider.of(projectId), localhost, new 
V1DatastoreFactory(),
-          new WriteBatcherImpl());
+      this(StaticValueProvider.of(projectId), localhost, new 
V1DatastoreFactory());
     }
 
     DatastoreWriterFn(ValueProvider<String> projectId, @Nullable String 
localhost) {
-      this(projectId, localhost, new V1DatastoreFactory(), new 
WriteBatcherImpl());
+      this(projectId, localhost, new V1DatastoreFactory());
     }
 
     @VisibleForTesting
     DatastoreWriterFn(ValueProvider<String> projectId, @Nullable String 
localhost,
-        V1DatastoreFactory datastoreFactory, WriteBatcher writeBatcher) {
+        V1DatastoreFactory datastoreFactory) {
       this.projectId = checkNotNull(projectId, "projectId");
       this.localhost = localhost;
       this.datastoreFactory = datastoreFactory;
-      this.writeBatcher = writeBatcher;
     }
 
     @StartBundle
     public void startBundle(StartBundleContext c) {
       datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), 
projectId.get(), localhost);
-      writeBatcher.start();
-      if (throttler == null) {
-        // Initialize throttler at first use, because it is not serializable.
-        throttler = new AdaptiveThrottler(120000, 10000, 1.25);
-      }
     }
 
     @ProcessElement
@@ -1262,7 +1169,7 @@ public class DatastoreV1 {
       }
       mutations.add(c.element());
       mutationsSize += size;
-      if (mutations.size() >= 
writeBatcher.nextBatchSize(System.currentTimeMillis())) {
+      if (mutations.size() >= DatastoreV1.DATASTORE_BATCH_UPDATE_LIMIT) {
         flushBatch();
       }
     }
@@ -1292,42 +1199,18 @@ public class DatastoreV1 {
 
       while (true) {
         // Batch upsert entities.
-        CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
-        commitRequest.addAllMutations(mutations);
-        commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
-        long startTime = System.currentTimeMillis(), endTime;
-
-        if (throttler.throttleRequest(startTime)) {
-          LOG.info("Delaying request due to previous failures");
-          
throttledSeconds.inc(WriteBatcherImpl.DATASTORE_BATCH_TARGET_LATENCY_MS / 1000);
-          sleeper.sleep(WriteBatcherImpl.DATASTORE_BATCH_TARGET_LATENCY_MS);
-          continue;
-        }
-
         try {
+          CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+          commitRequest.addAllMutations(mutations);
+          commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
           datastore.commit(commitRequest.build());
-          endTime = System.currentTimeMillis();
-
-          writeBatcher.addRequestLatency(endTime, endTime - startTime, 
mutations.size());
-          throttler.successfulRequest(startTime);
-          rpcSuccesses.inc();
-
           // Break if the commit threw no exception.
           break;
         } catch (DatastoreException exception) {
-          if (exception.getCode() == Code.DEADLINE_EXCEEDED) {
-            /* Most errors are not related to request size, and should not 
change our expectation of
-             * the latency of successful requests. DEADLINE_EXCEEDED can be 
taken into
-             * consideration, though. */
-            endTime = System.currentTimeMillis();
-            writeBatcher.addRequestLatency(endTime, endTime - startTime, 
mutations.size());
-          }
           // Only log the code and message for potentially-transient errors. 
The entire exception
           // will be propagated upon the last retry.
-          LOG.error("Error writing batch of {} mutations to Datastore ({}): 
{}", mutations.size(),
-              exception.getCode(), exception.getMessage());
-          rpcErrors.inc();
-
+          LOG.error("Error writing to the Datastore ({}): {}", 
exception.getCode(),
+              exception.getMessage());
           if (!BackOffUtils.next(sleeper, backoff)) {
             LOG.error("Aborting after {} retries.", MAX_RETRIES);
             throw exception;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/MovingAverage.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/MovingAverage.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/MovingAverage.java
deleted file mode 100644
index 0890e79..0000000
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/MovingAverage.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.util.MovingFunction;
-
-
-class MovingAverage {
-  private final MovingFunction sum;
-  private final MovingFunction count;
-
-  public MovingAverage(long samplePeriodMs, long sampleUpdateMs,
-                        int numSignificantBuckets, int numSignificantSamples) {
-    sum = new MovingFunction(samplePeriodMs, sampleUpdateMs,
-        numSignificantBuckets, numSignificantSamples, Sum.ofLongs());
-    count = new MovingFunction(samplePeriodMs, sampleUpdateMs,
-        numSignificantBuckets, numSignificantSamples, Sum.ofLongs());
-  }
-
-  public void add(long nowMsSinceEpoch, long value) {
-    sum.add(nowMsSinceEpoch, value);
-    count.add(nowMsSinceEpoch, 1);
-  }
-
-  public long get(long nowMsSinceEpoch) {
-    return sum.get(nowMsSinceEpoch) / count.get(nowMsSinceEpoch);
-  }
-
-  public boolean hasValue(long nowMsSinceEpoch) {
-    return sum.isSignificant() && count.isSignificant()
-      && count.get(nowMsSinceEpoch) > 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java
deleted file mode 100644
index 00008f1..0000000
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import com.google.cloud.spanner.DatabaseClient;
-import com.google.cloud.spanner.DatabaseId;
-import com.google.cloud.spanner.Spanner;
-import com.google.cloud.spanner.SpannerOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-
-/**
- * Abstract {@link DoFn} that manages {@link Spanner} lifecycle. Use {@link
- * AbstractSpannerFn#databaseClient} to access the Cloud Spanner database 
client.
- */
-abstract class AbstractSpannerFn<InputT, OutputT> extends DoFn<InputT, 
OutputT> {
-  private transient Spanner spanner;
-  private transient DatabaseClient databaseClient;
-
-  abstract SpannerConfig getSpannerConfig();
-
-  @Setup
-  public void setup() throws Exception {
-    SpannerConfig spannerConfig = getSpannerConfig();
-    SpannerOptions options = spannerConfig.buildSpannerOptions();
-    spanner = options.getService();
-    databaseClient = spanner.getDatabaseClient(DatabaseId
-        .of(options.getProjectId(), spannerConfig.getInstanceId().get(),
-            spannerConfig.getDatabaseId().get()));
-  }
-
-  @Teardown
-  public void teardown() throws Exception {
-    if (spanner == null) {
-      return;
-    }
-    spanner.close();
-    spanner = null;
-  }
-
-  protected DatabaseClient databaseClient() {
-    return databaseClient;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java
deleted file mode 100644
index da8e8b1..0000000
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import com.google.cloud.spanner.ReadOnlyTransaction;
-import com.google.cloud.spanner.ResultSet;
-import com.google.cloud.spanner.Statement;
-
-/** Creates a batch transaction. */
-class CreateTransactionFn extends AbstractSpannerFn<Object, Transaction> {
-
-  private final SpannerIO.CreateTransaction config;
-
-  CreateTransactionFn(SpannerIO.CreateTransaction config) {
-    this.config = config;
-  }
-
-  @ProcessElement
-  public void processElement(ProcessContext c) throws Exception {
-    try (ReadOnlyTransaction readOnlyTransaction =
-        databaseClient().readOnlyTransaction(config.getTimestampBound())) {
-      // Run a dummy sql statement to force the RPC and obtain the timestamp 
from the server.
-      ResultSet resultSet = 
readOnlyTransaction.executeQuery(Statement.of("SELECT 1"));
-      while (resultSet.next()) {
-        // do nothing
-      }
-      Transaction tx = 
Transaction.create(readOnlyTransaction.getReadTimestamp());
-      c.output(tx);
-    }
-  }
-
-  @Override
-  SpannerConfig getSpannerConfig() {
-    return config.getSpannerConfig();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java
deleted file mode 100644
index 5b08da2..0000000
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import com.google.cloud.spanner.Mutation;
-import com.google.common.collect.ImmutableList;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * A bundle of mutations that must be submitted atomically.
- *
- * <p>One of the mutations is chosen to be "primary", and can be used to 
determine partitions.
- */
-public final class MutationGroup implements Serializable, Iterable<Mutation> {
-  private final ImmutableList<Mutation> mutations;
-
-  /**
-   * Creates a new group.
-   *
-   * @param primary a primary mutation.
-   * @param other other mutations, usually interleaved in parent.
-   * @return new mutation group.
-   */
-  public static MutationGroup create(Mutation primary, Mutation... other) {
-    return create(primary, Arrays.asList(other));
-  }
-
-  public static MutationGroup create(Mutation primary, Iterable<Mutation> 
other) {
-    return new 
MutationGroup(ImmutableList.<Mutation>builder().add(primary).addAll(other).build());
-  }
-
-  @Override
-  public Iterator<Mutation> iterator() {
-    return mutations.iterator();
-  }
-
-  private MutationGroup(ImmutableList<Mutation> mutations) {
-    this.mutations = mutations;
-  }
-
-  public Mutation primary() {
-    return mutations.get(0);
-  }
-
-  public List<Mutation> attached() {
-    return mutations.subList(1, mutations.size());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java
index 2418816..61652e7 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java
@@ -44,15 +44,6 @@ class MutationSizeEstimator {
     return result;
   }
 
-  /** Estimates a size of the mutation group in bytes. */
-  public static long sizeOf(MutationGroup group) {
-    long result = 0;
-    for (Mutation m : group) {
-      result += sizeOf(m);
-    }
-    return result;
-  }
-
   private static long estimatePrimitiveValue(Value v) {
     switch (v.getType().getCode()) {
       case BOOL:

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java
deleted file mode 100644
index d193b95..0000000
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import com.google.cloud.spanner.ReadOnlyTransaction;
-import com.google.cloud.spanner.ResultSet;
-import com.google.cloud.spanner.Struct;
-import com.google.cloud.spanner.TimestampBound;
-import com.google.common.annotations.VisibleForTesting;
-
-/** A simplest read function implementation. Parallelism support is coming. */
-@VisibleForTesting
-class NaiveSpannerReadFn extends AbstractSpannerFn<Object, Struct> {
-  private final SpannerIO.Read config;
-
-  NaiveSpannerReadFn(SpannerIO.Read config) {
-    this.config = config;
-  }
-
-  SpannerConfig getSpannerConfig() {
-    return config.getSpannerConfig();
-  }
-
-  @ProcessElement
-  public void processElement(ProcessContext c) throws Exception {
-    TimestampBound timestampBound = TimestampBound.strong();
-    if (config.getTransaction() != null) {
-      Transaction transaction = c.sideInput(config.getTransaction());
-      timestampBound = TimestampBound.ofReadTimestamp(transaction.timestamp());
-    }
-    try (ReadOnlyTransaction readOnlyTransaction =
-        databaseClient().readOnlyTransaction(timestampBound)) {
-      ResultSet resultSet = execute(readOnlyTransaction);
-      while (resultSet.next()) {
-        c.output(resultSet.getCurrentRowAsStruct());
-      }
-    }
-  }
-
-  private ResultSet execute(ReadOnlyTransaction readOnlyTransaction) {
-    if (config.getQuery() != null) {
-      return readOnlyTransaction.executeQuery(config.getQuery());
-    }
-    if (config.getIndex() != null) {
-      return readOnlyTransaction.readUsingIndex(
-          config.getTable(), config.getIndex(), config.getKeySet(), 
config.getColumns());
-    }
-    return readOnlyTransaction.read(config.getTable(), config.getKeySet(), 
config.getColumns());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
deleted file mode 100644
index 02716fb..0000000
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.auto.value.AutoValue;
-import com.google.cloud.ServiceFactory;
-import com.google.cloud.spanner.Spanner;
-import com.google.cloud.spanner.SpannerOptions;
-import com.google.common.annotations.VisibleForTesting;
-import java.io.Serializable;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-
-/** Configuration for a Cloud Spanner client. */
-@AutoValue
-public abstract class SpannerConfig implements Serializable {
-
-  @Nullable
-  abstract ValueProvider<String> getProjectId();
-
-  @Nullable
-  abstract ValueProvider<String> getInstanceId();
-
-  @Nullable
-  abstract ValueProvider<String> getDatabaseId();
-
-  @Nullable
-  @VisibleForTesting
-  abstract ServiceFactory<Spanner, SpannerOptions> getServiceFactory();
-
-  abstract Builder toBuilder();
-
-  SpannerOptions buildSpannerOptions() {
-    SpannerOptions.Builder builder = SpannerOptions.newBuilder();
-    if (getProjectId() != null) {
-      builder.setProjectId(getProjectId().get());
-    }
-    if (getServiceFactory() != null) {
-      builder.setServiceFactory(getServiceFactory());
-    }
-    return builder.build();
-  }
-
-  public static SpannerConfig create() {
-    return builder().build();
-  }
-
-  static Builder builder() {
-    return new AutoValue_SpannerConfig.Builder();
-  }
-
-  public void validate(PipelineOptions options) {
-    checkNotNull(
-        getInstanceId(),
-        "SpannerIO.read() requires instance id to be set with withInstanceId 
method");
-    checkNotNull(
-        getDatabaseId(),
-        "SpannerIO.read() requires database id to be set with withDatabaseId 
method");
-  }
-
-  public void populateDisplayData(DisplayData.Builder builder) {
-    builder
-        .addIfNotNull(DisplayData.item("projectId", 
getProjectId()).withLabel("Output Project"))
-        .addIfNotNull(DisplayData.item("instanceId", 
getInstanceId()).withLabel("Output Instance"))
-        .addIfNotNull(DisplayData.item("databaseId", 
getDatabaseId()).withLabel("Output Database"));
-
-    if (getServiceFactory() != null) {
-      builder.addIfNotNull(
-          DisplayData.item("serviceFactory", 
getServiceFactory().getClass().getName())
-              .withLabel("Service Factory"));
-    }
-  }
-
-  /** Builder for {@link SpannerConfig}. */
-  @AutoValue.Builder
-  public abstract static class Builder {
-
-    abstract Builder setProjectId(ValueProvider<String> projectId);
-
-    abstract Builder setInstanceId(ValueProvider<String> instanceId);
-
-    abstract Builder setDatabaseId(ValueProvider<String> databaseId);
-
-    abstract Builder setServiceFactory(ServiceFactory<Spanner, SpannerOptions> 
serviceFactory);
-
-    public abstract SpannerConfig build();
-  }
-
-  public SpannerConfig withProjectId(ValueProvider<String> projectId) {
-    return toBuilder().setProjectId(projectId).build();
-  }
-
-  public SpannerConfig withProjectId(String projectId) {
-    return withProjectId(ValueProvider.StaticValueProvider.of(projectId));
-  }
-
-  public SpannerConfig withInstanceId(ValueProvider<String> instanceId) {
-    return toBuilder().setInstanceId(instanceId).build();
-  }
-
-  public SpannerConfig withInstanceId(String instanceId) {
-    return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId));
-  }
-
-  public SpannerConfig withDatabaseId(ValueProvider<String> databaseId) {
-    return toBuilder().setDatabaseId(databaseId).build();
-  }
-
-  public SpannerConfig withDatabaseId(String databaseId) {
-    return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId));
-  }
-
-  @VisibleForTesting
-  SpannerConfig withServiceFactory(ServiceFactory<Spanner, SpannerOptions> 
serviceFactory) {
-    return toBuilder().setServiceFactory(serviceFactory).build();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index a247d4c..5058d13 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -17,39 +17,38 @@
  */
 package org.apache.beam.sdk.io.gcp.spanner;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
 import com.google.cloud.ServiceFactory;
-import com.google.cloud.Timestamp;
-import com.google.cloud.spanner.KeySet;
+import com.google.cloud.ServiceOptions;
+import com.google.cloud.spanner.AbortedException;
+import com.google.cloud.spanner.DatabaseClient;
+import com.google.cloud.spanner.DatabaseId;
 import com.google.cloud.spanner.Mutation;
 import com.google.cloud.spanner.Spanner;
 import com.google.cloud.spanner.SpannerOptions;
-import com.google.cloud.spanner.Statement;
-import com.google.cloud.spanner.Struct;
-import com.google.cloud.spanner.TimestampBound;
 import com.google.common.annotations.VisibleForTesting;
-
-import java.util.Arrays;
-import java.util.Collections;
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import javax.annotation.Nullable;
-
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Experimental {@link PTransform Transforms} for reading from and writing to 
<a
@@ -57,69 +56,7 @@ import org.apache.beam.sdk.values.PDone;
  *
  * <h3>Reading from Cloud Spanner</h3>
  *
- * <p>To read from Cloud Spanner, apply {@link SpannerIO.Read} transformation. 
It will return a
- * {@link PCollection} of {@link Struct Structs}, where each element represents
- * an individual row returned from the read operation. Both Query and Read 
APIs are supported.
- * See more information about <a 
href="https://cloud.google.com/spanner/docs/reads";>reading from
- * Cloud Spanner</a>
- *
- * <p>To execute a <strong>query</strong>, specify a {@link 
SpannerIO.Read#withQuery(Statement)} or
- * {@link SpannerIO.Read#withQuery(String)} during the construction of the 
transform.
- *
- * <pre>{@code
- *  PCollection<Struct> rows = p.apply(
- *      SpannerIO.read()
- *          .withInstanceId(instanceId)
- *          .withDatabaseId(dbId)
- *          .withQuery("SELECT id, name, email FROM users"));
- * }</pre>
- *
- * <p>To use the Read API, specify a {@link SpannerIO.Read#withTable(String) 
table name} and
- * a {@link SpannerIO.Read#withColumns(List) list of columns}.
- *
- * <pre>{@code
- * PCollection<Struct> rows = p.apply(
- *    SpannerIO.read()
- *        .withInstanceId(instanceId)
- *        .withDatabaseId(dbId)
- *        .withTable("users")
- *        .withColumns("id", "name", "email"));
- * }</pre>
- *
- * <p>To optimally read using index, specify the index name using {@link 
SpannerIO.Read#withIndex}.
- *
- * <p>The transform is guaranteed to be executed on a consistent snapshot of 
data, utilizing the
- * power of read only transactions. Staleness of data can be controlled using
- * {@link SpannerIO.Read#withTimestampBound} or {@link 
SpannerIO.Read#withTimestamp(Timestamp)}
- * methods. <a href="https://cloud.google.com/spanner/docs/transactions";>Read 
more</a> about
- * transactions in Cloud Spanner.
- *
- * <p>It is possible to read several {@link PCollection PCollections} within a 
single transaction.
- * Apply {@link SpannerIO#createTransaction()} transform, that lazily creates 
a transaction. The
- * result of this transformation can be passed to read operation using
- * {@link SpannerIO.Read#withTransaction(PCollectionView)}.
- *
- * <pre>{@code
- * SpannerConfig spannerConfig = ...
- *
- * PCollectionView<Transaction> tx =
- * p.apply(
- *    SpannerIO.createTransaction()
- *        .withSpannerConfig(spannerConfig)
- *        .withTimestampBound(TimestampBound.strong()));
- *
- * PCollection<Struct> users = p.apply(
- *    SpannerIO.read()
- *        .withSpannerConfig(spannerConfig)
- *        .withQuery("SELECT name, email FROM users")
- *        .withTransaction(tx));
- *
- * PCollection<Struct> tweets = p.apply(
- *    SpannerIO.read()
- *        .withSpannerConfig(spannerConfig)
- *        .withQuery("SELECT user, tweet, date FROM tweets")
- *        .withTransaction(tx));
- * }</pre>
+ * <p>This functionality is not yet implemented.
  *
  * <h3>Writing to Cloud Spanner</h3>
  *
@@ -151,11 +88,6 @@ import org.apache.beam.sdk.values.PDone;
  *   <li>If the pipeline was unexpectedly stopped, mutations that were already 
applied will not get
  *       rolled back.
  * </ul>
- *
- * <p>Use {@link MutationGroup} to ensure that a small set mutations is 
bundled together. It is
- * guaranteed that mutations in a group are submitted in the same transaction. 
Build
- * {@link SpannerIO.Write} transform, and call {@link Write#grouped()} method. 
It will return a
- * transformation that can be applied to a PCollection of MutationGroup.
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
 public class SpannerIO {
@@ -163,33 +95,6 @@ public class SpannerIO {
   private static final long DEFAULT_BATCH_SIZE_BYTES = 1024 * 1024; // 1 MB
 
   /**
-   * Creates an uninitialized instance of {@link Read}. Before use, the {@link 
Read} must be
-   * configured with a {@link Read#withInstanceId} and {@link 
Read#withDatabaseId} that identify the
-   * Cloud Spanner database.
-   */
-  @Experimental(Experimental.Kind.SOURCE_SINK)
-  public static Read read() {
-    return new AutoValue_SpannerIO_Read.Builder()
-        .setSpannerConfig(SpannerConfig.create())
-        .setTimestampBound(TimestampBound.strong())
-        .setKeySet(KeySet.all())
-        .build();
-  }
-
-  /**
-   * Returns a transform that creates a batch transaction. By default,
-   * {@link TimestampBound#strong()} transaction is created, to override this 
use
-   * {@link CreateTransaction#withTimestampBound(TimestampBound)}.
-   */
-  @Experimental
-  public static CreateTransaction createTransaction() {
-    return new AutoValue_SpannerIO_CreateTransaction.Builder()
-        .setSpannerConfig(SpannerConfig.create())
-        .setTimestampBound(TimestampBound.strong())
-        .build();
-  }
-
-  /**
    * Creates an uninitialized instance of {@link Write}. Before use, the 
{@link Write} must be
    * configured with a {@link Write#withInstanceId} and {@link 
Write#withDatabaseId} that identify
    * the Cloud Spanner database being written.
@@ -197,408 +102,247 @@ public class SpannerIO {
   @Experimental
   public static Write write() {
     return new AutoValue_SpannerIO_Write.Builder()
-        .setSpannerConfig(SpannerConfig.create())
         .setBatchSizeBytes(DEFAULT_BATCH_SIZE_BYTES)
         .build();
   }
 
   /**
-   * A {@link PTransform} that reads data from Google Cloud Spanner.
+   * A {@link PTransform} that writes {@link Mutation} objects to Google Cloud 
Spanner.
    *
    * @see SpannerIO
    */
   @Experimental(Experimental.Kind.SOURCE_SINK)
   @AutoValue
-  public abstract static class Read extends PTransform<PBegin, 
PCollection<Struct>> {
-
-    abstract SpannerConfig getSpannerConfig();
-
-    @Nullable
-    abstract TimestampBound getTimestampBound();
-
-    @Nullable
-    abstract Statement getQuery();
+  public abstract static class Write extends PTransform<PCollection<Mutation>, 
PDone> {
 
     @Nullable
-    abstract String getTable();
+    abstract String getProjectId();
 
     @Nullable
-    abstract String getIndex();
+    abstract String getInstanceId();
 
     @Nullable
-    abstract List<String> getColumns();
+    abstract String getDatabaseId();
 
-    @Nullable
-    abstract KeySet getKeySet();
+    abstract long getBatchSizeBytes();
 
     @Nullable
-    abstract PCollectionView<Transaction> getTransaction();
+    @VisibleForTesting
+    abstract ServiceFactory<Spanner, SpannerOptions> getServiceFactory();
 
     abstract Builder toBuilder();
 
     @AutoValue.Builder
     abstract static class Builder {
 
-      abstract Builder setSpannerConfig(SpannerConfig spannerConfig);
-
-      abstract Builder setTimestampBound(TimestampBound timestampBound);
-
-      abstract Builder setQuery(Statement statement);
-
-      abstract Builder setTable(String table);
-
-      abstract Builder setIndex(String index);
+      abstract Builder setProjectId(String projectId);
 
-      abstract Builder setColumns(List<String> columns);
+      abstract Builder setInstanceId(String instanceId);
 
-      abstract Builder setKeySet(KeySet keySet);
+      abstract Builder setDatabaseId(String databaseId);
 
-      abstract Builder setTransaction(PCollectionView<Transaction> 
transaction);
-
-      abstract Read build();
-    }
-
-    /** Specifies the Cloud Spanner configuration. */
-    public Read withSpannerConfig(SpannerConfig spannerConfig) {
-      return toBuilder().setSpannerConfig(spannerConfig).build();
-    }
+      abstract Builder setBatchSizeBytes(long batchSizeBytes);
 
-    /** Specifies the Cloud Spanner project. */
-    public Read withProjectId(String projectId) {
-      return withProjectId(ValueProvider.StaticValueProvider.of(projectId));
-    }
+      @VisibleForTesting
+      abstract Builder setServiceFactory(ServiceFactory<Spanner, 
SpannerOptions> serviceFactory);
 
-    /** Specifies the Cloud Spanner project. */
-    public Read withProjectId(ValueProvider<String> projectId) {
-      SpannerConfig config = getSpannerConfig();
-      return withSpannerConfig(config.withProjectId(projectId));
+      abstract Write build();
     }
 
-    /** Specifies the Cloud Spanner instance. */
-    public Read withInstanceId(String instanceId) {
-      return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId));
+    /**
+     * Returns a new {@link SpannerIO.Write} that will write to the specified 
Cloud Spanner project.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withProjectId(String projectId) {
+      return toBuilder().setProjectId(projectId).build();
     }
 
-    /** Specifies the Cloud Spanner instance. */
-    public Read withInstanceId(ValueProvider<String> instanceId) {
-      SpannerConfig config = getSpannerConfig();
-      return withSpannerConfig(config.withInstanceId(instanceId));
+    /**
+     * Returns a new {@link SpannerIO.Write} that will write to the specified 
Cloud Spanner
+     * instance.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withInstanceId(String instanceId) {
+      return toBuilder().setInstanceId(instanceId).build();
     }
 
-    /** Specifies the Cloud Spanner database. */
-    public Read withDatabaseId(String databaseId) {
-      return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId));
+    /**
+     * Returns a new {@link SpannerIO.Write} with a new batch size limit.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withBatchSizeBytes(long batchSizeBytes) {
+      return toBuilder().setBatchSizeBytes(batchSizeBytes).build();
     }
 
-    /** Specifies the Cloud Spanner database. */
-    public Read withDatabaseId(ValueProvider<String> databaseId) {
-      SpannerConfig config = getSpannerConfig();
-      return withSpannerConfig(config.withDatabaseId(databaseId));
+    /**
+     * Returns a new {@link SpannerIO.Write} that will write to the specified 
Cloud Spanner
+     * database.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withDatabaseId(String databaseId) {
+      return toBuilder().setDatabaseId(databaseId).build();
     }
 
     @VisibleForTesting
-    Read withServiceFactory(ServiceFactory<Spanner, SpannerOptions> 
serviceFactory) {
-      SpannerConfig config = getSpannerConfig();
-      return withSpannerConfig(config.withServiceFactory(serviceFactory));
-    }
-
-    public Read withTransaction(PCollectionView<Transaction> transaction) {
-      return toBuilder().setTransaction(transaction).build();
-    }
-
-    public Read withTimestamp(Timestamp timestamp) {
-      return withTimestampBound(TimestampBound.ofReadTimestamp(timestamp));
-    }
-
-    public Read withTimestampBound(TimestampBound timestampBound) {
-      return toBuilder().setTimestampBound(timestampBound).build();
-    }
-
-    public Read withTable(String table) {
-      return toBuilder().setTable(table).build();
-    }
-
-    public Read withColumns(String... columns) {
-      return withColumns(Arrays.asList(columns));
-    }
-
-    public Read withColumns(List<String> columns) {
-      return toBuilder().setColumns(columns).build();
-    }
-
-    public Read withQuery(Statement statement) {
-      return toBuilder().setQuery(statement).build();
-    }
-
-    public Read withQuery(String sql) {
-      return withQuery(Statement.of(sql));
-    }
-
-    public Read withKeySet(KeySet keySet) {
-      return toBuilder().setKeySet(keySet).build();
-    }
-
-    public Read withIndex(String index) {
-      return toBuilder().setIndex(index).build();
+    Write withServiceFactory(ServiceFactory<Spanner, SpannerOptions> 
serviceFactory) {
+      return toBuilder().setServiceFactory(serviceFactory).build();
     }
 
-
     @Override
     public void validate(PipelineOptions options) {
-      getSpannerConfig().validate(options);
       checkNotNull(
-          getTimestampBound(),
-          "SpannerIO.read() runs in a read only transaction and requires 
timestamp to be set "
-              + "with withTimestampBound or withTimestamp method");
-
-      if (getQuery() != null) {
-        // TODO: validate query?
-      } else if (getTable() != null) {
-        // Assume read
-        checkNotNull(
-            getColumns(),
-            "For a read operation SpannerIO.read() requires a list of "
-                + "columns to set with withColumns method");
-        checkArgument(
-            !getColumns().isEmpty(),
-            "For a read operation SpannerIO.read() requires a"
-                + " list of columns to set with withColumns method");
-      } else {
-        throw new IllegalArgumentException(
-            "SpannerIO.read() requires configuring query or read operation.");
-      }
+          getInstanceId(),
+          "SpannerIO.write() requires instance id to be set with 
withInstanceId method");
+      checkNotNull(
+          getDatabaseId(),
+          "SpannerIO.write() requires database id to be set with 
withDatabaseId method");
     }
 
     @Override
-    public PCollection<Struct> expand(PBegin input) {
-      Read config = this;
-      List<PCollectionView<Transaction>> sideInputs = Collections.emptyList();
-      if (getTimestampBound() != null) {
-        PCollectionView<Transaction> transaction =
-            
input.apply(createTransaction().withSpannerConfig(getSpannerConfig()));
-        config = config.withTransaction(transaction);
-        sideInputs = Collections.singletonList(transaction);
-      }
-      return input
-          .apply(Create.of(1))
-          .apply(
-              "Execute query", ParDo.of(new 
NaiveSpannerReadFn(config)).withSideInputs(sideInputs));
-    }
-  }
-
-  /**
-   * A {@link PTransform} that create a transaction.
-   *
-   * @see SpannerIO
-   */
-  @Experimental(Experimental.Kind.SOURCE_SINK)
-  @AutoValue
-  public abstract static class CreateTransaction
-      extends PTransform<PBegin, PCollectionView<Transaction>> {
-
-    abstract SpannerConfig getSpannerConfig();
-
-    @Nullable
-    abstract TimestampBound getTimestampBound();
-
-    abstract Builder toBuilder();
-
-    @Override
-    public PCollectionView<Transaction> expand(PBegin input) {
-      return input.apply(Create.of(1))
-          .apply("Create transaction", ParDo.of(new CreateTransactionFn(this)))
-          .apply("As PCollectionView", View.<Transaction>asSingleton());
-    }
-
-    /** Specifies the Cloud Spanner configuration. */
-    public CreateTransaction withSpannerConfig(SpannerConfig spannerConfig) {
-      return toBuilder().setSpannerConfig(spannerConfig).build();
-    }
-
-    /** Specifies the Cloud Spanner project. */
-    public CreateTransaction withProjectId(String projectId) {
-      return withProjectId(ValueProvider.StaticValueProvider.of(projectId));
-    }
-
-    /** Specifies the Cloud Spanner project. */
-    public CreateTransaction withProjectId(ValueProvider<String> projectId) {
-      SpannerConfig config = getSpannerConfig();
-      return withSpannerConfig(config.withProjectId(projectId));
-    }
-
-    /** Specifies the Cloud Spanner instance. */
-    public CreateTransaction withInstanceId(String instanceId) {
-      return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId));
-    }
-
-    /** Specifies the Cloud Spanner instance. */
-    public CreateTransaction withInstanceId(ValueProvider<String> instanceId) {
-      SpannerConfig config = getSpannerConfig();
-      return withSpannerConfig(config.withInstanceId(instanceId));
-    }
-
-    /** Specifies the Cloud Spanner database. */
-    public CreateTransaction withDatabaseId(String databaseId) {
-      return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId));
-    }
-
-    /** Specifies the Cloud Spanner database. */
-    public CreateTransaction withDatabaseId(ValueProvider<String> databaseId) {
-      SpannerConfig config = getSpannerConfig();
-      return withSpannerConfig(config.withDatabaseId(databaseId));
-    }
-
-    @VisibleForTesting
-    CreateTransaction withServiceFactory(
-        ServiceFactory<Spanner, SpannerOptions> serviceFactory) {
-      SpannerConfig config = getSpannerConfig();
-      return withSpannerConfig(config.withServiceFactory(serviceFactory));
-    }
-
-    public CreateTransaction withTimestampBound(TimestampBound timestampBound) 
{
-      return toBuilder().setTimestampBound(timestampBound).build();
+    public PDone expand(PCollection<Mutation> input) {
+      input.apply("Write mutations to Cloud Spanner", ParDo.of(new 
SpannerWriteFn(this)));
+      return PDone.in(input.getPipeline());
     }
 
     @Override
-    public void validate(PipelineOptions options) {
-      getSpannerConfig().validate(options);
-    }
-
-    /** A builder for {@link CreateTransaction}. */
-    @AutoValue.Builder public abstract static class Builder {
-
-      public abstract Builder setSpannerConfig(SpannerConfig spannerConfig);
-
-      public abstract Builder setTimestampBound(TimestampBound 
newTimestampBound);
-
-      public abstract CreateTransaction build();
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(DisplayData.item("projectId", 
getProjectId()).withLabel("Output Project"))
+          .addIfNotNull(
+              DisplayData.item("instanceId", 
getInstanceId()).withLabel("Output Instance"))
+          .addIfNotNull(
+              DisplayData.item("databaseId", 
getDatabaseId()).withLabel("Output Database"))
+          .add(DisplayData.item("batchSizeBytes", getBatchSizeBytes())
+              .withLabel("Batch Size in Bytes"));
+      if (getServiceFactory() != null) {
+        builder.addIfNotNull(
+            DisplayData.item("serviceFactory", 
getServiceFactory().getClass().getName())
+                .withLabel("Service Factory"));
+      }
     }
   }
 
+  /** Batches together and writes mutations to Google Cloud Spanner. */
+  @VisibleForTesting
+  static class SpannerWriteFn extends DoFn<Mutation, Void> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SpannerWriteFn.class);
+    private final Write spec;
+    private transient Spanner spanner;
+    private transient DatabaseClient dbClient;
+    // Current batch of mutations to be written.
+    private List<Mutation> mutations;
+    private long batchSizeBytes = 0;
+
+    private static final int MAX_RETRIES = 5;
+    private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
+        FluentBackoff.DEFAULT
+            .withMaxRetries(MAX_RETRIES)
+            .withInitialBackoff(Duration.standardSeconds(5));
 
-  /**
-   * A {@link PTransform} that writes {@link Mutation} objects to Google Cloud 
Spanner.
-   *
-   * @see SpannerIO
-   */
-  @Experimental(Experimental.Kind.SOURCE_SINK)
-  @AutoValue
-  public abstract static class Write extends PTransform<PCollection<Mutation>, 
PDone> {
-
-    abstract SpannerConfig getSpannerConfig();
-
-    abstract long getBatchSizeBytes();
-
-    abstract Builder toBuilder();
-
-    @AutoValue.Builder
-    abstract static class Builder {
-
-      abstract Builder setSpannerConfig(SpannerConfig spannerConfig);
-
-      abstract Builder setBatchSizeBytes(long batchSizeBytes);
-
-      abstract Write build();
-    }
-
-    /** Specifies the Cloud Spanner configuration. */
-    public Write withSpannerConfig(SpannerConfig spannerConfig) {
-      return toBuilder().setSpannerConfig(spannerConfig).build();
-    }
-
-    /** Specifies the Cloud Spanner project. */
-    public Write withProjectId(String projectId) {
-      return withProjectId(ValueProvider.StaticValueProvider.of(projectId));
+    @VisibleForTesting
+    SpannerWriteFn(Write spec) {
+      this.spec = spec;
     }
 
-    /** Specifies the Cloud Spanner project. */
-    public Write withProjectId(ValueProvider<String> projectId) {
-      SpannerConfig config = getSpannerConfig();
-      return withSpannerConfig(config.withProjectId(projectId));
+    @Setup
+    public void setup() throws Exception {
+      SpannerOptions spannerOptions = getSpannerOptions();
+      spanner = spannerOptions.getService();
+      dbClient = spanner.getDatabaseClient(
+          DatabaseId.of(projectId(), spec.getInstanceId(), 
spec.getDatabaseId()));
+      mutations = new ArrayList<>();
+      batchSizeBytes = 0;
     }
 
-    /** Specifies the Cloud Spanner instance. */
-    public Write withInstanceId(String instanceId) {
-      return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId));
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      Mutation m = c.element();
+      mutations.add(m);
+      batchSizeBytes += MutationSizeEstimator.sizeOf(m);
+      if (batchSizeBytes >= spec.getBatchSizeBytes()) {
+        flushBatch();
+      }
     }
 
-    /** Specifies the Cloud Spanner instance. */
-    public Write withInstanceId(ValueProvider<String> instanceId) {
-      SpannerConfig config = getSpannerConfig();
-      return withSpannerConfig(config.withInstanceId(instanceId));
+    private String projectId() {
+      return spec.getProjectId() == null
+          ? ServiceOptions.getDefaultProjectId()
+          : spec.getProjectId();
     }
 
-    /** Specifies the Cloud Spanner database. */
-    public Write withDatabaseId(String databaseId) {
-      return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId));
+    @FinishBundle
+    public void finishBundle() throws Exception {
+      if (!mutations.isEmpty()) {
+        flushBatch();
+      }
     }
 
-    /** Specifies the Cloud Spanner database. */
-    public Write withDatabaseId(ValueProvider<String> databaseId) {
-      SpannerConfig config = getSpannerConfig();
-      return withSpannerConfig(config.withDatabaseId(databaseId));
+    @Teardown
+    public void teardown() throws Exception {
+      if (spanner == null) {
+        return;
+      }
+      spanner.closeAsync().get();
+      spanner = null;
     }
 
-    @VisibleForTesting
-    Write withServiceFactory(ServiceFactory<Spanner, SpannerOptions> 
serviceFactory) {
-      SpannerConfig config = getSpannerConfig();
-      return withSpannerConfig(config.withServiceFactory(serviceFactory));
+    private SpannerOptions getSpannerOptions() {
+      SpannerOptions.Builder spannerOptionsBuider = 
SpannerOptions.newBuilder();
+      if (spec.getServiceFactory() != null) {
+        spannerOptionsBuider.setServiceFactory(spec.getServiceFactory());
+      }
+      if (spec.getProjectId() != null) {
+        spannerOptionsBuider.setProjectId(spec.getProjectId());
+      }
+      return spannerOptionsBuider.build();
     }
 
     /**
-     * Same transform but can be applied to {@link PCollection} of {@link 
MutationGroup}.
+     * Writes a batch of mutations to Cloud Spanner.
+     *
+     * <p>If a commit fails, it will be retried up to {@link #MAX_RETRIES} 
times. If the retry limit
+     * is exceeded, the last exception from Cloud Spanner will be thrown.
+     *
+     * @throws AbortedException if the commit fails or IOException or 
InterruptedException if
+     *     backing off between retries fails.
      */
-    public WriteGrouped grouped() {
-      return new WriteGrouped(this);
-    }
-
-    /** Specifies the batch size limit. */
-    public Write withBatchSizeBytes(long batchSizeBytes) {
-      return toBuilder().setBatchSizeBytes(batchSizeBytes).build();
-    }
-
-    @Override
-    public void validate(PipelineOptions options) {
-      getSpannerConfig().validate(options);
-    }
-
-    @Override
-    public PDone expand(PCollection<Mutation> input) {
-      input
-          .apply("To mutation group", ParDo.of(new ToMutationGroupFn()))
-          .apply("Write mutations to Cloud Spanner", ParDo.of(new 
SpannerWriteGroupFn(this)));
-      return PDone.in(input.getPipeline());
+    private void flushBatch() throws AbortedException, IOException, 
InterruptedException {
+      LOG.debug("Writing batch of {} mutations", mutations.size());
+      Sleeper sleeper = Sleeper.DEFAULT;
+      BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();
+
+      while (true) {
+        // Batch upsert rows.
+        try {
+          dbClient.writeAtLeastOnce(mutations);
+
+          // Break if the commit threw no exception.
+          break;
+        } catch (AbortedException exception) {
+          // Only log the code and message for potentially-transient errors. 
The entire exception
+          // will be propagated upon the last retry.
+          LOG.error(
+              "Error writing to Spanner ({}): {}", exception.getCode(), 
exception.getMessage());
+          if (!BackOffUtils.next(sleeper, backoff)) {
+            LOG.error("Aborting after {} retries.", MAX_RETRIES);
+            throw exception;
+          }
+        }
+      }
+      LOG.debug("Successfully wrote {} mutations", mutations.size());
+      mutations = new ArrayList<>();
+      batchSizeBytes = 0;
     }
 
-
     @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
+    public void populateDisplayData(Builder builder) {
       super.populateDisplayData(builder);
-      getSpannerConfig().populateDisplayData(builder);
-      builder.add(
-          DisplayData.item("batchSizeBytes", 
getBatchSizeBytes()).withLabel("Batch Size in Bytes"));
-    }
-  }
-
-  /** Same as {@link Write} but supports grouped mutations. */
-  public static class WriteGrouped extends 
PTransform<PCollection<MutationGroup>, PDone> {
-    private final Write spec;
-
-    public WriteGrouped(Write spec) {
-      this.spec = spec;
-    }
-
-    @Override public PDone expand(PCollection<MutationGroup> input) {
-      input.apply("Write mutations to Cloud Spanner", ParDo.of(new 
SpannerWriteGroupFn(spec)));
-      return PDone.in(input.getPipeline());
-    }
-  }
-
-  private static class ToMutationGroupFn extends DoFn<Mutation, MutationGroup> 
{
-    @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
-      Mutation value = c.element();
-      c.output(MutationGroup.create(value));
+      spec.populateDisplayData(builder);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java
deleted file mode 100644
index 34a11da..0000000
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import com.google.cloud.spanner.AbortedException;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.BackOff;
-import org.apache.beam.sdk.util.BackOffUtils;
-import org.apache.beam.sdk.util.FluentBackoff;
-import org.apache.beam.sdk.util.Sleeper;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Batches together and writes mutations to Google Cloud Spanner. */
-@VisibleForTesting class SpannerWriteGroupFn extends 
AbstractSpannerFn<MutationGroup, Void> {
-  private static final Logger LOG = 
LoggerFactory.getLogger(SpannerWriteGroupFn.class);
-  private final SpannerIO.Write spec;
-  // Current batch of mutations to be written.
-  private List<MutationGroup> mutations;
-  private long batchSizeBytes = 0;
-
-  private static final int MAX_RETRIES = 5;
-  private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
-      FluentBackoff.DEFAULT
-          .withMaxRetries(MAX_RETRIES)
-          .withInitialBackoff(Duration.standardSeconds(5));
-
-  @VisibleForTesting SpannerWriteGroupFn(SpannerIO.Write spec) {
-    this.spec = spec;
-  }
-
-  @Override SpannerConfig getSpannerConfig() {
-    return spec.getSpannerConfig();
-  }
-
-  @Setup
-  public void setup() throws Exception {
-    super.setup();
-    mutations = new ArrayList<>();
-    batchSizeBytes = 0;
-  }
-
-  @ProcessElement
-  public void processElement(ProcessContext c) throws Exception {
-    MutationGroup m = c.element();
-    mutations.add(m);
-    batchSizeBytes += MutationSizeEstimator.sizeOf(m);
-    if (batchSizeBytes >= spec.getBatchSizeBytes()) {
-      flushBatch();
-    }
-  }
-
-  @FinishBundle
-  public void finishBundle() throws Exception {
-    if (!mutations.isEmpty()) {
-      flushBatch();
-    }
-  }
-
-  /**
-   * Writes a batch of mutations to Cloud Spanner.
-   *
-   * <p>If a commit fails, it will be retried up to {@link #MAX_RETRIES} 
times. If the retry limit
-   * is exceeded, the last exception from Cloud Spanner will be thrown.
-   *
-   * @throws AbortedException if the commit fails or IOException or 
InterruptedException if
-   *     backing off between retries fails.
-   */
-  private void flushBatch() throws AbortedException, IOException, 
InterruptedException {
-    LOG.debug("Writing batch of {} mutations", mutations.size());
-    Sleeper sleeper = Sleeper.DEFAULT;
-    BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();
-
-    while (true) {
-      // Batch upsert rows.
-      try {
-        databaseClient().writeAtLeastOnce(Iterables.concat(mutations));
-
-        // Break if the commit threw no exception.
-        break;
-      } catch (AbortedException exception) {
-        // Only log the code and message for potentially-transient errors. The 
entire exception
-        // will be propagated upon the last retry.
-        LOG.error(
-            "Error writing to Spanner ({}): {}", exception.getCode(), 
exception.getMessage());
-        if (!BackOffUtils.next(sleeper, backoff)) {
-          LOG.error("Aborting after {} retries.", MAX_RETRIES);
-          throw exception;
-        }
-      }
-    }
-    LOG.debug("Successfully wrote {} mutations", mutations.size());
-    mutations = new ArrayList<>();
-    batchSizeBytes = 0;
-  }
-
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-    super.populateDisplayData(builder);
-    spec.populateDisplayData(builder);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/Transaction.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/Transaction.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/Transaction.java
deleted file mode 100644
index 22af3b8..0000000
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/Transaction.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import com.google.auto.value.AutoValue;
-import com.google.cloud.Timestamp;
-import java.io.Serializable;
-
-/** A transaction object. */
-@AutoValue
-public abstract class Transaction implements Serializable {
-
-  abstract Timestamp timestamp();
-
-  public static Transaction create(Timestamp timestamp) {
-    return new AutoValue_Transaction(timestamp);
-  }
-}

Reply via email to