Repository: beam
Updated Branches:
  refs/heads/master 6e808b2d6 -> 04c6ef4c7


Add ValueProvider options for DatastoreIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5dd84426
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5dd84426
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5dd84426

Branch: refs/heads/master
Commit: 5dd8442620ab472f6f1e97455057229b94cea28e
Parents: 6e808b2
Author: Vikas Kedigehalli <[email protected]>
Authored: Wed Nov 16 00:05:41 2016 -0800
Committer: Davor Bonaci <[email protected]>
Committed: Wed Mar 15 10:45:24 2017 -0700

----------------------------------------------------------------------
 pom.xml                                         |   7 +
 sdks/java/io/google-cloud-platform/pom.xml      |  26 +-
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  | 390 +++++++++++++++----
 .../sdk/io/gcp/datastore/DatastoreV1Test.java   | 243 ++++++++++--
 .../sdk/io/gcp/datastore/SplitQueryFnIT.java    |   2 +-
 .../beam/sdk/io/gcp/datastore/V1ReadIT.java     |  66 +++-
 6 files changed, 617 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5dd84426/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7183264..ca04ca0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -120,6 +120,7 @@
     
<google-cloud-dataflow-java-proto-library-all.version>0.5.160304</google-cloud-dataflow-java-proto-library-all.version>
     <guava.version>20.0</guava.version>
     <grpc.version>1.0.1</grpc.version>
+    
<grpc-google-common-protos.version>0.1.0</grpc-google-common-protos.version>
     <hamcrest.version>1.3</hamcrest.version>
     <jackson.version>2.7.2</jackson.version>
     <findbugs.version>3.0.1</findbugs.version>
@@ -818,6 +819,12 @@
       </dependency>
 
       <dependency>
+        <groupId>com.google.api.grpc</groupId>
+        <artifactId>grpc-google-common-protos</artifactId>
+        <version>${grpc-google-common-protos.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>com.fasterxml.jackson.core</groupId>
         <artifactId>jackson-core</artifactId>
         <version>${jackson.version}</version>

http://git-wip-us.apache.org/repos/asf/beam/blob/5dd84426/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml 
b/sdks/java/io/google-cloud-platform/pom.xml
index 0caf882..bd7f0d2 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -70,6 +70,26 @@
           </execution>
         </executions>
       </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals><goal>analyze-only</goal></goals>
+            <configuration>
+              <usedDependencies>
+                <!--
+                  BEAM-1632: Conflicting classes are brought in by 
bigtable-protos resulting in
+                  maven-dependency:analyze `unused dependency` error. Marking 
this as a
+                  `usedDependency` until the issue is resolved.
+                 -->
+                
<usedDependency>com.google.api.grpc:grpc-google-common-protos</usedDependency>
+              </usedDependencies>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 
@@ -170,6 +190,11 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.api.grpc</groupId>
+      <artifactId>grpc-google-common-protos</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
@@ -252,5 +277,4 @@
       <scope>test</scope>
     </dependency>
   </dependencies>
-
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/5dd84426/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 85c0744..d07fd50 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -41,10 +41,12 @@ import com.google.auto.value.AutoValue;
 import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.datastore.v1.CommitRequest;
 import com.google.datastore.v1.Entity;
 import com.google.datastore.v1.EntityResult;
+import com.google.datastore.v1.GqlQuery;
 import com.google.datastore.v1.Key;
 import com.google.datastore.v1.Key.PathElement;
 import com.google.datastore.v1.Mutation;
@@ -60,6 +62,7 @@ import com.google.datastore.v1.client.DatastoreHelper;
 import com.google.datastore.v1.client.DatastoreOptions;
 import com.google.datastore.v1.client.QuerySplitter;
 import com.google.protobuf.Int32Value;
+import com.google.rpc.Code;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -67,8 +70,12 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
@@ -80,12 +87,14 @@ import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -185,7 +194,6 @@ import org.slf4j.LoggerFactory;
  *
  * @see org.apache.beam.sdk.runners.PipelineRunner
  */
-@Experimental(Experimental.Kind.SOURCE_SINK)
 public class DatastoreV1 {
 
   // A package-private constructor to prevent direct instantiation from 
outside of this package
@@ -234,9 +242,10 @@ public class DatastoreV1 {
      */
     static final int QUERY_BATCH_LIMIT = 500;
 
-    @Nullable public abstract String getProjectId();
+    @Nullable public abstract ValueProvider<String> getProjectId();
     @Nullable public abstract Query getQuery();
-    @Nullable public abstract String getNamespace();
+    @Nullable public abstract ValueProvider<String> getLiteralGqlQuery();
+    @Nullable public abstract ValueProvider<String> getNamespace();
     public abstract int getNumQuerySplits();
     @Nullable public abstract String getLocalhost();
 
@@ -247,9 +256,10 @@ public class DatastoreV1 {
 
     @AutoValue.Builder
     abstract static class Builder {
-      abstract Builder setProjectId(String projectId);
+      abstract Builder setProjectId(ValueProvider<String> projectId);
       abstract Builder setQuery(Query query);
-      abstract Builder setNamespace(String namespace);
+      abstract Builder setLiteralGqlQuery(ValueProvider<String> 
literalGqlQuery);
+      abstract Builder setNamespace(ValueProvider<String> namespace);
       abstract Builder setNumQuerySplits(int numQuerySplits);
       abstract Builder setLocalhost(String localhost);
       abstract Read build();
@@ -282,7 +292,9 @@ public class DatastoreV1 {
     private static long queryLatestStatisticsTimestamp(Datastore datastore,
         @Nullable String namespace)  throws DatastoreException {
       Query.Builder query = Query.newBuilder();
-      if (namespace == null) {
+      // Note: namespace either being null or empty represents the default 
namespace, in which
+      // case we treat it as not provided by the user.
+      if (Strings.isNullOrEmpty(namespace)) {
         query.addKindBuilder().setName("__Stat_Total__");
       } else {
         query.addKindBuilder().setName("__Stat_Ns_Total__");
@@ -317,7 +329,7 @@ public class DatastoreV1 {
       LOG.info("Latest stats timestamp for kind {} is {}", ourKind, 
latestTimestamp);
 
       Query.Builder queryBuilder = Query.newBuilder();
-      if (namespace == null) {
+      if (Strings.isNullOrEmpty(namespace)) {
         queryBuilder.addKindBuilder().setName("__Stat_Kind__");
       } else {
         queryBuilder.addKindBuilder().setName("__Stat_Ns_Kind__");
@@ -342,13 +354,30 @@ public class DatastoreV1 {
       return entity.getProperties().get("entity_bytes").getIntegerValue();
     }
 
+    private static PartitionId.Builder forNamespace(@Nullable String 
namespace) {
+      PartitionId.Builder partitionBuilder = PartitionId.newBuilder();
+      // Namespace either being null or empty represents the default namespace.
+      // Datastore Client libraries expect users to not set the namespace 
proto field in
+      // either of these cases.
+      if (!Strings.isNullOrEmpty(namespace)) {
+        partitionBuilder.setNamespaceId(namespace);
+      }
+      return partitionBuilder;
+    }
+
     /** Builds a {@link RunQueryRequest} from the {@code query} and {@code 
namespace}. */
     static RunQueryRequest makeRequest(Query query, @Nullable String 
namespace) {
-      RunQueryRequest.Builder requestBuilder = 
RunQueryRequest.newBuilder().setQuery(query);
-      if (namespace != null) {
-        requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
-      }
-      return requestBuilder.build();
+      return RunQueryRequest.newBuilder()
+          .setQuery(query)
+          .setPartitionId(forNamespace(namespace)).build();
+    }
+
+    @VisibleForTesting
+    /** Builds a {@link RunQueryRequest} from the {@code GqlQuery} and {@code 
namespace}. */
+    static RunQueryRequest makeRequest(GqlQuery gqlQuery, @Nullable String 
namespace) {
+      return RunQueryRequest.newBuilder()
+          .setGqlQuery(gqlQuery)
+          .setPartitionId(forNamespace(namespace)).build();
     }
 
     /**
@@ -358,12 +387,52 @@ public class DatastoreV1 {
     private static List<Query> splitQuery(Query query, @Nullable String 
namespace,
         Datastore datastore, QuerySplitter querySplitter, int numSplits) 
throws DatastoreException {
       // If namespace is set, include it in the split request so splits are 
calculated accordingly.
-      PartitionId.Builder partitionBuilder = PartitionId.newBuilder();
-      if (namespace != null) {
-        partitionBuilder.setNamespaceId(namespace);
+      return querySplitter.getSplits(query, forNamespace(namespace).build(), 
numSplits, datastore);
+    }
+
+    /**
+     * Translates a Cloud Datastore gql query string to {@link Query}.
+     *
+     * <p>Currently, the only way to translate a gql query string to a Query 
is to run the query
+     * against Cloud Datastore and extract the {@code Query} from the 
response. To prevent reading
+     * any data, we set the {@code LIMIT} to 0 but if the gql query already 
has a limit set, we
+     * catch the exception with {@code INVALID_ARGUMENT} error code and retry 
the translation
+     * without the zero limit.
+     *
+     * <p>Note: This may result in reading actual data from Cloud Datastore 
but the service has a
+     * cap on the number of entities returned for a single rpc request, so 
this should not be a
+     * problem in practice.
+     */
+    @VisibleForTesting
+    static Query translateGqlQueryWithLimitCheck(String gql, Datastore 
datastore,
+        String namespace) throws DatastoreException {
+      String gqlQueryWithZeroLimit = gql + " LIMIT 0";
+      try {
+        Query translatedQuery = translateGqlQuery(gqlQueryWithZeroLimit, 
datastore, namespace);
+        // Clear the limit that we set.
+        return translatedQuery.toBuilder().clearLimit().build();
+      } catch (DatastoreException e) {
+        // Note: There is no specific error code or message to detect if the 
query already has a
+        // limit, so we just check for INVALID_ARGUMENT and assume that 
the query might have
+        // a limit already set.
+        if (e.getCode() == Code.INVALID_ARGUMENT) {
+          LOG.warn("Failed to translate Gql query '{}': {}", 
gqlQueryWithZeroLimit, e.getMessage());
+          LOG.warn("User query might have a limit already set, so trying 
without zero limit");
+          // Retry without the zero limit.
+          return translateGqlQuery(gql, datastore, namespace);
+        } else {
+          throw e;
+        }
       }
+    }
 
-      return querySplitter.getSplits(query, partitionBuilder.build(), 
numSplits, datastore);
+    /** Translates a gql query string to {@link Query}.*/
+    private static Query translateGqlQuery(String gql, Datastore datastore, 
String namespace)
+        throws DatastoreException {
+      GqlQuery gqlQuery = GqlQuery.newBuilder().setQueryString(gql)
+          .setAllowLiterals(true).build();
+      RunQueryRequest req = makeRequest(gqlQuery, namespace);
+      return datastore.runQuery(req).getQuery();
     }
 
     /**
@@ -372,6 +441,14 @@ public class DatastoreV1 {
      */
     public DatastoreV1.Read withProjectId(String projectId) {
       checkNotNull(projectId, "projectId");
+      return 
toBuilder().setProjectId(StaticValueProvider.of(projectId)).build();
+    }
+
+    /**
+     * Same as {@link Read#withProjectId(String)} but with a {@link 
ValueProvider}.
+     */
+    public DatastoreV1.Read withProjectId(ValueProvider<String> projectId) {
+      checkNotNull(projectId, "projectId");
       return toBuilder().setProjectId(projectId).build();
     }
 
@@ -391,9 +468,45 @@ public class DatastoreV1 {
     }
 
     /**
+     * Returns a new {@link DatastoreV1.Read} that reads the results of the 
specified GQL query.
+     * See <a 
href="https://cloud.google.com/datastore/docs/reference/gql_reference";>GQL 
Reference
+     * </a> to know more about GQL grammar.
+     *
+     * <p><b><i>Note:</i></b> This query is executed with literals allowed, so 
the users should
+     * ensure that the query originates from trusted sources to avoid any 
security
+     * vulnerabilities via SQL Injection.
+     *
+     * <p><b><i>Experimental</i></b>: Cloud Datastore does not provide a 
clean way to translate
+     * a gql query string to {@link Query}, so we end up making a query to the 
service for
+     * translation but this may read the actual data, although it will be a 
small amount.
+     * It needs more validation through production use cases before marking it 
as stable.
+     */
+    @Experimental(Kind.SOURCE_SINK)
+    public DatastoreV1.Read withLiteralGqlQuery(String gqlQuery) {
+      checkNotNull(gqlQuery, "gqlQuery");
+      return 
toBuilder().setLiteralGqlQuery(StaticValueProvider.of(gqlQuery)).build();
+    }
+
+    /**
+     * Same as {@link Read#withLiteralGqlQuery(String)} but with a {@link 
ValueProvider}.
+     */
+    @Experimental(Kind.SOURCE_SINK)
+    public DatastoreV1.Read withLiteralGqlQuery(ValueProvider<String> 
gqlQuery) {
+      checkNotNull(gqlQuery, "gqlQuery");
+      return toBuilder().setLiteralGqlQuery(gqlQuery).build();
+    }
+
+    /**
      * Returns a new {@link DatastoreV1.Read} that reads from the given 
namespace.
      */
     public DatastoreV1.Read withNamespace(String namespace) {
+      return 
toBuilder().setNamespace(StaticValueProvider.of(namespace)).build();
+    }
+
+    /**
+     * Same as {@link Read#withNamespace(String)} but with a {@link 
ValueProvider}.
+     */
+    public DatastoreV1.Read withNamespace(ValueProvider<String> namespace) {
       return toBuilder().setNamespace(namespace).build();
     }
 
@@ -431,31 +544,42 @@ public class DatastoreV1 {
 
     @Override
     public PCollection<Entity> expand(PBegin input) {
-      V1Options v1Options = V1Options.from(getProjectId(), getQuery(),
-          getNamespace(), getLocalhost());
+      V1Options v1Options = V1Options.from(getProjectId(), getNamespace(), 
getLocalhost());
 
       /*
        * This composite transform involves the following steps:
-       *   1. Create a singleton of the user provided {@code query} and apply 
a {@link ParDo} that
-       *   splits the query into {@code numQuerySplits} and assign each split 
query a unique
-       *   {@code Integer} as the key. The resulting output is of the type
-       *   {@code PCollection<KV<Integer, Query>>}.
+       *   1. Create a singleton of the user provided {@code query} or if 
{@code gqlQuery} is
+       *   provided apply a {@link ParDo} that translates the {@code gqlQuery} 
into a {@code query}.
+       *
+       *   2. A {@link ParDo} splits the resulting query into {@code 
numQuerySplits} and
+       *   assign each split query a unique {@code Integer} as the key. The 
resulting output is
+       *   of the type {@code PCollection<KV<Integer, Query>>}.
        *
        *   If the value of {@code numQuerySplits} is less than or equal to 0, 
then the number of
        *   splits will be computed dynamically based on the size of the data 
for the {@code query}.
        *
-       *   2. The resulting {@code PCollection} is sharded using a {@link 
GroupByKey} operation. The
+       *   3. The resulting {@code PCollection} is sharded using a {@link 
GroupByKey} operation. The
        *   queries are extracted from they {@code KV<Integer, 
Iterable<Query>>} and flattened to
        *   output a {@code PCollection<Query>}.
        *
-       *   3. In the third step, a {@code ParDo} reads entities for each query 
and outputs
+       *   4. In the fourth step, a {@code ParDo} reads entities for each query 
and outputs
        *   a {@code PCollection<Entity>}.
        */
-      PCollection<KV<Integer, Query>> queries = input
-          .apply(Create.of(getQuery()))
+
+      PCollection<Query> inputQuery;
+      if (getQuery() != null) {
+        inputQuery = input.apply(Create.of(getQuery()));
+      } else {
+        inputQuery = input
+            .apply(Create.of(getLiteralGqlQuery())
+                .withCoder(SerializableCoder.of(new 
TypeDescriptor<ValueProvider<String>>() {})))
+            .apply(ParDo.of(new GqlQueryTranslateFn(v1Options)));
+      }
+
+      PCollection<KV<Integer, Query>> splitQueries = inputQuery
           .apply(ParDo.of(new SplitQueryFn(v1Options, getNumQuerySplits())));
 
-      PCollection<Query> shardedQueries = queries
+      PCollection<Query> shardedQueries = splitQueries
           .apply(GroupByKey.<Integer, Query>create())
           .apply(Values.<Iterable<Query>>create())
           .apply(Flatten.<Query>iterables());
@@ -469,36 +593,130 @@ public class DatastoreV1 {
     @Override
     public void validate(PBegin input) {
       checkNotNull(getProjectId(), "projectId");
-      checkNotNull(getQuery(), "query");
+
+      if (getProjectId().isAccessible() && getProjectId().get() == null) {
+        throw new IllegalArgumentException("Project id cannot be null");
+      }
+
+      if (getQuery() == null && getLiteralGqlQuery() == null) {
+        throw new IllegalArgumentException(
+            "Either query or gql query ValueProvider should be provided");
+      }
+
+      if (getQuery() != null && getLiteralGqlQuery() != null) {
+        throw new IllegalArgumentException(
+            "Only one of query or gql query ValueProvider should be provided");
+      }
+
+      if (getLiteralGqlQuery() != null && getLiteralGqlQuery().isAccessible()) 
{
+        checkNotNull(getLiteralGqlQuery().get(), "gqlQuery");
+      }
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
+      String query = getQuery() == null ? null : getQuery().toString();
       builder
           .addIfNotNull(DisplayData.item("projectId", getProjectId())
               .withLabel("ProjectId"))
           .addIfNotNull(DisplayData.item("namespace", getNamespace())
               .withLabel("Namespace"))
-          .addIfNotNull(DisplayData.item("query", getQuery().toString())
-              .withLabel("Query"));
+          .addIfNotNull(DisplayData.item("query", query)
+              .withLabel("Query"))
+          .addIfNotNull(DisplayData.item("gqlQuery", getLiteralGqlQuery())
+              .withLabel("GqlQuery"));
+    }
+
+    @VisibleForTesting
+    static class V1Options implements HasDisplayData, Serializable {
+      private final ValueProvider<String> project;
+      @Nullable
+      private final ValueProvider<String> namespace;
+      @Nullable
+      private final String localhost;
+
+      private V1Options(ValueProvider<String> project, ValueProvider<String> 
namespace,
+          String localhost) {
+        this.project = project;
+        this.namespace = namespace;
+        this.localhost = localhost;
+      }
+
+      public static V1Options from(String projectId, String namespace, String 
localhost) {
+        return from(StaticValueProvider.of(projectId), 
StaticValueProvider.of(namespace),
+            localhost);
+      }
+
+      public static V1Options from(ValueProvider<String> project, 
ValueProvider<String> namespace,
+          String localhost) {
+        return new V1Options(project, namespace, localhost);
+      }
+
+      public String getProjectId() {
+        return project.get();
+      }
+
+      @Nullable
+      public String getNamespace() {
+        return namespace == null ? null : namespace.get();
+      }
+
+      public ValueProvider<String> getProjectValueProvider() {
+        return project;
+      }
+
+      @Nullable
+      public ValueProvider<String> getNamespaceValueProvider() {
+        return namespace;
+      }
+
+      @Nullable
+      public String getLocalhost() {
+        return localhost;
+      }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder
+            .addIfNotNull(DisplayData.item("projectId", 
getProjectValueProvider())
+                .withLabel("ProjectId"))
+            .addIfNotNull(DisplayData.item("namespace", 
getNamespaceValueProvider())
+                .withLabel("Namespace"));
+      }
     }
 
     /**
-     * A class for v1 Cloud Datastore related options.
+     * A DoFn that translates a Cloud Datastore gql query string to {@code 
Query}.
      */
-    @VisibleForTesting
-    @AutoValue
-    abstract static class V1Options implements Serializable {
-      public static V1Options from(String projectId, Query query, @Nullable 
String namespace,
-                                   @Nullable String localhost) {
-        return new AutoValue_DatastoreV1_Read_V1Options(projectId, query, 
namespace, localhost);
+    static class GqlQueryTranslateFn extends DoFn<ValueProvider<String>, 
Query> {
+      private final V1Options v1Options;
+      private transient Datastore datastore;
+      private final V1DatastoreFactory datastoreFactory;
+
+      GqlQueryTranslateFn(V1Options options) {
+        this(options, new V1DatastoreFactory());
       }
 
-      public abstract String getProjectId();
-      public abstract Query getQuery();
-      @Nullable public abstract String getNamespace();
-      @Nullable public abstract String getLocalhost();
+      GqlQueryTranslateFn(V1Options options, V1DatastoreFactory 
datastoreFactory) {
+        this.v1Options = options;
+        this.datastoreFactory = datastoreFactory;
+      }
+
+      @StartBundle
+      public void startBundle(Context c) throws Exception {
+        datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), 
v1Options.getProjectId());
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) throws Exception {
+        ValueProvider<String> gqlQuery = c.element();
+        LOG.info("User query: '{}'", gqlQuery.get());
+        Query query = translateGqlQueryWithLimitCheck(gqlQuery.get(), 
datastore,
+            v1Options.getNamespace());
+        LOG.info("User gql query translated to Query({})", query);
+        c.output(query);
+      }
     }
 
     /**
@@ -574,13 +792,7 @@ public class DatastoreV1 {
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
         super.populateDisplayData(builder);
-        builder
-            .addIfNotNull(DisplayData.item("projectId", options.getProjectId())
-                .withLabel("ProjectId"))
-            .addIfNotNull(DisplayData.item("namespace", options.getNamespace())
-                .withLabel("Namespace"))
-            .addIfNotNull(DisplayData.item("query", 
options.getQuery().toString())
-                .withLabel("Query"));
+        builder.include("options", options);
       }
     }
 
@@ -660,6 +872,12 @@ public class DatastoreV1 {
                   || (currentBatch.getMoreResults() == NOT_FINISHED));
         }
       }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        super.populateDisplayData(builder);
+        builder.include("options", options);
+      }
     }
   }
 
@@ -697,7 +915,7 @@ public class DatastoreV1 {
      * Note that {@code projectId} is only {@code @Nullable} as a matter of 
build order, but if
      * it is {@code null} at instantiation time, an error will be thrown.
      */
-    Write(@Nullable String projectId, @Nullable String localhost) {
+    Write(@Nullable ValueProvider<String> projectId, @Nullable String 
localhost) {
       super(projectId, localhost, new UpsertFn());
     }
 
@@ -706,7 +924,15 @@ public class DatastoreV1 {
      */
     public Write withProjectId(String projectId) {
       checkNotNull(projectId, "projectId");
-      return new Write(projectId, null);
+      return withProjectId(StaticValueProvider.of(projectId));
+    }
+
+    /**
+     * Same as {@link Write#withProjectId(String)} but with a {@link 
ValueProvider}.
+     */
+    public Write withProjectId(ValueProvider<String> projectId) {
+      checkNotNull(projectId, "projectId ValueProvider");
+      return new Write(projectId, localhost);
     }
 
     /**
@@ -715,7 +941,7 @@ public class DatastoreV1 {
      */
     public Write withLocalhost(String localhost) {
       checkNotNull(localhost, "localhost");
-      return new Write(null, localhost);
+      return new Write(projectId, localhost);
     }
   }
 
@@ -729,7 +955,7 @@ public class DatastoreV1 {
      * Note that {@code projectId} is only {@code @Nullable} as a matter of 
build order, but if
      * it is {@code null} at instantiation time, an error will be thrown.
      */
-    DeleteEntity(@Nullable String projectId, @Nullable String localhost) {
+    DeleteEntity(@Nullable ValueProvider<String> projectId, @Nullable String 
localhost) {
       super(projectId, localhost, new DeleteEntityFn());
     }
 
@@ -739,7 +965,15 @@ public class DatastoreV1 {
      */
     public DeleteEntity withProjectId(String projectId) {
       checkNotNull(projectId, "projectId");
-      return new DeleteEntity(projectId, null);
+      return withProjectId(StaticValueProvider.of(projectId));
+    }
+
+    /**
+     * Same as {@link DeleteEntity#withProjectId(String)} but with a {@link 
ValueProvider}.
+     */
+    public DeleteEntity withProjectId(ValueProvider<String> projectId) {
+      checkNotNull(projectId, "projectId ValueProvider");
+      return new DeleteEntity(projectId, localhost);
     }
 
     /**
@@ -748,7 +982,7 @@ public class DatastoreV1 {
      */
     public DeleteEntity withLocalhost(String localhost) {
       checkNotNull(localhost, "localhost");
-      return new DeleteEntity(null, localhost);
+      return new DeleteEntity(projectId, localhost);
     }
   }
 
@@ -763,7 +997,7 @@ public class DatastoreV1 {
      * Note that {@code projectId} is only {@code @Nullable} as a matter of 
build order, but if
      * it is {@code null} at instantiation time, an error will be thrown.
      */
-    DeleteKey(@Nullable String projectId, @Nullable String localhost) {
+    DeleteKey(@Nullable ValueProvider<String> projectId, @Nullable String 
localhost) {
       super(projectId, localhost, new DeleteKeyFn());
     }
 
@@ -773,7 +1007,7 @@ public class DatastoreV1 {
      */
     public DeleteKey withProjectId(String projectId) {
       checkNotNull(projectId, "projectId");
-      return new DeleteKey(projectId, null);
+      return withProjectId(StaticValueProvider.of(projectId));
     }
 
     /**
@@ -782,7 +1016,15 @@ public class DatastoreV1 {
      */
     public DeleteKey withLocalhost(String localhost) {
       checkNotNull(localhost, "localhost");
-      return new DeleteKey(null, localhost);
+      return new DeleteKey(projectId, localhost);
+    }
+
+    /**
+     * Same as {@link DeleteKey#withProjectId(String)} but with a {@link 
ValueProvider}.
+     */
+    public DeleteKey withProjectId(ValueProvider<String> projectId) {
+      checkNotNull(projectId, "projectId ValueProvider");
+      return new DeleteKey(projectId, localhost);
     }
   }
 
@@ -795,10 +1037,9 @@ public class DatastoreV1 {
    * be used by the {@code DoFn} provided, as the commits are retried when 
failures occur.
    */
   private abstract static class Mutate<T> extends PTransform<PCollection<T>, 
PDone> {
+    protected ValueProvider<String> projectId;
     @Nullable
-    private final String projectId;
-    @Nullable
-    private final String localhost;
+    protected String localhost;
     /** A function that transforms each {@code T} into a mutation. */
     private final SimpleFunction<T, Mutation> mutationFn;
 
@@ -806,7 +1047,7 @@ public class DatastoreV1 {
      * Note that {@code projectId} is only {@code @Nullable} as a matter of 
build order, but if
      * it is {@code null} at instantiation time, an error will be thrown.
      */
-    Mutate(@Nullable String projectId, @Nullable String localhost,
+    Mutate(@Nullable ValueProvider<String> projectId, @Nullable String 
localhost,
         SimpleFunction<T, Mutation> mutationFn) {
       this.projectId = projectId;
       this.localhost = localhost;
@@ -824,7 +1065,10 @@ public class DatastoreV1 {
 
     @Override
     public void validate(PCollection<T> input) {
-      checkNotNull(projectId, "projectId");
+      checkNotNull(projectId, "projectId ValueProvider");
+      if (projectId.isAccessible()) {
+        checkNotNull(projectId.get(), "projectId");
+      }
       checkNotNull(mutationFn, "mutationFn");
     }
 
@@ -846,7 +1090,7 @@ public class DatastoreV1 {
     }
 
     public String getProjectId() {
-      return projectId;
+      return projectId.get();
     }
   }
 
@@ -867,7 +1111,7 @@ public class DatastoreV1 {
   @VisibleForTesting
   static class DatastoreWriterFn extends DoFn<Mutation, Void> {
     private static final Logger LOG = 
LoggerFactory.getLogger(DatastoreWriterFn.class);
-    private final String projectId;
+    private final ValueProvider<String> projectId;
     @Nullable
     private final String localhost;
     private transient Datastore datastore;
@@ -881,12 +1125,16 @@ public class DatastoreV1 {
             
.withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5));
 
     DatastoreWriterFn(String projectId, @Nullable String localhost) {
+      this(StaticValueProvider.of(projectId), localhost, new 
V1DatastoreFactory());
+    }
+
+    DatastoreWriterFn(ValueProvider<String> projectId, @Nullable String 
localhost) {
       this(projectId, localhost, new V1DatastoreFactory());
     }
 
     @VisibleForTesting
-    DatastoreWriterFn(String projectId, @Nullable String localhost,
-                      V1DatastoreFactory datastoreFactory) {
+    DatastoreWriterFn(ValueProvider<String> projectId, @Nullable String 
localhost,
+        V1DatastoreFactory datastoreFactory) {
       this.projectId = checkNotNull(projectId, "projectId");
       this.localhost = localhost;
       this.datastoreFactory = datastoreFactory;
@@ -894,7 +1142,7 @@ public class DatastoreV1 {
 
     @StartBundle
     public void startBundle(Context c) {
-      datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), 
projectId, localhost);
+      datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), 
projectId.get(), localhost);
     }
 
     @ProcessElement
@@ -1048,6 +1296,14 @@ public class DatastoreV1 {
   static class V1DatastoreFactory implements Serializable {
 
     /** Builds a Cloud Datastore client for the given pipeline options and 
project. */
+    public Datastore getDatastore(PipelineOptions pipelineOptions, String 
projectId) {
+      return getDatastore(pipelineOptions, projectId, null);
+    }
+
+    /**
+     * Builds a Cloud Datastore client for the given pipeline options, project 
and an optional
+     * localhost.
+     */
     public Datastore getDatastore(PipelineOptions pipelineOptions, String 
projectId,
         @Nullable String localhost) {
       Credentials credential = 
pipelineOptions.as(GcpOptions.class).getGcpCredential();

http://git-wip-us.apache.org/repos/asf/beam/blob/5dd84426/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index c2bc8d2..879e30e 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -31,6 +31,7 @@ import static 
org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.DEFAULT_BUND
 import static 
org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.QUERY_BATCH_LIMIT;
 import static 
org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.getEstimatedSizeBytes;
 import static 
org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.makeRequest;
+import static 
org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.translateGqlQueryWithLimitCheck;
 import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.isValidKey;
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -51,6 +52,7 @@ import static org.mockito.Mockito.when;
 import com.google.datastore.v1.CommitRequest;
 import com.google.datastore.v1.Entity;
 import com.google.datastore.v1.EntityResult;
+import com.google.datastore.v1.GqlQuery;
 import com.google.datastore.v1.Key;
 import com.google.datastore.v1.Mutation;
 import com.google.datastore.v1.PartitionId;
@@ -59,14 +61,17 @@ import com.google.datastore.v1.QueryResultBatch;
 import com.google.datastore.v1.RunQueryRequest;
 import com.google.datastore.v1.RunQueryResponse;
 import com.google.datastore.v1.client.Datastore;
+import com.google.datastore.v1.client.DatastoreException;
 import com.google.datastore.v1.client.QuerySplitter;
 import com.google.protobuf.Int32Value;
+import com.google.rpc.Code;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DatastoreWriterFn;
 import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DeleteEntity;
 import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DeleteEntityFn;
@@ -79,7 +84,11 @@ import 
org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.UpsertFn;
 import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.V1DatastoreFactory;
 import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Write;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -88,7 +97,6 @@ import 
org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.POutput;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -111,13 +119,16 @@ public class DatastoreV1Test {
   private static final String KIND = "testKind";
   private static final Query QUERY;
   private static final String LOCALHOST = "localhost:9955";
+  private static final String GQL_QUERY = "SELECT * from " + KIND;
   private static final V1Options V_1_OPTIONS;
+
   static {
     Query.Builder q = Query.newBuilder();
     q.addKindBuilder().setName(KIND);
     QUERY = q.build();
-    V_1_OPTIONS = V1Options.from(PROJECT_ID, QUERY, NAMESPACE, null);
+    V_1_OPTIONS = V1Options.from(PROJECT_ID, NAMESPACE, null);
   }
+
   private DatastoreV1.Read initialRead;
 
   @Mock
@@ -149,8 +160,17 @@ public class DatastoreV1Test {
     DatastoreV1.Read read = DatastoreIO.v1().read()
         .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
     assertEquals(QUERY, read.getQuery());
-    assertEquals(PROJECT_ID, read.getProjectId());
-    assertEquals(NAMESPACE, read.getNamespace());
+    assertEquals(PROJECT_ID, read.getProjectId().get());
+    assertEquals(NAMESPACE, read.getNamespace().get());
+  }
+
+  @Test
+  public void testBuildReadWithGqlQuery() throws Exception {
+    DatastoreV1.Read read = DatastoreIO.v1().read()
+        
.withProjectId(PROJECT_ID).withLiteralGqlQuery(GQL_QUERY).withNamespace(NAMESPACE);
+    assertEquals(GQL_QUERY, read.getLiteralGqlQuery().get());
+    assertEquals(PROJECT_ID, read.getProjectId().get());
+    assertEquals(NAMESPACE, read.getNamespace().get());
   }
 
   /**
@@ -158,28 +178,41 @@ public class DatastoreV1Test {
    */
   @Test
   public void testBuildReadAlt() throws Exception {
-    DatastoreV1.Read read =  DatastoreIO.v1().read()
-        .withProjectId(PROJECT_ID).withNamespace(NAMESPACE).withQuery(QUERY)
+    DatastoreV1.Read read = DatastoreIO.v1().read()
+        .withQuery(QUERY).withNamespace(NAMESPACE).withProjectId(PROJECT_ID)
         .withLocalhost(LOCALHOST);
     assertEquals(QUERY, read.getQuery());
-    assertEquals(PROJECT_ID, read.getProjectId());
-    assertEquals(NAMESPACE, read.getNamespace());
+    assertEquals(PROJECT_ID, read.getProjectId().get());
+    assertEquals(NAMESPACE, read.getNamespace().get());
     assertEquals(LOCALHOST, read.getLocalhost());
   }
 
   @Test
   public void testReadValidationFailsProject() throws Exception {
-    DatastoreV1.Read read =  DatastoreIO.v1().read().withQuery(QUERY);
+    DatastoreV1.Read read = DatastoreIO.v1().read().withQuery(QUERY);
     thrown.expect(NullPointerException.class);
-    thrown.expectMessage("project");
+    thrown.expectMessage("projectId");
     read.validate(null);
   }
 
   @Test
   public void testReadValidationFailsQuery() throws Exception {
-    DatastoreV1.Read read =  DatastoreIO.v1().read().withProjectId(PROJECT_ID);
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("query");
+    DatastoreV1.Read read = DatastoreIO.v1().read().withProjectId(PROJECT_ID);
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Either query or gql query ValueProvider should be 
provided");
+    read.validate(null);
+  }
+
+  @Test
+  public void testReadValidationFailsQueryAndGqlQuery() throws Exception {
+    DatastoreV1.Read read = DatastoreIO.v1().read()
+        .withProjectId(PROJECT_ID)
+        .withLiteralGqlQuery(GQL_QUERY)
+        .withQuery(QUERY);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Only one of query or gql query ValueProvider should be provided");
     read.validate(null);
   }
 
@@ -203,14 +236,14 @@ public class DatastoreV1Test {
 
   @Test
   public void testReadValidationSucceedsNamespace() throws Exception {
-    DatastoreV1.Read read =  
DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY);
+    DatastoreV1.Read read = 
DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY);
     /* Should succeed, as a null namespace is fine. */
     read.validate(null);
   }
 
   @Test
   public void testReadDisplayData() {
-    DatastoreV1.Read read =  DatastoreIO.v1().read()
+    DatastoreV1.Read read = DatastoreIO.v1().read()
         .withProjectId(PROJECT_ID)
         .withQuery(QUERY)
         .withNamespace(NAMESPACE);
@@ -223,10 +256,23 @@ public class DatastoreV1Test {
   }
 
   @Test
-  @Category(RunnableOnService.class)
+  public void testReadDisplayDataWithGqlQuery() {
+    DatastoreV1.Read read = DatastoreIO.v1().read()
+        .withProjectId(PROJECT_ID)
+        .withLiteralGqlQuery(GQL_QUERY)
+        .withNamespace(NAMESPACE);
+
+    DisplayData displayData = DisplayData.from(read);
+
+    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+    assertThat(displayData, hasDisplayItem("gqlQuery", GQL_QUERY));
+    assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
+  }
+
+  @Test
   public void testSourcePrimitiveDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    PTransform<PBegin, ? extends POutput> read = 
DatastoreIO.v1().read().withProjectId(
+    PTransform<PBegin, PCollection<Entity>> read = 
DatastoreIO.v1().read().withProjectId(
         "myProject").withQuery(Query.newBuilder().build());
 
     Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveSourceTransforms(read);
@@ -235,32 +281,55 @@ public class DatastoreV1Test {
   }
 
   @Test
+  public void testSourcePrimitiveDisplayDataWithGqlQuery() {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+    PTransform<PBegin, PCollection<Entity>> read = 
DatastoreIO.v1().read().withProjectId(
+        "myProject").withLiteralGqlQuery(GQL_QUERY);
+
+    Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveSourceTransforms(read);
+    assertThat("DatastoreIO read should include the project in its primitive 
display data",
+        displayData, hasItem(hasDisplayItem("projectId")));
+  }
+
+  @Test
   public void testWriteDoesNotAllowNullProject() throws Exception {
     thrown.expect(NullPointerException.class);
     thrown.expectMessage("projectId");
+    DatastoreIO.v1().write().withProjectId((String) null);
+  }
 
-    DatastoreIO.v1().write().withProjectId(null);
+  @Test
+  public void testWriteDoesNotAllowNullProjectValueProvider() throws Exception 
{
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("projectId ValueProvider");
+    DatastoreIO.v1().write().withProjectId((ValueProvider<String>) null);
   }
 
   @Test
   public void testWriteValidationFailsWithNoProject() throws Exception {
-    Write write =  DatastoreIO.v1().write();
+    Write write = DatastoreIO.v1().write();
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("projectId ValueProvider");
+    write.validate(null);
+  }
 
+  @Test
+  public void testWriteValidationFailsWithNoProjectInStaticValueProvider() 
throws Exception {
+    Write write = 
DatastoreIO.v1().write().withProjectId(StaticValueProvider.<String>of(null));
     thrown.expect(NullPointerException.class);
     thrown.expectMessage("projectId");
-
     write.validate(null);
   }
 
   @Test
   public void testWriteValidationSucceedsWithProject() throws Exception {
-    Write write =  DatastoreIO.v1().write().withProjectId(PROJECT_ID);
+    Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID);
     write.validate(null);
   }
 
   @Test
   public void testWriteDisplayData() {
-    Write write =  DatastoreIO.v1().write().withProjectId(PROJECT_ID);
+    Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID);
 
     DisplayData displayData = DisplayData.from(write);
 
@@ -271,17 +340,30 @@ public class DatastoreV1Test {
   public void testDeleteEntityDoesNotAllowNullProject() throws Exception {
     thrown.expect(NullPointerException.class);
     thrown.expectMessage("projectId");
+    DatastoreIO.v1().deleteEntity().withProjectId((String) null);
+  }
 
-    DatastoreIO.v1().deleteEntity().withProjectId(null);
+  @Test
+  public void testDeleteEntityDoesNotAllowNullProjectValueProvider() throws 
Exception {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("projectId ValueProvider");
+    DatastoreIO.v1().deleteEntity().withProjectId((ValueProvider<String>) 
null);
   }
 
   @Test
   public void testDeleteEntityValidationFailsWithNoProject() throws Exception {
     DeleteEntity deleteEntity = DatastoreIO.v1().deleteEntity();
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("projectId ValueProvider");
+    deleteEntity.validate(null);
+  }
 
+  @Test
+  public void 
testDeleteEntityValidationFailsWithNoProjectInStaticValueProvider() throws 
Exception {
+    DeleteEntity deleteEntity = DatastoreIO.v1().deleteEntity()
+        .withProjectId(StaticValueProvider.<String>of(null));
     thrown.expect(NullPointerException.class);
     thrown.expectMessage("projectId");
-
     deleteEntity.validate(null);
   }
 
@@ -293,7 +375,7 @@ public class DatastoreV1Test {
 
   @Test
   public void testDeleteEntityDisplayData() {
-    DeleteEntity deleteEntity =  
DatastoreIO.v1().deleteEntity().withProjectId(PROJECT_ID);
+    DeleteEntity deleteEntity = 
DatastoreIO.v1().deleteEntity().withProjectId(PROJECT_ID);
 
     DisplayData displayData = DisplayData.from(deleteEntity);
 
@@ -304,17 +386,30 @@ public class DatastoreV1Test {
   public void testDeleteKeyDoesNotAllowNullProject() throws Exception {
     thrown.expect(NullPointerException.class);
     thrown.expectMessage("projectId");
+    DatastoreIO.v1().deleteKey().withProjectId((String) null);
+  }
 
-    DatastoreIO.v1().deleteKey().withProjectId(null);
+  @Test
+  public void testDeleteKeyDoesNotAllowNullProjectValueProvider() throws 
Exception {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("projectId ValueProvider");
+    DatastoreIO.v1().deleteKey().withProjectId((ValueProvider<String>) null);
   }
 
   @Test
   public void testDeleteKeyValidationFailsWithNoProject() throws Exception {
     DeleteKey deleteKey = DatastoreIO.v1().deleteKey();
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("projectId ValueProvider");
+    deleteKey.validate(null);
+  }
 
+  @Test
+  public void testDeleteKeyValidationFailsWithNoProjectInStaticValueProvider() 
throws Exception {
+    DeleteKey deleteKey = DatastoreIO.v1().deleteKey().withProjectId(
+        StaticValueProvider.<String>of(null));
     thrown.expect(NullPointerException.class);
     thrown.expectMessage("projectId");
-
     deleteKey.validate(null);
   }
 
@@ -326,7 +421,7 @@ public class DatastoreV1Test {
 
   @Test
   public void testDeleteKeyDisplayData() {
-    DeleteKey deleteKey =  
DatastoreIO.v1().deleteKey().withProjectId(PROJECT_ID);
+    DeleteKey deleteKey = 
DatastoreIO.v1().deleteKey().withProjectId(PROJECT_ID);
 
     DisplayData displayData = DisplayData.from(deleteKey);
 
@@ -345,7 +440,6 @@ public class DatastoreV1Test {
         displayData, hasItem(hasDisplayItem("projectId")));
     assertThat("DatastoreIO write should include the upsertFn in its primitive 
display data",
         displayData, hasItem(hasDisplayItem("upsertFn")));
-
   }
 
   @Test
@@ -360,7 +454,6 @@ public class DatastoreV1Test {
         displayData, hasItem(hasDisplayItem("projectId")));
     assertThat("DatastoreIO write should include the deleteEntityFn in its 
primitive display data",
         displayData, hasItem(hasDisplayItem("deleteEntityFn")));
-
   }
 
   @Test
@@ -375,7 +468,6 @@ public class DatastoreV1Test {
         displayData, hasItem(hasDisplayItem("projectId")));
     assertThat("DatastoreIO write should include the deleteKeyFn in its 
primitive display data",
         displayData, hasItem(hasDisplayItem("deleteKeyFn")));
-
   }
 
   /**
@@ -383,7 +475,7 @@ public class DatastoreV1Test {
    */
   @Test
   public void testBuildWrite() throws Exception {
-    DatastoreV1.Write write =  
DatastoreIO.v1().write().withProjectId(PROJECT_ID);
+    DatastoreV1.Write write = 
DatastoreIO.v1().write().withProjectId(PROJECT_ID);
     assertEquals(PROJECT_ID, write.getProjectId());
   }
 
@@ -543,8 +635,8 @@ public class DatastoreV1Test {
           makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 
1)).build()).build());
     }
 
-    DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, null,
-        mockDatastoreFactory);
+    DatastoreWriterFn datastoreWriter = new 
DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID),
+        null, mockDatastoreFactory);
     DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
     doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
     doFnTester.processBundle(mutations);
@@ -695,12 +787,92 @@ public class DatastoreV1Test {
     readFnTest(5 * QUERY_BATCH_LIMIT);
   }
 
+  @Test
+  public void testTranslateGqlQueryWithLimit() throws Exception {
+    String gql = "SELECT * from DummyKind LIMIT 10";
+    String gqlWithZeroLimit = gql + " LIMIT 0";
+    GqlQuery gqlQuery = 
GqlQuery.newBuilder().setQueryString(gql).setAllowLiterals(true).build();
+    GqlQuery gqlQueryWithZeroLimit = 
GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit)
+        .setAllowLiterals(true).build();
+    RunQueryRequest gqlRequest = makeRequest(gqlQuery, 
V_1_OPTIONS.getNamespace());
+    RunQueryRequest gqlRequestWithZeroLimit = 
makeRequest(gqlQueryWithZeroLimit,
+        V_1_OPTIONS.getNamespace());
+    when(mockDatastore.runQuery(gqlRequestWithZeroLimit))
+        .thenThrow(new DatastoreException("runQuery", Code.INVALID_ARGUMENT, 
"invalid query",
+            // dummy
+            new RuntimeException()));
+    when(mockDatastore.runQuery(gqlRequest))
+        .thenReturn(RunQueryResponse.newBuilder().setQuery(QUERY).build());
+    assertEquals(translateGqlQueryWithLimitCheck(gql, mockDatastore, 
V_1_OPTIONS.getNamespace()),
+        QUERY);
+    verify(mockDatastore, times(1)).runQuery(gqlRequest);
+    verify(mockDatastore, times(1)).runQuery(gqlRequestWithZeroLimit);
+  }
+
+  @Test
+  public void testTranslateGqlQueryWithNoLimit() throws Exception {
+    String gql = "SELECT * from DummyKind";
+    String gqlWithZeroLimit = gql + " LIMIT 0";
+    GqlQuery gqlQueryWithZeroLimit = 
GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit)
+        .setAllowLiterals(true).build();
+    RunQueryRequest gqlRequestWithZeroLimit = 
makeRequest(gqlQueryWithZeroLimit,
+        V_1_OPTIONS.getNamespace());
+    when(mockDatastore.runQuery(gqlRequestWithZeroLimit))
+        .thenReturn(RunQueryResponse.newBuilder().setQuery(QUERY).build());
+    assertEquals(translateGqlQueryWithLimitCheck(gql, mockDatastore, 
V_1_OPTIONS.getNamespace()),
+        QUERY);
+    verify(mockDatastore, times(1)).runQuery(gqlRequestWithZeroLimit);
+  }
+
+  /** Test options. **/
+  public interface RuntimeTestOptions extends PipelineOptions {
+    ValueProvider<String> getDatastoreProject();
+    void setDatastoreProject(ValueProvider<String> value);
+
+    ValueProvider<String> getGqlQuery();
+    void setGqlQuery(ValueProvider<String> value);
+
+    ValueProvider<String> getNamespace();
+    void setNamespace(ValueProvider<String> value);
+  }
+
+  /**
+   * Test to ensure that {@link ValueProvider} values are not accessed at 
pipeline construction time
+   * when built with {@link DatastoreV1.Read#withQuery(Query)}.
+   */
+  @Test
+  public void testRuntimeOptionsNotCalledInApplyQuery() {
+    RuntimeTestOptions options = 
PipelineOptionsFactory.as(RuntimeTestOptions.class);
+    Pipeline pipeline = TestPipeline.create(options);
+    pipeline
+        .apply(DatastoreIO.v1().read()
+            .withProjectId(options.getDatastoreProject())
+            .withQuery(QUERY)
+            .withNamespace(options.getNamespace()))
+        
.apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject()));
+  }
+
+  /**
+   * Test to ensure that {@link ValueProvider} values are not accessed at 
pipeline construction time
+   * when built with {@link DatastoreV1.Read#withLiteralGqlQuery(String)}.
+   */
+  @Test
+  public void testRuntimeOptionsNotCalledInApplyGqlQuery() {
+    RuntimeTestOptions options = 
PipelineOptionsFactory.as(RuntimeTestOptions.class);
+    Pipeline pipeline = TestPipeline.create(options);
+    pipeline
+        .apply(DatastoreIO.v1().read()
+            .withProjectId(options.getDatastoreProject())
+            .withLiteralGqlQuery(options.getGqlQuery()))
+        
.apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject()));
+  }
+
   /** Helper Methods */
 
   /** A helper function that verifies if all the queries have unique keys. */
   private void verifyUniqueKeys(List<KV<Integer, Query>> queries) {
     Set<Integer> keys = new HashSet<>();
-    for (KV<Integer, Query> kv: queries) {
+    for (KV<Integer, Query> kv : queries) {
       keys.add(kv.getKey());
     }
     assertEquals(keys.size(), queries.size());
@@ -830,7 +1002,6 @@ public class DatastoreV1Test {
     return timestampQuery.build();
   }
 
-
   /** Generate dummy query splits. */
   private List<Query> splitQuery(Query query, int numSplits) {
     List<Query> queries = new LinkedList<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/5dd84426/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
index 49a60c6..5b1066a 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
@@ -86,7 +86,7 @@ public class SplitQueryFnIT {
     query.addKindBuilder().setName(kind);
 
     SplitQueryFn splitQueryFn = new SplitQueryFn(
-        V1Options.from(projectId, query.build(), namespace, null), 0);
+        V1Options.from(projectId, namespace, null), 0);
     DoFnTester<Query, KV<Integer, Query>> doFnTester = 
DoFnTester.of(splitQueryFn);
 
     List<KV<Integer, Query>> queries = doFnTester.processBundle(query.build());

http://git-wip-us.apache.org/repos/asf/beam/blob/5dd84426/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
index 6fe2568..bbfedbe 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
@@ -54,25 +54,29 @@ public class V1ReadIT {
   private final long numEntities = 1000;
 
   @Before
-  public void setup() {
+  public void setup() throws Exception {
     PipelineOptionsFactory.register(V1TestOptions.class);
     options = TestPipeline.testingPipelineOptions().as(V1TestOptions.class);
     project = 
TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
     ancestor = UUID.randomUUID().toString();
+    // Create entities and write them to datastore
+    writeEntitiesToDatastore(options, project, ancestor, numEntities);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    deleteAllEntities(options, project, ancestor);
   }
 
   /**
-   * An end-to-end test for {@link DatastoreV1.Read}.
+   * An end-to-end test for {@link DatastoreV1.Read#withQuery(Query)}.
    *
-   * <p>Write some test entities to datastore and then run a dataflow pipeline 
that
+   * <p>Write some test entities to datastore and then run a pipeline that
    * reads and counts the total number of entities. Verify that the count 
matches the
    * number of entities written.
    */
   @Test
   public void testE2EV1Read() throws Exception {
-    // Create entities and write them to datastore
-    writeEntitiesToDatastore(options, project, ancestor, numEntities);
-
     // Read from datastore
     Query query = V1TestUtil.makeAncestorKindQuery(
         options.getKind(), options.getNamespace(), ancestor);
@@ -92,9 +96,52 @@ public class V1ReadIT {
     p.run();
   }
 
+  @Test
+  public void testE2EV1ReadWithGQLQueryWithNoLimit() throws Exception {
+    testE2EV1ReadWithGQLQuery(0);
+  }
+
+  @Test
+  public void testE2EV1ReadWithGQLQueryWithLimit() throws Exception {
+    testE2EV1ReadWithGQLQuery(99);
+  }
+
+  /**
+   * An end-to-end test for {@link 
DatastoreV1.Read#withLiteralGqlQuery(String)}.
+   *
+   * <p>Write some test entities to datastore and then run a pipeline that
+   * reads and counts the total number of entities. Verify that the count 
matches
+   * the number of entities written.
+   */
+  private void testE2EV1ReadWithGQLQuery(long limit) throws Exception {
+    String gqlQuery = String.format(
+        "SELECT * from %s WHERE __key__ HAS ANCESTOR KEY(%s, '%s')",
+        options.getKind(), options.getKind(), ancestor);
+
+    long expectedNumEntities = numEntities;
+    if (limit > 0) {
+      gqlQuery = String.format("%s LIMIT %d", gqlQuery, limit);
+      expectedNumEntities = limit;
+    }
+
+    DatastoreV1.Read read = DatastoreIO.v1().read()
+        .withProjectId(project)
+        .withLiteralGqlQuery(gqlQuery)
+        .withNamespace(options.getNamespace());
+
+    // Count the total number of entities
+    Pipeline p = Pipeline.create(options);
+    PCollection<Long> count = p
+        .apply(read)
+        .apply(Count.<Entity>globally());
+
+    PAssert.thatSingleton(count).isEqualTo(expectedNumEntities);
+    p.run();
+  }
+
   // Creates entities and write them to datastore
   private static void writeEntitiesToDatastore(V1TestOptions options, String 
project,
-                                               String ancestor, long 
numEntities) throws Exception {
+      String ancestor, long numEntities) throws Exception {
     Datastore datastore = getDatastore(options, project);
     // Write test entities to datastore
     V1TestWriter writer = new V1TestWriter(datastore, new 
UpsertMutationBuilder());
@@ -106,9 +153,4 @@ public class V1ReadIT {
     }
     writer.close();
   }
-
-  @After
-  public void tearDown() throws Exception {
-    deleteAllEntities(options, project, ancestor);
-  }
 }

Reply via email to