Repository: incubator-beam
Updated Branches:
  refs/heads/master 8b1e64a66 -> bef1a69f1


Datastore Read as a composite PTransform


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/92030133
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/92030133
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/92030133

Branch: refs/heads/master
Commit: 92030133a721d0bcbd17f5751333b0449257332e
Parents: 8b1e64a
Author: Vikas Kedigehalli <vika...@google.com>
Authored: Tue Jul 26 09:54:43 2016 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Mon Aug 1 22:44:25 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/datastore/V1Beta3.java      | 816 +++++++++----------
 .../beam/sdk/io/gcp/datastore/V1Beta3Test.java  | 466 +++++------
 2 files changed, 615 insertions(+), 667 deletions(-)
----------------------------------------------------------------------
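
For reference, a minimal usage sketch of the read API after this change (not
part of the commit; the pipeline object, project id, and namespace below are
illustrative placeholders, and the query mirrors the one built in
V1Beta3Test):

    import com.google.datastore.v1beta3.Entity;
    import com.google.datastore.v1beta3.Query;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.values.PCollection;

    Query.Builder q = Query.newBuilder();
    q.addKindBuilder().setName("myKind");
    Query query = q.build();

    PCollection<Entity> entities = pipeline.apply(
        DatastoreIO.v1beta3().read()
            .withProjectId("my-project")        // placeholder project id
            .withQuery(query)
            .withNamespace("my-namespace")      // optional
            .withNumQuerySplits(100));          // optional; <= 0 lets the
                                                // transform pick the number
                                                // of splits from the
                                                // estimated query size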


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92030133/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
index 0ba4433..bda907a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
@@ -24,7 +24,6 @@ import static com.google.common.base.Verify.verify;
 import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL;
 import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING;
 import static 
com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED;
-import static 
com.google.datastore.v1beta3.client.DatastoreHelper.makeAndFilter;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
@@ -33,16 +32,22 @@ import static 
com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
-import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Sink.WriteOperation;
 import org.apache.beam.sdk.io.Sink.Writer;
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
 import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -54,7 +59,6 @@ import com.google.api.client.util.Sleeper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
-import com.google.common.primitives.Ints;
 import com.google.datastore.v1beta3.CommitRequest;
 import com.google.datastore.v1beta3.Entity;
 import com.google.datastore.v1beta3.EntityResult;
@@ -79,7 +83,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 
@@ -99,7 +102,8 @@ import javax.annotation.Nullable;
  * <p>To read a {@link PCollection} from a query to Datastore, use {@link 
V1Beta3#read} and
  * its methods {@link V1Beta3.Read#withProjectId} and {@link 
V1Beta3.Read#withQuery} to
  * specify the project to query and the query to read from. You can optionally 
provide a namespace
- * to query within using {@link V1Beta3.Read#withNamespace}.
+ * to query within using {@link V1Beta3.Read#withNamespace}. You can also optionally specify
+ * the number of splits you want for the query using {@link V1Beta3.Read#withNumQuerySplits}.
  *
  * <p>For example:
  *
@@ -168,11 +172,12 @@ public class V1Beta3 {
 
   /**
    * Returns an empty {@link V1Beta3.Read} builder. Configure the source 
{@code projectId},
-   * {@code query}, and optionally {@code namespace} using {@link 
V1Beta3.Read#withProjectId},
-   * {@link V1Beta3.Read#withQuery}, and {@link V1Beta3.Read#withNamespace}.
+   * {@code query}, and optionally {@code namespace} and {@code 
numQuerySplits} using
+   * {@link V1Beta3.Read#withProjectId}, {@link V1Beta3.Read#withQuery},
+   * {@link V1Beta3.Read#withNamespace}, and {@link V1Beta3.Read#withNumQuerySplits}.
    */
   public V1Beta3.Read read() {
-    return new V1Beta3.Read(null, null, null);
+    return new V1Beta3.Read(null, null, null, 0);
   }
 
   /**
@@ -182,6 +187,24 @@ public class V1Beta3 {
    * @see DatastoreIO
    */
   public static class Read extends PTransform<PBegin, PCollection<Entity>> {
+    private static final Logger LOG = LoggerFactory.getLogger(Read.class);
+
+    /** An upper bound on the number of splits for a query. */
+    public static final int NUM_QUERY_SPLITS_MAX = 50000;
+
+    /** A lower bound on the number of splits for a query. */
+    static final int NUM_QUERY_SPLITS_MIN = 12;
+
+    /** Default bundle size of 64MB. */
+    static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024;
+
+    /**
+     * Maximum number of results to request per query.
+     *
+     * <p>Must be set, or it may result in an I/O error when querying Cloud 
Datastore.
+     */
+    static final int QUERY_BATCH_LIMIT = 500;
+
     @Nullable
     private final String projectId;
 
@@ -191,15 +214,100 @@ public class V1Beta3 {
     @Nullable
     private final String namespace;
 
+    private final int numQuerySplits;
+
+    /**
+     * Computes the number of splits to be performed on the given query by 
querying the estimated
+     * size from Datastore.
+     */
+    static int getEstimatedNumSplits(Datastore datastore, Query query, 
@Nullable String namespace) {
+      int numSplits;
+      try {
+        long estimatedSizeBytes = getEstimatedSizeBytes(datastore, query, 
namespace);
+        numSplits = (int) Math.min(NUM_QUERY_SPLITS_MAX,
+            Math.round(((double) estimatedSizeBytes) / 
DEFAULT_BUNDLE_SIZE_BYTES));
+      } catch (Exception e) {
+        LOG.warn("Failed the fetch estimatedSizeBytes for query: {}", query, 
e);
+        // Fallback in case estimated size is unavailable.
+        numSplits = NUM_QUERY_SPLITS_MIN;
+      }
+      return Math.max(numSplits, NUM_QUERY_SPLITS_MIN);
+    }
+
+    /**
+     * Get the estimated size of the data returned by the given query.
+     *
+     * <p>Datastore provides no way to get a good estimate of how large the result of a query
+     * will be. As a rough approximation, we attempt to fetch the statistics of the whole
+     * entity kind being queried, using the __Stat_Kind__ system table, assuming exactly 1 kind
+     * is specified in the query.
+     *
+     * <p>See https://cloud.google.com/datastore/docs/concepts/stats.
+     */
+    static long getEstimatedSizeBytes(Datastore datastore, Query query, 
@Nullable String namespace)
+        throws DatastoreException {
+      String ourKind = query.getKind(0).getName();
+      Query.Builder queryBuilder = Query.newBuilder();
+      if (namespace == null) {
+        queryBuilder.addKindBuilder().setName("__Stat_Kind__");
+      } else {
+        queryBuilder.addKindBuilder().setName("__Ns_Stat_Kind__");
+      }
+      queryBuilder.setFilter(makeFilter("kind_name", EQUAL, 
makeValue(ourKind).build()));
+
+      // Get the latest statistics
+      queryBuilder.addOrder(makeOrder("timestamp", DESCENDING));
+      queryBuilder.setLimit(Int32Value.newBuilder().setValue(1));
+
+      RunQueryRequest request = makeRequest(queryBuilder.build(), namespace);
+
+      long now = System.currentTimeMillis();
+      RunQueryResponse response = datastore.runQuery(request);
+      LOG.debug("Query for per-kind statistics took {}ms", 
System.currentTimeMillis() - now);
+
+      QueryResultBatch batch = response.getBatch();
+      if (batch.getEntityResultsCount() == 0) {
+        throw new NoSuchElementException(
+            "Datastore statistics for kind " + ourKind + " unavailable");
+      }
+      Entity entity = batch.getEntityResults(0).getEntity();
+      return entity.getProperties().get("entity_bytes").getIntegerValue();
+    }
+
+    /** Builds a {@link RunQueryRequest} from the {@code query} and {@code 
namespace}. */
+    static RunQueryRequest makeRequest(Query query, @Nullable String 
namespace) {
+      RunQueryRequest.Builder requestBuilder = 
RunQueryRequest.newBuilder().setQuery(query);
+      if (namespace != null) {
+        requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
+      }
+      return requestBuilder.build();
+    }
+
+    /**
+     * A helper function to get the split queries, taking into account the 
optional
+     * {@code namespace}.
+     */
+    private static List<Query> splitQuery(Query query, @Nullable String 
namespace,
+        Datastore datastore, QuerySplitter querySplitter, int numSplits) 
throws DatastoreException {
+      // If namespace is set, include it in the split request so splits are 
calculated accordingly.
+      PartitionId.Builder partitionBuilder = PartitionId.newBuilder();
+      if (namespace != null) {
+        partitionBuilder.setNamespaceId(namespace);
+      }
+
+      return querySplitter.getSplits(query, partitionBuilder.build(), 
numSplits, datastore);
+    }
+
     /**
      * Note that only {@code namespace} is really {@code @Nullable}. The other 
parameters may be
      * {@code null} as a matter of build order, but if they are {@code null} 
at instantiation time,
      * an error will be thrown.
      */
-    private Read(@Nullable String projectId, @Nullable Query query, @Nullable 
String namespace) {
+    private Read(@Nullable String projectId, @Nullable Query query, @Nullable 
String namespace,
+        int numQuerySplits) {
       this.projectId = projectId;
       this.query = query;
       this.namespace = namespace;
+      this.numQuerySplits = numQuerySplits;
     }
 
     /**
@@ -207,7 +315,7 @@ public class V1Beta3 {
      */
     public V1Beta3.Read withProjectId(String projectId) {
       checkNotNull(projectId, "projectId");
-      return new V1Beta3.Read(projectId, query, namespace);
+      return new V1Beta3.Read(projectId, query, namespace, numQuerySplits);
     }
 
     /**
@@ -222,14 +330,35 @@ public class V1Beta3 {
       checkNotNull(query, "query");
       checkArgument(!query.hasLimit() || query.getLimit().getValue() > 0,
           "Invalid query limit %s: must be positive", 
query.getLimit().getValue());
-      return new V1Beta3.Read(projectId, query, namespace);
+      return new V1Beta3.Read(projectId, query, namespace, numQuerySplits);
     }
 
     /**
      * Returns a new {@link V1Beta3.Read} that reads from the given namespace.
      */
     public V1Beta3.Read withNamespace(String namespace) {
-      return new V1Beta3.Read(projectId, query, namespace);
+      return new V1Beta3.Read(projectId, query, namespace, numQuerySplits);
+    }
+
+    /**
+     * Returns a new {@link V1Beta3.Read} that reads by splitting the given {@code query} into
+     * {@code numQuerySplits} splits.
+     *
+     * <p>The semantics of query splitting are as follows:
+     * <ul>
+     *   <li>Any value less than or equal to 0 will be ignored, and the number 
of splits will be
+     *   chosen dynamically at runtime based on the query data size.
+     *   <li>Any value greater than {@link Read#NUM_QUERY_SPLITS_MAX} will be 
capped at
+     *   {@code NUM_QUERY_SPLITS_MAX}.
+     *   <li>If the {@code query} has a user limit set, then {@code 
numQuerySplits} will be
+     *   ignored and no split will be performed.
+     *   <li>In some cases Cloud Datastore is unable to split a query into the requested
+     *   number of splits. In such cases we use however many splits Datastore returns.
+     * </ul>
+     */
+    public V1Beta3.Read withNumQuerySplits(int numQuerySplits) {
+      return new V1Beta3.Read(projectId, query, namespace,
+          Math.min(Math.max(numQuerySplits, 0), NUM_QUERY_SPLITS_MAX));
     }
 
     @Nullable
@@ -247,9 +376,45 @@ public class V1Beta3 {
       return namespace;
     }
 
+
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public PCollection<Entity> apply(PBegin input) {
-      return input.apply(org.apache.beam.sdk.io.Read.from(getSource()));
+      V1Beta3Options v1Beta3Options = V1Beta3Options.from(getProjectId(), 
getQuery(),
+          getNamespace());
+
+      /*
+       * This composite transform involves the following steps:
+       *   1. Create a singleton of the user provided {@code query} and apply 
a {@link ParDo} that
+       *   splits the query into {@code numQuerySplits} and assigns each split query a unique
+       *   {@code Integer} as the key. The resulting output is of the type
+       *   {@code PCollection<KV<Integer, Query>>}.
+       *
+       *   If the value of {@code numQuerySplits} is less than or equal to 0, 
then the number of
+       *   splits will be computed dynamically based on the size of the data 
for the {@code query}.
+       *
+       *   2. The resulting {@code PCollection} is sharded using a {@link 
GroupByKey} operation. The
+       *   queries are extracted from the {@code KV<Integer, Iterable<Query>>} and flattened to
+       *   output a {@code PCollection<Query>}.
+       *
+       *   3. In the third step, a {@code ParDo} reads entities for each query 
and outputs
+       *   a {@code PCollection<Entity>}.
+       */
+      PCollection<KV<Integer, Query>> queries = input
+          .apply(Create.of(query))
+          .apply(ParDo.of(new SplitQueryFn(v1Beta3Options, numQuerySplits)));
+
+      PCollection<Query> shardedQueries = queries
+          .apply(GroupByKey.<Integer, Query>create())
+          .apply(Values.<Iterable<Query>>create())
+          .apply(Flatten.<Query>iterables());
+
+      PCollection<Entity> entities = shardedQueries
+          .apply(ParDo.of(new ReadFn(v1Beta3Options)));
+
+      return entities;
     }
 
     @Override
@@ -279,482 +444,295 @@ public class V1Beta3 {
           .toString();
     }
 
-    @VisibleForTesting
-    DatastoreSource getSource() {
-      return new DatastoreSource(projectId, query, namespace);
-    }
-  }
-
-  /**
-   * Returns an empty {@link V1Beta3.Write} builder. Configure the destination
-   * {@code projectId} using {@link V1Beta3.Write#withProjectId}.
-   */
-  public Write write() {
-    return new Write(null);
-  }
-
-  /**
-   * A {@link PTransform} that writes {@link Entity} objects to Cloud 
Datastore.
-   *
-   * @see DatastoreIO
-   */
-  public static class Write extends PTransform<PCollection<Entity>, PDone> {
-    @Nullable
-    private final String projectId;
-
     /**
-     * Note that {@code projectId} is only {@code @Nullable} as a matter of 
build order, but if
-     * it is {@code null} at instantiation time, an error will be thrown.
+     * A class for v1beta3 Datastore related options.
      */
-    public Write(@Nullable String projectId) {
-      this.projectId = projectId;
-    }
-
-    /**
-     * Returns a new {@link Write} that writes to the Cloud Datastore for the 
specified project.
-     */
-    public Write withProjectId(String projectId) {
-      checkNotNull(projectId, "projectId");
-      return new Write(projectId);
-    }
-
-    @Override
-    public PDone apply(PCollection<Entity> input) {
-      return input.apply(
-          org.apache.beam.sdk.io.Write.to(new DatastoreSink(projectId)));
-    }
-
-    @Override
-    public void validate(PCollection<Entity> input) {
-      checkNotNull(projectId, "projectId");
-    }
-
-    @Nullable
-    public String getProjectId() {
-      return projectId;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("projectId", projectId)
-          .toString();
-    }
+    @VisibleForTesting
+    static class V1Beta3Options implements Serializable {
+      private final Query query;
+      private final String projectId;
+      @Nullable
+      private final String namespace;
+
+      private V1Beta3Options(String projectId, Query query, @Nullable String 
namespace) {
+        this.projectId = checkNotNull(projectId, "projectId");
+        this.query = checkNotNull(query, "query");
+        this.namespace = namespace;
+      }
 
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      builder
-          .addIfNotNull(DisplayData.item("projectId", projectId)
-              .withLabel("Output Project"));
-    }
-  }
+      public static V1Beta3Options from(String projectId, Query query, 
@Nullable String namespace) {
+        return new V1Beta3Options(projectId, query, namespace);
+      }
 
-  /**
-   * A {@link org.apache.beam.sdk.io.Source} that reads data from Datastore.
-   */
-  static class DatastoreSource extends BoundedSource<Entity> {
+      public Query getQuery() {
+        return query;
+      }
 
-    @Override
-    public Coder<Entity> getDefaultOutputCoder() {
-      return ProtoCoder.of(Entity.class);
-    }
+      public String getProjectId() {
+        return projectId;
+      }
 
-    @Override
-    public boolean producesSortedKeys(PipelineOptions options) {
-      return false;
+      @Nullable
+      public String getNamespace() {
+        return namespace;
+      }
     }
 
-    @Override
-    public List<DatastoreSource> splitIntoBundles(long desiredBundleSizeBytes,
-        PipelineOptions options) throws Exception {
-      // Users may request a limit on the number of results. We can currently 
support this by
-      // simply disabling parallel reads and using only a single split.
-      if (query.hasLimit()) {
-        return ImmutableList.of(this);
+    /**
+     * A {@link DoFn} that splits a given query into multiple sub-queries, 
assigns them unique keys
+     * and outputs them as {@link KV}.
+     */
+    @VisibleForTesting
+    static class SplitQueryFn extends DoFn<Query, KV<Integer, Query>> {
+      private final V1Beta3Options options;
+      // number of splits to make for a given query
+      private final int numSplits;
+
+      private final V1Beta3DatastoreFactory datastoreFactory;
+      // Datastore client
+      private transient Datastore datastore;
+      // Query splitter
+      private transient QuerySplitter querySplitter;
+
+      public SplitQueryFn(V1Beta3Options options, int numSplits) {
+        this(options, numSplits, new V1Beta3DatastoreFactory());
       }
 
-      long numSplits;
-      try {
-        numSplits = Math.round(((double) getEstimatedSizeBytes(options)) / 
desiredBundleSizeBytes);
-      } catch (Exception e) {
-        // Fallback in case estimated size is unavailable. TODO: fix this, 
it's horrible.
-        numSplits = 12;
+      @VisibleForTesting
+      SplitQueryFn(V1Beta3Options options, int numSplits,
+          V1Beta3DatastoreFactory datastoreFactory) {
+        this.options = options;
+        this.numSplits = numSplits;
+        this.datastoreFactory = datastoreFactory;
       }
 
-      // If the desiredBundleSize or number of workers results in 1 split, 
simply return
-      // a source that reads from the original query.
-      if (numSplits <= 1) {
-        return ImmutableList.of(this);
+      @Override
+      public void startBundle(Context c) throws Exception {
+        datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), 
options.projectId);
+        querySplitter = datastoreFactory.getQuerySplitter();
       }
 
-      List<Query> datastoreSplits;
-      try {
-        datastoreSplits = getSplitQueries(Ints.checkedCast(numSplits), 
options);
-      } catch (IllegalArgumentException | DatastoreException e) {
-        LOG.warn("Unable to parallelize the given query: {}", query, e);
-        return ImmutableList.of(this);
-      }
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        int key = 1;
+        Query query = c.element();
 
-      ImmutableList.Builder<DatastoreSource> splits = ImmutableList.builder();
-      for (Query splitQuery : datastoreSplits) {
-        splits.add(new DatastoreSource(projectId, splitQuery, namespace));
-      }
-      return splits.build();
-    }
+        // If query has a user set limit, then do not split.
+        if (query.hasLimit()) {
+          c.output(KV.of(key, query));
+          return;
+        }
 
-    @Override
-    public BoundedReader<Entity> createReader(PipelineOptions pipelineOptions) 
throws IOException {
-      return new DatastoreReader(this, getDatastore(pipelineOptions));
-    }
+        int estimatedNumSplits;
+        // Compute the estimated numSplits if numSplits is not specified by 
the user.
+        if (numSplits <= 0) {
+          estimatedNumSplits = getEstimatedNumSplits(datastore, query, 
options.getNamespace());
+        } else {
+          estimatedNumSplits = numSplits;
+        }
 
-    @Override
-    public void validate() {
-      checkNotNull(query, "query");
-      checkNotNull(projectId, "projectId");
-    }
+        List<Query> querySplits;
+        try {
+          querySplits = splitQuery(query, options.getNamespace(), datastore, 
querySplitter,
+              estimatedNumSplits);
+        } catch (Exception e) {
+          LOG.warn("Unable to parallelize the given query: {}", query, e);
+          querySplits = ImmutableList.of(query);
+        }
 
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions options) throws 
Exception {
-      // Datastore provides no way to get a good estimate of how large the 
result of a query
-      // will be. As a rough approximation, we attempt to fetch the statistics 
of the whole
-      // entity kind being queried, using the __Stat_Kind__ system table, 
assuming exactly 1 kind
-      // is specified in the query.
-      //
-      // See https://cloud.google.com/datastore/docs/concepts/stats
-      if (mockEstimateSizeBytes != null) {
-        return mockEstimateSizeBytes;
+        // assign unique keys to query splits.
+        for (Query subquery : querySplits) {
+          c.output(KV.of(key++, subquery));
+        }
       }
 
-      Datastore datastore = getDatastore(options);
-      if (query.getKindCount() != 1) {
-        throw new UnsupportedOperationException(
-            "Can only estimate size for queries specifying exactly 1 kind.");
+      @Override
+      public void populateDisplayData(Builder builder) {
+        super.populateDisplayData(builder);
+        builder
+            .addIfNotNull(DisplayData.item("projectId", options.getProjectId())
+                .withLabel("ProjectId"))
+            .addIfNotNull(DisplayData.item("namespace", options.getNamespace())
+                .withLabel("Namespace"))
+            .addIfNotNull(DisplayData.item("query", 
options.getQuery().toString())
+                .withLabel("Query"));
       }
-      String ourKind = query.getKind(0).getName();
-      long latestTimestamp = queryLatestStatisticsTimestamp(datastore);
-      Query.Builder query = Query.newBuilder();
-      if (namespace == null) {
-        query.addKindBuilder().setName("__Stat_Kind__");
-      } else {
-        query.addKindBuilder().setName("__Ns_Stat_Kind__");
+    }
+
+    /**
+     * A {@link DoFn} that reads entities from Datastore for each query.
+     */
+    @VisibleForTesting
+    static class ReadFn extends DoFn<Query, Entity> {
+      private final V1Beta3Options options;
+      private final V1Beta3DatastoreFactory datastoreFactory;
+      // Datastore client
+      private transient Datastore datastore;
+
+      public ReadFn(V1Beta3Options options) {
+        this(options, new V1Beta3DatastoreFactory());
       }
-      query.setFilter(makeAndFilter(
-          makeFilter("kind_name", EQUAL, makeValue(ourKind)).build(),
-          makeFilter("timestamp", EQUAL, makeValue(latestTimestamp)).build()));
-      RunQueryRequest request = makeRequest(query.build());
 
-      long now = System.currentTimeMillis();
-      RunQueryResponse response = datastore.runQuery(request);
-      LOG.info("Query for per-kind statistics took {}ms", 
System.currentTimeMillis() - now);
+      @VisibleForTesting
+      ReadFn(V1Beta3Options options, V1Beta3DatastoreFactory datastoreFactory) 
{
+        this.options = options;
+        this.datastoreFactory = datastoreFactory;
+      }
 
-      QueryResultBatch batch = response.getBatch();
-      if (batch.getEntityResultsCount() == 0) {
-        throw new NoSuchElementException(
-            "Datastore statistics for kind " + ourKind + " unavailable");
+      @Override
+      public void startBundle(Context c) throws Exception {
+        datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), 
options.getProjectId());
       }
-      Entity entity = batch.getEntityResults(0).getEntity();
-      return entity.getProperties().get("entity_bytes").getIntegerValue();
-    }
 
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      builder
-          .addIfNotNull(DisplayData.item("projectId", projectId)
-              .withLabel("ProjectId"))
-          .addIfNotNull(DisplayData.item("namespace", namespace)
-              .withLabel("Namespace"))
-          .addIfNotNull(DisplayData.item("query", query.toString())
-              .withLabel("Query"));
-    }
+      /** Read and output entities for the given query. */
+      @Override
+      public void processElement(ProcessContext context) throws Exception {
+        Query query = context.element();
+        String namespace = options.getNamespace();
+        int userLimit = query.hasLimit()
+            ? query.getLimit().getValue() : Integer.MAX_VALUE;
 
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("projectId", projectId)
-          .add("query", query)
-          .add("namespace", namespace)
-          .toString();
-    }
+        boolean moreResults = true;
+        QueryResultBatch currentBatch = null;
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(DatastoreSource.class);
-    private final String projectId;
-    private final Query query;
-    @Nullable
-    private final String namespace;
+        while (moreResults) {
+          Query.Builder queryBuilder = query.toBuilder().clone();
+          queryBuilder.setLimit(Int32Value.newBuilder().setValue(
+              Math.min(userLimit, QUERY_BATCH_LIMIT)));
 
-    /** For testing only. TODO: This could be much cleaner with dependency 
injection. */
-    @Nullable
-    private QuerySplitter mockSplitter;
-    @Nullable
-    private Long mockEstimateSizeBytes;
+          if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
+            queryBuilder.setStartCursor(currentBatch.getEndCursor());
+          }
 
-    DatastoreSource(String projectId, Query query, @Nullable String namespace) 
{
-      this.projectId = projectId;
-      this.query = query;
-      this.namespace = namespace;
-    }
+          RunQueryRequest request = makeRequest(queryBuilder.build(), 
namespace);
+          RunQueryResponse response = datastore.runQuery(request);
 
-    /**
-     * A helper function to get the split queries, taking into account the 
optional
-     * {@code namespace} and whether there is a mock splitter.
-     */
-    private List<Query> getSplitQueries(int numSplits, PipelineOptions options)
-        throws DatastoreException {
-      // If namespace is set, include it in the split request so splits are 
calculated accordingly.
-      PartitionId.Builder partitionBuilder = PartitionId.newBuilder();
-      if (namespace != null) {
-        partitionBuilder.setNamespaceId(namespace);
-      }
+          currentBatch = response.getBatch();
 
-      if (mockSplitter != null) {
-        // For testing.
-        return mockSplitter.getSplits(query, partitionBuilder.build(), 
numSplits, null);
-      }
+          // MORE_RESULTS_AFTER_LIMIT is not implemented yet:
+          // https://groups.google.com/forum/#!topic/gcd-discuss/iNs6M1jA2Vw, 
so
+          // use result count to determine if more results might exist.
+          int numFetch = currentBatch.getEntityResultsCount();
+          if (query.hasLimit()) {
+            verify(userLimit >= numFetch,
+                "Expected userLimit %s >= numFetch %s, because query limit %s 
must be <= userLimit",
+                userLimit, numFetch, query.getLimit());
+            userLimit -= numFetch;
+          }
 
-      return DatastoreHelper.getQuerySplitter().getSplits(
-          query, partitionBuilder.build(), numSplits, getDatastore(options));
-    }
+          // output all the entities from the current batch.
+          for (EntityResult entityResult : 
currentBatch.getEntityResultsList()) {
+            context.output(entityResult.getEntity());
+          }
 
-    /**
-     * Builds a {@link RunQueryRequest} from the {@code query}, using the 
properties set on this
-     * {@code DatastoreSource}. For example, sets the {@code namespace} for 
the request.
-     */
-    private RunQueryRequest makeRequest(Query query) {
-      RunQueryRequest.Builder requestBuilder = 
RunQueryRequest.newBuilder().setQuery(query);
-      if (namespace != null) {
-        requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
+          // Check if we have more entities to be read.
+          moreResults =
+              // User-limit does not exist (so userLimit == MAX_VALUE) and/or 
has not been satisfied
+              (userLimit > 0)
+                  // All indications from the API are that there are/may be 
more results.
+                  && ((numFetch == QUERY_BATCH_LIMIT)
+                  || (currentBatch.getMoreResults() == NOT_FINISHED));
+        }
       }
-      return requestBuilder.build();
     }
 
     /**
-     * Datastore system tables with statistics are periodically updated. This 
method fetches
-     * the latest timestamp of statistics update using the {@code 
__Stat_Total__} table.
+     * A wrapper factory class for the Datastore singleton classes {@link DatastoreFactory}
+     * and {@link QuerySplitter}.
+     *
+     * <p>{@link DatastoreFactory} and {@link QuerySplitter} are not Java serializable, hence
+     * they are wrapped in this class, which implements {@link Serializable}.
      */
-    private long queryLatestStatisticsTimestamp(Datastore datastore) throws 
DatastoreException {
-      Query.Builder query = Query.newBuilder();
-      query.addKindBuilder().setName("__Stat_Total__");
-      query.addOrder(makeOrder("timestamp", DESCENDING));
-      query.setLimit(Int32Value.newBuilder().setValue(1));
-      RunQueryRequest request = makeRequest(query.build());
+    @VisibleForTesting
+    static class V1Beta3DatastoreFactory implements Serializable {
+
+      /** Builds a Datastore client for the given pipeline options and 
project. */
+      public Datastore getDatastore(PipelineOptions pipelineOptions, String 
projectId) {
+        DatastoreOptions.Builder builder =
+            new DatastoreOptions.Builder()
+                .projectId(projectId)
+                .initializer(
+                    new RetryHttpRequestInitializer()
+                );
+
+        Credential credential = 
pipelineOptions.as(GcpOptions.class).getGcpCredential();
+        if (credential != null) {
+          builder.credential(credential);
+        }
 
-      long now = System.currentTimeMillis();
-      RunQueryResponse response = datastore.runQuery(request);
-      LOG.info("Query for latest stats timestamp of project {} took {}ms", 
projectId,
-          System.currentTimeMillis() - now);
-      QueryResultBatch batch = response.getBatch();
-      if (batch.getEntityResultsCount() == 0) {
-        throw new NoSuchElementException(
-            "Datastore total statistics for project " + projectId + " 
unavailable");
+        return DatastoreFactory.get().create(builder.build());
       }
-      Entity entity = batch.getEntityResults(0).getEntity();
-      return 
entity.getProperties().get("timestamp").getTimestampValue().getNanos();
-    }
 
-    private Datastore getDatastore(PipelineOptions pipelineOptions) {
-      DatastoreOptions.Builder builder =
-          new DatastoreOptions.Builder()
-              .projectId(projectId)
-              .initializer(
-                  new RetryHttpRequestInitializer()
-              );
-
-      Credential credential = 
pipelineOptions.as(GcpOptions.class).getGcpCredential();
-      if (credential != null) {
-        builder.credential(credential);
+      /** Builds a Datastore {@link QuerySplitter}. */
+      public QuerySplitter getQuerySplitter() {
+        return DatastoreHelper.getQuerySplitter();
       }
-      return DatastoreFactory.get().create(builder.build());
-    }
-
-    /** For testing only. */
-    DatastoreSource withMockSplitter(QuerySplitter splitter) {
-      DatastoreSource res = new DatastoreSource(projectId, query, namespace);
-      res.mockSplitter = splitter;
-      res.mockEstimateSizeBytes = mockEstimateSizeBytes;
-      return res;
     }
+  }
 
-    /** For testing only. */
-    DatastoreSource withMockEstimateSizeBytes(Long estimateSizeBytes) {
-      DatastoreSource res = new DatastoreSource(projectId, query, namespace);
-      res.mockSplitter = mockSplitter;
-      res.mockEstimateSizeBytes = estimateSizeBytes;
-      return res;
-    }
 
-    @VisibleForTesting
-    Query getQuery() {
-      return query;
-    }
+  /**
+   * Returns an empty {@link V1Beta3.Write} builder. Configure the destination
+   * {@code projectId} using {@link V1Beta3.Write#withProjectId}.
+   */
+  public Write write() {
+    return new Write(null);
   }
 
   /**
-   * A {@link DatastoreSource.Reader} over the records from a query of the 
datastore.
+   * A {@link PTransform} that writes {@link Entity} objects to Cloud 
Datastore.
    *
-   * <p>Timestamped records are currently not supported.
-   * All records implicitly have the timestamp of {@code 
BoundedWindow.TIMESTAMP_MIN_VALUE}.
+   * @see DatastoreIO
    */
-  @VisibleForTesting
-  static class DatastoreReader extends BoundedSource.BoundedReader<Entity> {
-    private final DatastoreSource source;
-
-    /**
-     * Datastore to read from.
-     */
-    private final Datastore datastore;
-
-    /**
-     * True if more results may be available.
-     */
-    private boolean moreResults;
-
-    /**
-     * Iterator over records.
-     */
-    private java.util.Iterator<EntityResult> entities;
-
-    /**
-     * Current batch of query results.
-     */
-    private QueryResultBatch currentBatch;
-
-    /**
-     * Maximum number of results to request per query.
-     *
-     * <p>Must be set, or it may result in an I/O error when querying
-     * Cloud Datastore.
-     */
-    private static final int QUERY_BATCH_LIMIT = 500;
+  public static class Write extends PTransform<PCollection<Entity>, PDone> {
+    @Nullable
+    private final String projectId;
 
     /**
-     * Remaining user-requested limit on the number of sources to return. If 
the user did not set a
-     * limit, then this variable will always have the value {@link 
Integer#MAX_VALUE} and will never
-     * be decremented.
+     * Note that {@code projectId} is only {@code @Nullable} as a matter of 
build order, but if
+     * it is {@code null} at instantiation time, an error will be thrown.
      */
-    private int userLimit;
-
-    private volatile boolean done = false;
-
-    private Entity currentEntity;
+    public Write(@Nullable String projectId) {
+      this.projectId = projectId;
+    }
 
     /**
-     * Returns a DatastoreReader with DatastoreSource and Datastore object set.
-     *
-     * @param datastore a datastore connection to use.
+     * Returns a new {@link Write} that writes to the Cloud Datastore for the 
specified project.
      */
-    public DatastoreReader(DatastoreSource source, Datastore datastore) {
-      this.source = source;
-      this.datastore = datastore;
-      // If the user set a limit on the query, remember it. Otherwise pin to 
MAX_VALUE.
-      userLimit = source.query.hasLimit()
-          ? source.query.getLimit().getValue() : Integer.MAX_VALUE;
-    }
-
-    @Override
-    public Entity getCurrent() {
-      return currentEntity;
-    }
-
-    @Override
-    public final long getSplitPointsConsumed() {
-      return done ? 1 : 0;
-    }
-
-    @Override
-    public final long getSplitPointsRemaining() {
-      return done ? 0 : 1;
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      return advance();
+    public Write withProjectId(String projectId) {
+      checkNotNull(projectId, "projectId");
+      return new Write(projectId);
     }
 
     @Override
-    public boolean advance() throws IOException {
-      if (entities == null || (!entities.hasNext() && moreResults)) {
-        try {
-          entities = getIteratorAndMoveCursor();
-        } catch (DatastoreException e) {
-          throw new IOException(e);
-        }
-      }
-
-      if (entities == null || !entities.hasNext()) {
-        currentEntity = null;
-        done = true;
-        return false;
-      }
-
-      currentEntity = entities.next().getEntity();
-      return true;
+    public PDone apply(PCollection<Entity> input) {
+      return input.apply(
+          org.apache.beam.sdk.io.Write.to(new DatastoreSink(projectId)));
     }
 
     @Override
-    public void close() throws IOException {
-      // Nothing
+    public void validate(PCollection<Entity> input) {
+      checkNotNull(projectId, "projectId");
     }
 
-    @Override
-    public DatastoreSource getCurrentSource() {
-      return source;
+    @Nullable
+    public String getProjectId() {
+      return projectId;
     }
 
     @Override
-    public DatastoreSource splitAtFraction(double fraction) {
-      // Not supported.
-      return null;
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("projectId", projectId)
+          .toString();
     }
 
     @Override
-    public Double getFractionConsumed() {
-      // Not supported.
-      return null;
-    }
-
-    /**
-     * Returns an iterator over the next batch of records for the query
-     * and updates the cursor to get the next batch as needed.
-     * Query has specified limit and offset from InputSplit.
-     */
-    private Iterator<EntityResult> getIteratorAndMoveCursor() throws 
DatastoreException {
-      Query.Builder query = source.query.toBuilder().clone();
-      query.setLimit(Int32Value.newBuilder().setValue(Math.min(userLimit, 
QUERY_BATCH_LIMIT)));
-      if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
-        query.setStartCursor(currentBatch.getEndCursor());
-      }
-
-      RunQueryRequest request = source.makeRequest(query.build());
-      RunQueryResponse response = datastore.runQuery(request);
-
-      currentBatch = response.getBatch();
-
-      // MORE_RESULTS_AFTER_LIMIT is not implemented yet:
-      // https://groups.google.com/forum/#!topic/gcd-discuss/iNs6M1jA2Vw, so
-      // use result count to determine if more results might exist.
-      int numFetch = currentBatch.getEntityResultsCount();
-      if (source.query.hasLimit()) {
-        verify(userLimit >= numFetch,
-            "Expected userLimit %s >= numFetch %s, because query limit %s 
should be <= userLimit",
-            userLimit, numFetch, query.getLimit());
-        userLimit -= numFetch;
-      }
-      moreResults =
-          // User-limit does not exist (so userLimit == MAX_VALUE) and/or has 
not been satisfied.
-          (userLimit > 0)
-              // All indications from the API are that there are/may be more 
results.
-              && ((numFetch == QUERY_BATCH_LIMIT)
-              || (currentBatch.getMoreResults() == NOT_FINISHED));
-
-      // May receive a batch of 0 results if the number of records is a 
multiple
-      // of the request limit.
-      if (numFetch == 0) {
-        return null;
-      }
-
-      return currentBatch.getEntityResultsList().iterator();
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(DisplayData.item("projectId", projectId)
+              .withLabel("Output Project"));
     }
   }
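
A worked example of the split heuristic in Read.getEstimatedNumSplits above
(a sketch using the constants from this commit; the 1280 MB estimate is
illustrative):

    // entity_bytes reported by the __Stat_Kind__ statistics query.
    long estimatedSizeBytes = 1280L * 1024 * 1024;               // 1280 MB
    // DEFAULT_BUNDLE_SIZE_BYTES = 64 MB; NUM_QUERY_SPLITS_MAX = 50000.
    int numSplits = (int) Math.min(50000,
        Math.round(((double) estimatedSizeBytes) / (64 * 1024 * 1024)));
    // 1280 MB / 64 MB = 20; clamp to at least NUM_QUERY_SPLITS_MIN = 12.
    numSplits = Math.max(numSplits, 12);                         // => 20

If the statistics query fails, getEstimatedNumSplits falls back to
NUM_QUERY_SPLITS_MIN (12) rather than failing the pipeline.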
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92030133/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
index a60e7c5..9947c60 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
@@ -17,10 +17,17 @@
  */
 package org.apache.beam.sdk.io.gcp.datastore;
 
+import static 
org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.DEFAULT_BUNDLE_SIZE_BYTES;
+import static 
org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.QUERY_BATCH_LIMIT;
+import static 
org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.getEstimatedSizeBytes;
+import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.makeRequest;
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-
+import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL;
+import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
-
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasItem;
@@ -31,26 +38,25 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DatastoreReader;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DatastoreSource;
 import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DatastoreWriter;
-import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.ReadFn;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.SplitQueryFn;
+import 
org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.V1Beta3DatastoreFactory;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.V1Beta3Options;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
-import org.apache.beam.sdk.util.TestCredential;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.POutput;
@@ -59,16 +65,12 @@ import com.google.common.collect.Lists;
 import com.google.datastore.v1beta3.Entity;
 import com.google.datastore.v1beta3.EntityResult;
 import com.google.datastore.v1beta3.Key;
-import com.google.datastore.v1beta3.KindExpression;
 import com.google.datastore.v1beta3.PartitionId;
-import com.google.datastore.v1beta3.PropertyFilter;
 import com.google.datastore.v1beta3.Query;
 import com.google.datastore.v1beta3.QueryResultBatch;
 import com.google.datastore.v1beta3.RunQueryRequest;
 import com.google.datastore.v1beta3.RunQueryResponse;
-import com.google.datastore.v1beta3.Value;
 import com.google.datastore.v1beta3.client.Datastore;
-import com.google.datastore.v1beta3.client.DatastoreHelper;
 import com.google.datastore.v1beta3.client.QuerySplitter;
 import com.google.protobuf.Int32Value;
 
@@ -86,8 +88,9 @@ import org.mockito.stubbing.Answer;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
-import java.util.NoSuchElementException;
 import java.util.Set;
 
 /**
@@ -99,36 +102,36 @@ public class V1Beta3Test {
   private static final String NAMESPACE = "testNamespace";
   private static final String KIND = "testKind";
   private static final Query QUERY;
+  private static final V1Beta3Options v1Beta3Options;
   static {
     Query.Builder q = Query.newBuilder();
     q.addKindBuilder().setName(KIND);
     QUERY = q.build();
+    v1Beta3Options = V1Beta3Options.from(PROJECT_ID, QUERY, NAMESPACE);
   }
   private V1Beta3.Read initialRead;
 
   @Mock
   Datastore mockDatastore;
+  @Mock
+  QuerySplitter mockQuerySplitter;
+  @Mock
+  V1Beta3DatastoreFactory mockDatastoreFactory;
 
   @Rule
   public final ExpectedException thrown = ExpectedException.none();
 
-  @Rule public final ExpectedLogs logged = 
ExpectedLogs.none(DatastoreSource.class);
-
   @Before
   public void setUp() {
     MockitoAnnotations.initMocks(this);
 
     initialRead = DatastoreIO.v1beta3().read()
         .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
-  }
 
-  /**
-   * Helper function to create a test {@code DataflowPipelineOptions}.
-   */
-  static final GcpOptions testPipelineOptions() {
-    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
-    options.setGcpCredential(new TestCredential());
-    return options;
+    when(mockDatastoreFactory.getDatastore(any(PipelineOptions.class), 
any(String.class)))
+        .thenReturn(mockDatastore);
+    when(mockDatastoreFactory.getQuerySplitter())
+        .thenReturn(mockQuerySplitter);
   }
 
   @Test
@@ -265,182 +268,6 @@ public class V1Beta3Test {
         displayData, hasItem(hasDisplayItem("projectId")));
   }
 
-  @Test
-  public void testQuerySplitBasic() throws Exception {
-    KindExpression mykind = 
KindExpression.newBuilder().setName("mykind").build();
-    Query query = Query.newBuilder().addKind(mykind).build();
-
-    List<Query> mockSplits = new ArrayList<>();
-    for (int i = 0; i < 8; ++i) {
-      mockSplits.add(
-          Query.newBuilder()
-              .addKind(mykind)
-              .setFilter(
-                  DatastoreHelper.makeFilter("foo", 
PropertyFilter.Operator.EQUAL,
-                      Value.newBuilder().setIntegerValue(i).build()))
-              .build());
-    }
-
-    QuerySplitter splitter = mock(QuerySplitter.class);
-    /* No namespace */
-    PartitionId partition = PartitionId.newBuilder().build();
-    when(splitter.getSplits(any(Query.class), eq(partition), eq(8), 
any(Datastore.class)))
-        .thenReturn(mockSplits);
-
-    DatastoreSource io = initialRead
-        .withNamespace(null)
-        .withQuery(query)
-        .getSource()
-        .withMockSplitter(splitter)
-        .withMockEstimateSizeBytes(8 * 1024L);
-
-    List<DatastoreSource> bundles = io.splitIntoBundles(1024, 
testPipelineOptions());
-    assertEquals(8, bundles.size());
-    for (int i = 0; i < 8; ++i) {
-      DatastoreSource bundle = bundles.get(i);
-      Query bundleQuery = bundle.getQuery();
-      assertEquals("mykind", bundleQuery.getKind(0).getName());
-      assertEquals(i, 
bundleQuery.getFilter().getPropertyFilter().getValue().getIntegerValue());
-    }
-  }
-
-  /**
-   * Verifies that when namespace is set in the source, the split request 
includes the namespace.
-   */
-  @Test
-  public void testSourceWithNamespace() throws Exception {
-    QuerySplitter splitter = mock(QuerySplitter.class);
-    DatastoreSource io = initialRead
-        .getSource()
-        .withMockSplitter(splitter)
-        .withMockEstimateSizeBytes(8 * 1024L);
-
-    io.splitIntoBundles(1024, testPipelineOptions());
-
-    PartitionId partition = 
PartitionId.newBuilder().setNamespaceId(NAMESPACE).build();
-    verify(splitter).getSplits(eq(QUERY), eq(partition), eq(8), 
any(Datastore.class));
-    verifyNoMoreInteractions(splitter);
-  }
-
-  @Test
-  public void testQuerySplitWithZeroSize() throws Exception {
-    KindExpression mykind = 
KindExpression.newBuilder().setName("mykind").build();
-    Query query = Query.newBuilder().addKind(mykind).build();
-
-    List<Query> mockSplits = Lists.newArrayList(
-        Query.newBuilder()
-            .addKind(mykind)
-            .build());
-
-    QuerySplitter splitter = mock(QuerySplitter.class);
-    when(splitter.getSplits(any(Query.class), any(PartitionId.class), eq(1), 
any(Datastore.class)))
-        .thenReturn(mockSplits);
-
-    DatastoreSource io = initialRead
-        .withQuery(query)
-        .getSource()
-        .withMockSplitter(splitter)
-        .withMockEstimateSizeBytes(0L);
-
-    List<DatastoreSource> bundles = io.splitIntoBundles(1024, 
testPipelineOptions());
-    assertEquals(1, bundles.size());
-    verify(splitter, never())
-        .getSplits(any(Query.class), any(PartitionId.class), eq(1), 
any(Datastore.class));
-    DatastoreSource bundle = bundles.get(0);
-    Query bundleQuery = bundle.getQuery();
-    assertEquals("mykind", bundleQuery.getKind(0).getName());
-    assertFalse(bundleQuery.hasFilter());
-  }
-
-  /**
-   * Tests that a query with a user-provided limit field does not split, and 
does not even
-   * interact with a query splitter.
-   */
-  @Test
-  public void testQueryDoesNotSplitWithLimitSet() throws Exception {
-    // Minimal query with a limit
-    Query query = 
Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(5)).build();
-
-    // Mock query splitter, should not be invoked.
-    QuerySplitter splitter = mock(QuerySplitter.class);
-    when(splitter.getSplits(any(Query.class), any(PartitionId.class), eq(2), 
any(Datastore.class)))
-        .thenThrow(new AssertionError("Splitter should not be invoked"));
-
-    List<DatastoreSource> bundles =
-        initialRead
-            .withQuery(query)
-            .getSource()
-            .withMockSplitter(splitter)
-            .splitIntoBundles(1024, testPipelineOptions());
-
-    assertEquals(1, bundles.size());
-    assertEquals(query, bundles.get(0).getQuery());
-    verifyNoMoreInteractions(splitter);
-  }
-
-  /**
-   * Tests that when {@link QuerySplitter} cannot split a query, {@link 
V1Beta3} falls back to
-   * a single split.
-   */
-  @Test
-  public void testQuerySplitterThrows() throws Exception {
-    // Mock query splitter that throws IllegalArgumentException
-    IllegalArgumentException exception =
-        new IllegalArgumentException("query not supported by splitter");
-    QuerySplitter splitter = mock(QuerySplitter.class);
-    when(
-            splitter.getSplits(
-                any(Query.class), any(PartitionId.class), any(Integer.class), 
any(Datastore.class)))
-        .thenThrow(exception);
-
-    Query query = 
Query.newBuilder().addKind(KindExpression.newBuilder().setName("myKind")).build();
-    List<DatastoreSource> bundles =
-        initialRead
-            .withQuery(query)
-            .getSource()
-            .withMockSplitter(splitter)
-            .withMockEstimateSizeBytes(10240L)
-            .splitIntoBundles(1024, testPipelineOptions());
-
-    assertEquals(1, bundles.size());
-    assertEquals(query, bundles.get(0).getQuery());
-    verify(splitter, times(1))
-        .getSplits(
-            any(Query.class), any(PartitionId.class), any(Integer.class), 
any(Datastore.class));
-    logged.verifyWarn("Unable to parallelize the given query", exception);
-  }
-
-  @Test
-  public void testQuerySplitSizeUnavailable() throws Exception {
-    KindExpression mykind = 
KindExpression.newBuilder().setName("mykind").build();
-    Query query = Query.newBuilder().addKind(mykind).build();
-
-    List<Query> mockSplits = 
Lists.newArrayList(Query.newBuilder().addKind(mykind).build());
-
-    QuerySplitter splitter = mock(QuerySplitter.class);
-    when(splitter.getSplits(any(Query.class), any(PartitionId.class), eq(12), 
any(Datastore.class)))
-        .thenReturn(mockSplits);
-
-    DatastoreSource io = initialRead
-        .withQuery(query)
-        .getSource()
-        .withMockSplitter(splitter)
-        .withMockEstimateSizeBytes(8 * 1024L);
-
-    DatastoreSource spiedIo = spy(io);
-    when(spiedIo.getEstimatedSizeBytes(any(PipelineOptions.class)))
-        .thenThrow(new NoSuchElementException());
-
-    List<DatastoreSource> bundles = spiedIo.splitIntoBundles(1024, 
testPipelineOptions());
-    assertEquals(1, bundles.size());
-    verify(splitter, never())
-        .getSplits(any(Query.class), any(PartitionId.class), eq(1), 
any(Datastore.class));
-    DatastoreSource bundle = bundles.get(0);
-    Query bundleQuery = bundle.getQuery();
-    assertEquals("mykind", bundleQuery.getKind(0).getName());
-    assertFalse(bundleQuery.hasFilter());
-  }
-
   /**
    * Test building a Write using builder methods.
    */
@@ -525,22 +352,146 @@ public class V1Beta3Test {
     assertThat(writer.entities, containsInAnyOrder(expected.toArray()));
   }
 
-  /** Datastore batch API limit in number of records per query. */
-  private static final int DATASTORE_QUERY_BATCH_LIMIT = 500;
+  /**
+   * Tests that {@link V1Beta3.Read#getEstimatedSizeBytes} fetches and returns the estimated
+   * size of a query.
+   */
+  @Test
+  public void testEstimatedSizeBytes() throws Exception {
+    long entityBytes = 100L;
+    // Per Kind statistics request and response
+    RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), 
NAMESPACE);
+    RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
+
+    when(mockDatastore.runQuery(statRequest))
+        .thenReturn(statResponse);
+
+    assertEquals(entityBytes, getEstimatedSizeBytes(mockDatastore, QUERY, 
NAMESPACE));
+    verify(mockDatastore, times(1)).runQuery(statRequest);
+  }
+
+  /**
+   * Tests {@link SplitQueryFn} when the number of query splits is specified.
+   */
+  @Test
+  public void testSplitQueryFnWithNumSplits() throws Exception {
+    int numSplits = 100;
+    when(mockQuerySplitter.getSplits(
+        eq(QUERY), any(PartitionId.class), eq(numSplits), 
any(Datastore.class)))
+        .thenReturn(splitQuery(QUERY, numSplits));
+
+    SplitQueryFn splitQueryFn = new SplitQueryFn(v1Beta3Options, numSplits, 
mockDatastoreFactory);
+    DoFnTester<Query, KV<Integer, Query>> doFnTester = 
DoFnTester.of(splitQueryFn);
+    /**
+     * Although the Datastore client is marked transient in {@link SplitQueryFn}, the mock
+     * injected through the factory (via a Mockito when clause) is not serializable because it
+     * has no no-arg constructor. Cloning is therefore disabled to prevent the DoFn from being
+     * serialized.
+     */
+    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+    List<KV<Integer, Query>> queries = doFnTester.processBundle(QUERY);
+
+    assertEquals(numSplits, queries.size());
+    verifyUniqueKeys(queries);
+    verify(mockQuerySplitter, times(1)).getSplits(
+        eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class));
+    verifyZeroInteractions(mockDatastore);
+  }
+
+  /**
+   * Tests {@link SplitQueryFn} when the number of query splits is not specified.
+   */
+  @Test
+  public void testSplitQueryFnWithoutNumSplits() throws Exception {
+    // Force SplitQueryFn to compute the number of query splits
+    int numSplits = 0;
+    int expectedNumSplits = 20;
+    long entityBytes = expectedNumSplits * DEFAULT_BUNDLE_SIZE_BYTES;
+
+    // Per Kind statistics request and response
+    RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE);
+    RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
+
+    when(mockDatastore.runQuery(statRequest))
+        .thenReturn(statResponse);
+    when(mockQuerySplitter.getSplits(
+        eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class)))
+        .thenReturn(splitQuery(QUERY, expectedNumSplits));
+
+    SplitQueryFn splitQueryFn = new SplitQueryFn(v1Beta3Options, numSplits, mockDatastoreFactory);
+    DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
+    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+    List<KV<Integer, Query>> queries = doFnTester.processBundle(QUERY);
+
+    assertEquals(expectedNumSplits, queries.size());
+    verifyUniqueKeys(queries);
+    verify(mockQuerySplitter, times(1)).getSplits(
+        eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class));
+    verify(mockDatastore, times(1)).runQuery(statRequest);
+  }
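The sizing arithmetic in this test is deliberate: entityBytes is an exact multiple of
DEFAULT_BUNDLE_SIZE_BYTES, so the derived split count is unambiguous. A plausible sketch of
that heuristic, assuming a lower bound of one split (the committed implementation may clamp
differently):

    // One split per DEFAULT_BUNDLE_SIZE_BYTES of estimated data; with
    // entityBytes = 20 * DEFAULT_BUNDLE_SIZE_BYTES this yields the 20 splits
    // the test expects.
    static int computeNumSplits(long estimatedSizeBytes) {
      return (int) Math.max(1, estimatedSizeBytes / DEFAULT_BUNDLE_SIZE_BYTES);
    }
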
+
+  /**
+   * Tests {@link V1Beta3.Read.SplitQueryFn} when the query has a user-specified limit.
+   */
+  @Test
+  public void testSplitQueryFnWithQueryLimit() throws Exception {
+    Query queryWithLimit = QUERY.toBuilder().clone()
+        .setLimit(Int32Value.newBuilder().setValue(1))
+        .build();
+
+    SplitQueryFn splitQueryFn = new SplitQueryFn(v1Beta3Options, 10, mockDatastoreFactory);
+    DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
+    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+    List<KV<Integer, Query>> queries = doFnTester.processBundle(queryWithLimit);
+
+    assertEquals(1, queries.size());
+    verifyUniqueKeys(queries);
+    verifyNoMoreInteractions(mockDatastore);
+    verifyNoMoreInteractions(mockQuerySplitter);
+  }
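The verifyNoMoreInteractions assertions pin down the pass-through behavior: a query with a
user-specified limit cannot be split, so it must come out as a single keyed split without
touching the splitter or the Datastore client. A sketch of that guard, with an illustrative
helper name rather than the committed SplitQueryFn body:

    // Illustrative guard: limited queries (or a split count of one) are
    // passed through as a single split.
    static List<KV<Integer, Query>> splitOrPassThrough(Query query, QuerySplitter splitter,
        PartitionId partition, int numSplits, Datastore datastore) throws DatastoreException {
      if (query.hasLimit() || numSplits <= 1) {
        return Collections.singletonList(KV.of(0, query));
      }
      List<KV<Integer, Query>> splits = new ArrayList<>();
      int key = 0;
      for (Query subQuery : splitter.getSplits(query, partition, numSplits, datastore)) {
        splits.add(KV.of(key++, subQuery));
      }
      return splits;
    }
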
+
+  /** Tests {@link ReadFn} with a query limit less than one batch. */
+  @Test
+  public void testReadFnWithOneBatch() throws Exception {
+    readFnTest(5);
+  }
+
+  /** Tests {@link ReadFn} with a query limit more than one batch, and not a multiple. */
+  @Test
+  public void testReadFnWithMultipleBatches() throws Exception {
+    readFnTest(QUERY_BATCH_LIMIT + 5);
+  }
+
+  /** Tests {@link ReadFn} for several batches, using an exact multiple of the batch size. */
+  @Test
+  public void testReadFnWithBatchesExactMultiple() throws Exception {
+    readFnTest(5 * QUERY_BATCH_LIMIT);
+  }
+
+  /** Helper methods. */
+
+  /** A helper function that verifies that all the queries have unique keys. */
+  private void verifyUniqueKeys(List<KV<Integer, Query>> queries) {
+    Set<Integer> keys = new HashSet<>();
+    for (KV<Integer, Query> kv: queries) {
+      keys.add(kv.getKey());
+    }
+    assertEquals(queries.size(), keys.size());
+  }
 
   /**
   * A helper function that creates mock {@link Entity} results in response to a query. Always
   * indicates that more results are available, unless the batch is limited to fewer than
-   * {@link #DATASTORE_QUERY_BATCH_LIMIT} results.
+   * {@link V1Beta3.Read#QUERY_BATCH_LIMIT} results.
    */
   private static RunQueryResponse mockResponseForQuery(Query q) {
     // Every query V1Beta3 sends should have a limit.
     assertTrue(q.hasLimit());
 
-    // The limit should be in the range [1, DATASTORE_QUERY_BATCH_LIMIT]
+    // The limit should be in the range [1, QUERY_BATCH_LIMIT]
     int limit = q.getLimit().getValue();
     assertThat(limit, greaterThanOrEqualTo(1));
-    assertThat(limit, lessThanOrEqualTo(DATASTORE_QUERY_BATCH_LIMIT));
+    assertThat(limit, lessThanOrEqualTo(QUERY_BATCH_LIMIT));
 
     // Create the requested number of entities.
     List<EntityResult> entities = new ArrayList<>(limit);
@@ -557,61 +508,80 @@ public class V1Beta3Test {
         .addAllEntityResults(entities)
         .setEntityResultType(EntityResult.ResultType.FULL)
         .setMoreResults(
-            limit == DATASTORE_QUERY_BATCH_LIMIT
+            limit == QUERY_BATCH_LIMIT
                 ? QueryResultBatch.MoreResultsType.NOT_FINISHED
                 : QueryResultBatch.MoreResultsType.NO_MORE_RESULTS);
 
     return ret.build();
   }
 
-  /** Helper function to run a test reading from a limited-result query. */
-  private void runQueryLimitReadTest(int numEntities) throws Exception {
+  /** Helper function to run a read test with {@link ReadFn}. */
+  private void readFnTest(int numEntities) throws Exception {
     // An empty query to read entities.
     Query query = Query.newBuilder().setLimit(
         Int32Value.newBuilder().setValue(numEntities)).build();
-    V1Beta3.Read read = DatastoreIO.v1beta3().read().withQuery(query).withProjectId("mockProject");
 
     // Use mockResponseForQuery to generate results.
     when(mockDatastore.runQuery(any(RunQueryRequest.class)))
-        .thenAnswer(
-            new Answer<RunQueryResponse>() {
-              @Override
-              public RunQueryResponse answer(InvocationOnMock invocation) throws Throwable {
-                Query q = ((RunQueryRequest) invocation.getArguments()[0]).getQuery();
-                return mockResponseForQuery(q);
-              }
-            });
-
-    // Actually instantiate the reader.
-    DatastoreReader reader = new DatastoreReader(read.getSource(), mockDatastore);
-
-    // Simply count the number of results returned by the reader.
-    assertTrue(reader.start());
-    int resultCount = 1;
-    while (reader.advance()) {
-      resultCount++;
-    }
-    reader.close();
-
+        .thenAnswer(new Answer<RunQueryResponse>() {
+          @Override
+          public RunQueryResponse answer(InvocationOnMock invocationOnMock) throws Throwable {
+            Query q = ((RunQueryRequest) invocationOnMock.getArguments()[0]).getQuery();
+            return mockResponseForQuery(q);
+          }
+        });
+
+    ReadFn readFn = new ReadFn(v1Beta3Options, mockDatastoreFactory);
+    DoFnTester<Query, Entity> doFnTester = DoFnTester.of(readFn);
+    /**
+     * Although the Datastore client is marked transient in {@link ReadFn}, when injected through
+     * the mock factory using a when clause for unit testing purposes, it is not serializable
+     * because it doesn't have a no-arg constructor. Thus cloning is disabled to prevent the
+     * test object from being serialized.
+     */
+    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+    List<Entity> entities = doFnTester.processBundle(query);
+
+    int expectedNumCallsToRunQuery = (int) Math.ceil((double) numEntities / QUERY_BATCH_LIMIT);
+    verify(mockDatastore, times(expectedNumCallsToRunQuery)).runQuery(any(RunQueryRequest.class));
     // Validate the number of results.
-    assertEquals(numEntities, resultCount);
-  }
-
-  /** Tests reading with a query limit less than one batch. */
-  @Test
-  public void testReadingWithLimitOneBatch() throws Exception {
-    runQueryLimitReadTest(5);
-  }
-
-  /** Tests reading with a query limit more than one batch, and not a multiple. */
-  @Test
-  public void testReadingWithLimitMultipleBatches() throws Exception {
-    runQueryLimitReadTest(DATASTORE_QUERY_BATCH_LIMIT + 5);
+    assertEquals(numEntities, entities.size());
+  }
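The expected call count is a plain ceiling division. Assuming QUERY_BATCH_LIMIT kept the
value 500 of the removed DATASTORE_QUERY_BATCH_LIMIT constant, the three tests above work
out to:

    int batchLimit = 500;  // assumed to match QUERY_BATCH_LIMIT
    // testReadFnWithOneBatch:             ceil(5 / 500.0)    = 1 runQuery call
    // testReadFnWithMultipleBatches:      ceil(505 / 500.0)  = 2 runQuery calls
    // testReadFnWithBatchesExactMultiple: ceil(2500 / 500.0) = 5 runQuery calls
    assert (int) Math.ceil((double) 5 / batchLimit) == 1;
    assert (int) Math.ceil((double) (batchLimit + 5) / batchLimit) == 2;
    assert (int) Math.ceil((double) (5 * batchLimit) / batchLimit) == 5;
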
+
+  /** Builds a per-kind statistics response with the given entity size. */
+  private static RunQueryResponse makeStatKindResponse(long entitySizeInBytes) {
+    RunQueryResponse.Builder statKindResponse = RunQueryResponse.newBuilder();
+    Entity.Builder entity = Entity.newBuilder();
+    entity.setKey(makeKey("dummyKind", "dummyId"));
+    entity.getMutableProperties().put("entity_bytes", makeValue(entitySizeInBytes).build());
+    EntityResult.Builder entityResult = EntityResult.newBuilder();
+    entityResult.setEntity(entity);
+    QueryResultBatch.Builder batch = QueryResultBatch.newBuilder();
+    batch.addEntityResults(entityResult);
+    statKindResponse.setBatch(batch);
+    return statKindResponse.build();
+  }
+
+  /** Builds a per-kind statistics query for the given namespace, selecting the latest entry. */
+  private static Query makeStatKindQuery(String namespace) {
+    Query.Builder statQuery = Query.newBuilder();
+    if (namespace == null) {
+      statQuery.addKindBuilder().setName("__Stat_Kind__");
+    } else {
+      statQuery.addKindBuilder().setName("__Ns_Stat_Kind__");
+    }
+    statQuery.setFilter(makeFilter("kind_name", EQUAL, makeValue(KIND)).build());
+    statQuery.addOrder(makeOrder("timestamp", DESCENDING));
+    statQuery.setLimit(Int32Value.newBuilder().setValue(1));
+    return statQuery.build();
   }
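Cloud Datastore publishes per-kind statistics under the built-in kind __Stat_Kind__ in the
default namespace and __Ns_Stat_Kind__ inside a named namespace, which is why the helper
switches on the namespace argument. Usage sketch (the namespace string is a placeholder):

    Query defaultNamespaceStats = makeStatKindQuery(null);     // targets __Stat_Kind__
    Query scopedStats = makeStatKindQuery("some-namespace");   // targets __Ns_Stat_Kind__
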
 
-  /** Tests reading several batches, using an exact multiple of batch size results. */
-  @Test
-  public void testReadingWithLimitMultipleBatchesExactMultiple() throws Exception {
-    runQueryLimitReadTest(5 * DATASTORE_QUERY_BATCH_LIMIT);
+  /** Generates dummy query splits. */
+  private List<Query> splitQuery(Query query, int numSplits) {
+    List<Query> queries = new LinkedList<>();
+    for (int i = 0; i < numSplits; i++) {
+      queries.add(query.toBuilder().clone().build());
+    }
+    return queries;
   }
 }
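
For reference, the composite read these tests exercise is driven from a pipeline like the
sketch below. The builder chain mirrors the API visible in the removed reader test
(DatastoreIO.v1beta3().read().withQuery(...).withProjectId(...)); the project ID, kind name,
and options variable are placeholders:

    // Minimal pipeline sketch: read all "mykind" entities with the composite transform.
    Pipeline p = Pipeline.create(options);
    Query query = Query.newBuilder()
        .addKind(KindExpression.newBuilder().setName("mykind"))
        .build();
    PCollection<Entity> entities = p.apply(
        DatastoreIO.v1beta3().read()
            .withProjectId("my-project")
            .withQuery(query));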
