Repository: beam Updated Branches: refs/heads/master 6e808b2d6 -> 04c6ef4c7
Add ValueProvider options for DatastoreIO Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5dd84426 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5dd84426 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5dd84426 Branch: refs/heads/master Commit: 5dd8442620ab472f6f1e97455057229b94cea28e Parents: 6e808b2 Author: Vikas Kedigehalli <[email protected]> Authored: Wed Nov 16 00:05:41 2016 -0800 Committer: Davor Bonaci <[email protected]> Committed: Wed Mar 15 10:45:24 2017 -0700 ---------------------------------------------------------------------- pom.xml | 7 + sdks/java/io/google-cloud-platform/pom.xml | 26 +- .../beam/sdk/io/gcp/datastore/DatastoreV1.java | 390 +++++++++++++++---- .../sdk/io/gcp/datastore/DatastoreV1Test.java | 243 ++++++++++-- .../sdk/io/gcp/datastore/SplitQueryFnIT.java | 2 +- .../beam/sdk/io/gcp/datastore/V1ReadIT.java | 66 +++- 6 files changed, 617 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5dd84426/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 7183264..ca04ca0 100644 --- a/pom.xml +++ b/pom.xml @@ -120,6 +120,7 @@ <google-cloud-dataflow-java-proto-library-all.version>0.5.160304</google-cloud-dataflow-java-proto-library-all.version> <guava.version>20.0</guava.version> <grpc.version>1.0.1</grpc.version> + <grpc-google-common-protos.version>0.1.0</grpc-google-common-protos.version> <hamcrest.version>1.3</hamcrest.version> <jackson.version>2.7.2</jackson.version> <findbugs.version>3.0.1</findbugs.version> @@ -818,6 +819,12 @@ </dependency> <dependency> + <groupId>com.google.api.grpc</groupId> + <artifactId>grpc-google-common-protos</artifactId> + <version>${grpc-google-common-protos.version}</version> + </dependency> + + <dependency> <groupId>com.fasterxml.jackson.core</groupId> 
<artifactId>jackson-core</artifactId> <version>${jackson.version}</version> http://git-wip-us.apache.org/repos/asf/beam/blob/5dd84426/sdks/java/io/google-cloud-platform/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml index 0caf882..bd7f0d2 100644 --- a/sdks/java/io/google-cloud-platform/pom.xml +++ b/sdks/java/io/google-cloud-platform/pom.xml @@ -70,6 +70,26 @@ </execution> </executions> </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <goals><goal>analyze-only</goal></goals> + <configuration> + <usedDependencies> + <!-- + BEAM-1632: Conflicting classes are brought in by bigtable-protos resulting in + maven-dependency:analyze `unused dependency` error. Marking this as a + `usedDependency` until the issue is resolved. + --> + <usedDependency>com.google.api.grpc:grpc-google-common-protos</usedDependency> + </usedDependencies> + </configuration> + </execution> + </executions> + </plugin> </plugins> </build> @@ -170,6 +190,11 @@ </dependency> <dependency> + <groupId>com.google.api.grpc</groupId> + <artifactId>grpc-google-common-protos</artifactId> + </dependency> + + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> @@ -252,5 +277,4 @@ <scope>test</scope> </dependency> </dependencies> - </project> http://git-wip-us.apache.org/repos/asf/beam/blob/5dd84426/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index 85c0744..d07fd50 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -41,10 +41,12 @@ import com.google.auto.value.AutoValue; import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.datastore.v1.CommitRequest; import com.google.datastore.v1.Entity; import com.google.datastore.v1.EntityResult; +import com.google.datastore.v1.GqlQuery; import com.google.datastore.v1.Key; import com.google.datastore.v1.Key.PathElement; import com.google.datastore.v1.Mutation; @@ -60,6 +62,7 @@ import com.google.datastore.v1.client.DatastoreHelper; import com.google.datastore.v1.client.DatastoreOptions; import com.google.datastore.v1.client.QuerySplitter; import com.google.protobuf.Int32Value; +import com.google.rpc.Code; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -67,8 +70,12 @@ import java.util.List; import java.util.NoSuchElementException; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; @@ -80,12 +87,14 @@ import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.display.DisplayData; import 
org.apache.beam.sdk.transforms.display.DisplayData.Builder; +import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.RetryHttpRequestInitializer; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -185,7 +194,6 @@ import org.slf4j.LoggerFactory; * * @see org.apache.beam.sdk.runners.PipelineRunner */ -@Experimental(Experimental.Kind.SOURCE_SINK) public class DatastoreV1 { // A package-private constructor to prevent direct instantiation from outside of this package @@ -234,9 +242,10 @@ public class DatastoreV1 { */ static final int QUERY_BATCH_LIMIT = 500; - @Nullable public abstract String getProjectId(); + @Nullable public abstract ValueProvider<String> getProjectId(); @Nullable public abstract Query getQuery(); - @Nullable public abstract String getNamespace(); + @Nullable public abstract ValueProvider<String> getLiteralGqlQuery(); + @Nullable public abstract ValueProvider<String> getNamespace(); public abstract int getNumQuerySplits(); @Nullable public abstract String getLocalhost(); @@ -247,9 +256,10 @@ public class DatastoreV1 { @AutoValue.Builder abstract static class Builder { - abstract Builder setProjectId(String projectId); + abstract Builder setProjectId(ValueProvider<String> projectId); abstract Builder setQuery(Query query); - abstract Builder setNamespace(String namespace); + abstract Builder setLiteralGqlQuery(ValueProvider<String> literalGqlQuery); + abstract Builder setNamespace(ValueProvider<String> namespace); abstract Builder setNumQuerySplits(int numQuerySplits); abstract Builder setLocalhost(String localhost); abstract Read build(); @@ -282,7 +292,9 @@ public class DatastoreV1 { private static long 
queryLatestStatisticsTimestamp(Datastore datastore, @Nullable String namespace) throws DatastoreException { Query.Builder query = Query.newBuilder(); - if (namespace == null) { + // Note: namespace either being null or empty represents the default namespace, in which + // case we treat it as not provided by the user. + if (Strings.isNullOrEmpty(namespace)) { query.addKindBuilder().setName("__Stat_Total__"); } else { query.addKindBuilder().setName("__Stat_Ns_Total__"); @@ -317,7 +329,7 @@ public class DatastoreV1 { LOG.info("Latest stats timestamp for kind {} is {}", ourKind, latestTimestamp); Query.Builder queryBuilder = Query.newBuilder(); - if (namespace == null) { + if (Strings.isNullOrEmpty(namespace)) { queryBuilder.addKindBuilder().setName("__Stat_Kind__"); } else { queryBuilder.addKindBuilder().setName("__Stat_Ns_Kind__"); @@ -342,13 +354,30 @@ public class DatastoreV1 { return entity.getProperties().get("entity_bytes").getIntegerValue(); } + private static PartitionId.Builder forNamespace(@Nullable String namespace) { + PartitionId.Builder partitionBuilder = PartitionId.newBuilder(); + // Namespace either being null or empty represents the default namespace. + // Datastore Client libraries expect users to not set the namespace proto field in + // either of these cases. + if (!Strings.isNullOrEmpty(namespace)) { + partitionBuilder.setNamespaceId(namespace); + } + return partitionBuilder; + } + /** Builds a {@link RunQueryRequest} from the {@code query} and {@code namespace}. 
*/ static RunQueryRequest makeRequest(Query query, @Nullable String namespace) { - RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query); - if (namespace != null) { - requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace); - } - return requestBuilder.build(); + return RunQueryRequest.newBuilder() + .setQuery(query) + .setPartitionId(forNamespace(namespace)).build(); + } + + @VisibleForTesting + /** Builds a {@link RunQueryRequest} from the {@code GqlQuery} and {@code namespace}. */ + static RunQueryRequest makeRequest(GqlQuery gqlQuery, @Nullable String namespace) { + return RunQueryRequest.newBuilder() + .setGqlQuery(gqlQuery) + .setPartitionId(forNamespace(namespace)).build(); } /** @@ -358,12 +387,52 @@ public class DatastoreV1 { private static List<Query> splitQuery(Query query, @Nullable String namespace, Datastore datastore, QuerySplitter querySplitter, int numSplits) throws DatastoreException { // If namespace is set, include it in the split request so splits are calculated accordingly. - PartitionId.Builder partitionBuilder = PartitionId.newBuilder(); - if (namespace != null) { - partitionBuilder.setNamespaceId(namespace); + return querySplitter.getSplits(query, forNamespace(namespace).build(), numSplits, datastore); + } + + /** + * Translates a Cloud Datastore gql query string to {@link Query}. + * + * <p>Currently, the only way to translate a gql query string to a Query is to run the query + * against Cloud Datastore and extract the {@code Query} from the response. To prevent reading + * any data, we set the {@code LIMIT} to 0 but if the gql query already has a limit set, we + * catch the exception with {@code INVALID_ARGUMENT} error code and retry the translation + * without the zero limit. + * + * <p>Note: This may result in reading actual data from Cloud Datastore but the service has a + * cap on the number of entities returned for a single rpc request, so this should not be a + * problem in practice. 
+ */ + @VisibleForTesting + static Query translateGqlQueryWithLimitCheck(String gql, Datastore datastore, + String namespace) throws DatastoreException { + String gqlQueryWithZeroLimit = gql + " LIMIT 0"; + try { + Query translatedQuery = translateGqlQuery(gqlQueryWithZeroLimit, datastore, namespace); + // Clear the limit that we set. + return translatedQuery.toBuilder().clearLimit().build(); + } catch (DatastoreException e) { + // Note: There is no specific error code or message to detect if the query already has a + // limit, so we just check for INVALID_ARGUMENT and assume that the query might have + // a limit already set. + if (e.getCode() == Code.INVALID_ARGUMENT) { + LOG.warn("Failed to translate Gql query '{}': {}", gqlQueryWithZeroLimit, e.getMessage()); + LOG.warn("User query might have a limit already set, so trying without zero limit"); + // Retry without the zero limit. + return translateGqlQuery(gql, datastore, namespace); + } else { + throw e; + } } + } - return querySplitter.getSplits(query, partitionBuilder.build(), numSplits, datastore); + /** Translates a gql query string to {@link Query}.*/ + private static Query translateGqlQuery(String gql, Datastore datastore, String namespace) + throws DatastoreException { + GqlQuery gqlQuery = GqlQuery.newBuilder().setQueryString(gql) + .setAllowLiterals(true).build(); + RunQueryRequest req = makeRequest(gqlQuery, namespace); + return datastore.runQuery(req).getQuery(); } /** @@ -372,6 +441,14 @@ public class DatastoreV1 { */ public DatastoreV1.Read withProjectId(String projectId) { checkNotNull(projectId, "projectId"); + return toBuilder().setProjectId(StaticValueProvider.of(projectId)).build(); + } + + /** + * Same as {@link Read#withProjectId(String)} but with a {@link ValueProvider}. 
+ */ + public DatastoreV1.Read withProjectId(ValueProvider<String> projectId) { + checkNotNull(projectId, "projectId"); return toBuilder().setProjectId(projectId).build(); } @@ -391,9 +468,45 @@ public class DatastoreV1 { } /** + * Returns a new {@link DatastoreV1.Read} that reads the results of the specified GQL query. + * See <a href="https://cloud.google.com/datastore/docs/reference/gql_reference">GQL Reference + * </a> to know more about GQL grammar. + * + * <p><b><i>Note:</i></b> This query is executed with literals allowed, so the users should + * ensure that the query originates from trusted sources to avoid any security + * vulnerabilities via SQL Injection. + * + * <p><b><i>Experimental</i></b>: Cloud Datastore does not provide a clean way to translate + * a gql query string to {@link Query}, so we end up making a query to the service for + * translation but this may read the actual data, although it will be a small amount. + * It needs more validation through production use cases before marking it as stable. + */ + @Experimental(Kind.SOURCE_SINK) + public DatastoreV1.Read withLiteralGqlQuery(String gqlQuery) { + checkNotNull(gqlQuery, "gqlQuery"); + return toBuilder().setLiteralGqlQuery(StaticValueProvider.of(gqlQuery)).build(); + } + + /** + * Same as {@link Read#withLiteralGqlQuery(String)} but with a {@link ValueProvider}. + */ + @Experimental(Kind.SOURCE_SINK) + public DatastoreV1.Read withLiteralGqlQuery(ValueProvider<String> gqlQuery) { + checkNotNull(gqlQuery, "gqlQuery"); + return toBuilder().setLiteralGqlQuery(gqlQuery).build(); + } + + /** * Returns a new {@link DatastoreV1.Read} that reads from the given namespace. */ public DatastoreV1.Read withNamespace(String namespace) { + return toBuilder().setNamespace(StaticValueProvider.of(namespace)).build(); + } + + /** + * Same as {@link Read#withNamespace(String)} but with a {@link ValueProvider}. 
+ */ + public DatastoreV1.Read withNamespace(ValueProvider<String> namespace) { return toBuilder().setNamespace(namespace).build(); } @@ -431,31 +544,42 @@ public class DatastoreV1 { @Override public PCollection<Entity> expand(PBegin input) { - V1Options v1Options = V1Options.from(getProjectId(), getQuery(), - getNamespace(), getLocalhost()); + V1Options v1Options = V1Options.from(getProjectId(), getNamespace(), getLocalhost()); /* * This composite transform involves the following steps: - * 1. Create a singleton of the user provided {@code query} and apply a {@link ParDo} that - * splits the query into {@code numQuerySplits} and assign each split query a unique - * {@code Integer} as the key. The resulting output is of the type - * {@code PCollection<KV<Integer, Query>>}. + * 1. Create a singleton of the user provided {@code query} or if {@code gqlQuery} is + * provided apply a {@link ParDo} that translates the {@code gqlQuery} into a {@code query}. + * + * 2. A {@link ParDo} splits the resulting query into {@code numQuerySplits} and + * assigns each split query a unique {@code Integer} as the key. The resulting output is + * of the type {@code PCollection<KV<Integer, Query>>}. * * If the value of {@code numQuerySplits} is less than or equal to 0, then the number of * splits will be computed dynamically based on the size of the data for the {@code query}. * - * 2. The resulting {@code PCollection} is sharded using a {@link GroupByKey} operation. The + * 3. The resulting {@code PCollection} is sharded using a {@link GroupByKey} operation. The * queries are extracted from they {@code KV<Integer, Iterable<Query>>} and flattened to * output a {@code PCollection<Query>}. * - * 3. In the third step, a {@code ParDo} reads entities for each query and outputs + * 4. In the fourth step, a {@code ParDo} reads entities for each query and outputs * a {@code PCollection<Entity>}. 
*/ - PCollection<KV<Integer, Query>> queries = input - .apply(Create.of(getQuery())) + + PCollection<Query> inputQuery; + if (getQuery() != null) { + inputQuery = input.apply(Create.of(getQuery())); + } else { + inputQuery = input + .apply(Create.of(getLiteralGqlQuery()) + .withCoder(SerializableCoder.of(new TypeDescriptor<ValueProvider<String>>() {}))) + .apply(ParDo.of(new GqlQueryTranslateFn(v1Options))); + } + + PCollection<KV<Integer, Query>> splitQueries = inputQuery .apply(ParDo.of(new SplitQueryFn(v1Options, getNumQuerySplits()))); - PCollection<Query> shardedQueries = queries + PCollection<Query> shardedQueries = splitQueries .apply(GroupByKey.<Integer, Query>create()) .apply(Values.<Iterable<Query>>create()) .apply(Flatten.<Query>iterables()); @@ -469,36 +593,130 @@ public class DatastoreV1 { @Override public void validate(PBegin input) { checkNotNull(getProjectId(), "projectId"); - checkNotNull(getQuery(), "query"); + + if (getProjectId().isAccessible() && getProjectId().get() == null) { + throw new IllegalArgumentException("Project id cannot be null"); + } + + if (getQuery() == null && getLiteralGqlQuery() == null) { + throw new IllegalArgumentException( + "Either query or gql query ValueProvider should be provided"); + } + + if (getQuery() != null && getLiteralGqlQuery() != null) { + throw new IllegalArgumentException( + "Only one of query or gql query ValueProvider should be provided"); + } + + if (getLiteralGqlQuery() != null && getLiteralGqlQuery().isAccessible()) { + checkNotNull(getLiteralGqlQuery().get(), "gqlQuery"); + } } @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); + String query = getQuery() == null ? 
null : getQuery().toString(); builder .addIfNotNull(DisplayData.item("projectId", getProjectId()) .withLabel("ProjectId")) .addIfNotNull(DisplayData.item("namespace", getNamespace()) .withLabel("Namespace")) - .addIfNotNull(DisplayData.item("query", getQuery().toString()) - .withLabel("Query")); + .addIfNotNull(DisplayData.item("query", query) + .withLabel("Query")) + .addIfNotNull(DisplayData.item("gqlQuery", getLiteralGqlQuery()) + .withLabel("GqlQuery")); + } + + @VisibleForTesting + static class V1Options implements HasDisplayData, Serializable { + private final ValueProvider<String> project; + @Nullable + private final ValueProvider<String> namespace; + @Nullable + private final String localhost; + + private V1Options(ValueProvider<String> project, ValueProvider<String> namespace, + String localhost) { + this.project = project; + this.namespace = namespace; + this.localhost = localhost; + } + + public static V1Options from(String projectId, String namespace, String localhost) { + return from(StaticValueProvider.of(projectId), StaticValueProvider.of(namespace), + localhost); + } + + public static V1Options from(ValueProvider<String> project, ValueProvider<String> namespace, + String localhost) { + return new V1Options(project, namespace, localhost); + } + + public String getProjectId() { + return project.get(); + } + + @Nullable + public String getNamespace() { + return namespace == null ? 
null : namespace.get(); + } + + public ValueProvider<String> getProjectValueProvider() { + return project; + } + + @Nullable + public ValueProvider<String> getNamespaceValueProvider() { + return namespace; + } + + @Nullable + public String getLocalhost() { + return localhost; + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .addIfNotNull(DisplayData.item("projectId", getProjectValueProvider()) + .withLabel("ProjectId")) + .addIfNotNull(DisplayData.item("namespace", getNamespaceValueProvider()) + .withLabel("Namespace")); + } } /** - * A class for v1 Cloud Datastore related options. + * A DoFn that translates a Cloud Datastore gql query string to {@code Query}. */ - @VisibleForTesting - @AutoValue - abstract static class V1Options implements Serializable { - public static V1Options from(String projectId, Query query, @Nullable String namespace, - @Nullable String localhost) { - return new AutoValue_DatastoreV1_Read_V1Options(projectId, query, namespace, localhost); + static class GqlQueryTranslateFn extends DoFn<ValueProvider<String>, Query> { + private final V1Options v1Options; + private transient Datastore datastore; + private final V1DatastoreFactory datastoreFactory; + + GqlQueryTranslateFn(V1Options options) { + this(options, new V1DatastoreFactory()); } - public abstract String getProjectId(); - public abstract Query getQuery(); - @Nullable public abstract String getNamespace(); - @Nullable public abstract String getLocalhost(); + GqlQueryTranslateFn(V1Options options, V1DatastoreFactory datastoreFactory) { + this.v1Options = options; + this.datastoreFactory = datastoreFactory; + } + + @StartBundle + public void startBundle(Context c) throws Exception { + datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), v1Options.getProjectId()); + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + ValueProvider<String> gqlQuery = c.element(); + LOG.info("User query: '{}'", 
gqlQuery.get()); + Query query = translateGqlQueryWithLimitCheck(gqlQuery.get(), datastore, + v1Options.getNamespace()); + LOG.info("User gql query translated to Query({})", query); + c.output(query); + } } /** @@ -574,13 +792,7 @@ public class DatastoreV1 { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder - .addIfNotNull(DisplayData.item("projectId", options.getProjectId()) - .withLabel("ProjectId")) - .addIfNotNull(DisplayData.item("namespace", options.getNamespace()) - .withLabel("Namespace")) - .addIfNotNull(DisplayData.item("query", options.getQuery().toString()) - .withLabel("Query")); + builder.include("options", options); } } @@ -660,6 +872,12 @@ public class DatastoreV1 { || (currentBatch.getMoreResults() == NOT_FINISHED)); } } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.include("options", options); + } } } @@ -697,7 +915,7 @@ public class DatastoreV1 { * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if * it is {@code null} at instantiation time, an error will be thrown. */ - Write(@Nullable String projectId, @Nullable String localhost) { + Write(@Nullable ValueProvider<String> projectId, @Nullable String localhost) { super(projectId, localhost, new UpsertFn()); } @@ -706,7 +924,15 @@ public class DatastoreV1 { */ public Write withProjectId(String projectId) { checkNotNull(projectId, "projectId"); - return new Write(projectId, null); + return withProjectId(StaticValueProvider.of(projectId)); + } + + /** + * Same as {@link Write#withProjectId(String)} but with a {@link ValueProvider}. 
+ */ + public Write withProjectId(ValueProvider<String> projectId) { + checkNotNull(projectId, "projectId ValueProvider"); + return new Write(projectId, localhost); } /** @@ -715,7 +941,7 @@ public class DatastoreV1 { */ public Write withLocalhost(String localhost) { checkNotNull(localhost, "localhost"); - return new Write(null, localhost); + return new Write(projectId, localhost); } } @@ -729,7 +955,7 @@ public class DatastoreV1 { * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if * it is {@code null} at instantiation time, an error will be thrown. */ - DeleteEntity(@Nullable String projectId, @Nullable String localhost) { + DeleteEntity(@Nullable ValueProvider<String> projectId, @Nullable String localhost) { super(projectId, localhost, new DeleteEntityFn()); } @@ -739,7 +965,15 @@ public class DatastoreV1 { */ public DeleteEntity withProjectId(String projectId) { checkNotNull(projectId, "projectId"); - return new DeleteEntity(projectId, null); + return withProjectId(StaticValueProvider.of(projectId)); + } + + /** + * Same as {@link DeleteEntity#withProjectId(String)} but with a {@link ValueProvider}. + */ + public DeleteEntity withProjectId(ValueProvider<String> projectId) { + checkNotNull(projectId, "projectId ValueProvider"); + return new DeleteEntity(projectId, localhost); } /** @@ -748,7 +982,7 @@ public class DatastoreV1 { */ public DeleteEntity withLocalhost(String localhost) { checkNotNull(localhost, "localhost"); - return new DeleteEntity(null, localhost); + return new DeleteEntity(projectId, localhost); } } @@ -763,7 +997,7 @@ public class DatastoreV1 { * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if * it is {@code null} at instantiation time, an error will be thrown. 
*/ - DeleteKey(@Nullable String projectId, @Nullable String localhost) { + DeleteKey(@Nullable ValueProvider<String> projectId, @Nullable String localhost) { super(projectId, localhost, new DeleteKeyFn()); } @@ -773,7 +1007,7 @@ public class DatastoreV1 { */ public DeleteKey withProjectId(String projectId) { checkNotNull(projectId, "projectId"); - return new DeleteKey(projectId, null); + return withProjectId(StaticValueProvider.of(projectId)); } /** @@ -782,7 +1016,15 @@ public class DatastoreV1 { */ public DeleteKey withLocalhost(String localhost) { checkNotNull(localhost, "localhost"); - return new DeleteKey(null, localhost); + return new DeleteKey(projectId, localhost); + } + + /** + * Same as {@link DeleteKey#withProjectId(String)} but with a {@link ValueProvider}. + */ + public DeleteKey withProjectId(ValueProvider<String> projectId) { + checkNotNull(projectId, "projectId ValueProvider"); + return new DeleteKey(projectId, localhost); } } @@ -795,10 +1037,9 @@ public class DatastoreV1 { * be used by the {@code DoFn} provided, as the commits are retried when failures occur. */ private abstract static class Mutate<T> extends PTransform<PCollection<T>, PDone> { + protected ValueProvider<String> projectId; @Nullable - private final String projectId; - @Nullable - private final String localhost; + protected String localhost; /** A function that transforms each {@code T} into a mutation. */ private final SimpleFunction<T, Mutation> mutationFn; @@ -806,7 +1047,7 @@ public class DatastoreV1 { * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if * it is {@code null} at instantiation time, an error will be thrown. 
*/ - Mutate(@Nullable String projectId, @Nullable String localhost, + Mutate(@Nullable ValueProvider<String> projectId, @Nullable String localhost, SimpleFunction<T, Mutation> mutationFn) { this.projectId = projectId; this.localhost = localhost; @@ -824,7 +1065,10 @@ public class DatastoreV1 { @Override public void validate(PCollection<T> input) { - checkNotNull(projectId, "projectId"); + checkNotNull(projectId, "projectId ValueProvider"); + if (projectId.isAccessible()) { + checkNotNull(projectId.get(), "projectId"); + } checkNotNull(mutationFn, "mutationFn"); } @@ -846,7 +1090,7 @@ public class DatastoreV1 { } public String getProjectId() { - return projectId; + return projectId.get(); } } @@ -867,7 +1111,7 @@ public class DatastoreV1 { @VisibleForTesting static class DatastoreWriterFn extends DoFn<Mutation, Void> { private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class); - private final String projectId; + private final ValueProvider<String> projectId; @Nullable private final String localhost; private transient Datastore datastore; @@ -881,12 +1125,16 @@ public class DatastoreV1 { .withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5)); DatastoreWriterFn(String projectId, @Nullable String localhost) { + this(StaticValueProvider.of(projectId), localhost, new V1DatastoreFactory()); + } + + DatastoreWriterFn(ValueProvider<String> projectId, @Nullable String localhost) { this(projectId, localhost, new V1DatastoreFactory()); } @VisibleForTesting - DatastoreWriterFn(String projectId, @Nullable String localhost, - V1DatastoreFactory datastoreFactory) { + DatastoreWriterFn(ValueProvider<String> projectId, @Nullable String localhost, + V1DatastoreFactory datastoreFactory) { this.projectId = checkNotNull(projectId, "projectId"); this.localhost = localhost; this.datastoreFactory = datastoreFactory; @@ -894,7 +1142,7 @@ public class DatastoreV1 { @StartBundle public void startBundle(Context c) { - datastore = 
datastoreFactory.getDatastore(c.getPipelineOptions(), projectId, localhost); + datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId.get(), localhost); } @ProcessElement @@ -1048,6 +1296,14 @@ public class DatastoreV1 { static class V1DatastoreFactory implements Serializable { /** Builds a Cloud Datastore client for the given pipeline options and project. */ + public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) { + return getDatastore(pipelineOptions, projectId, null); + } + + /** + * Builds a Cloud Datastore client for the given pipeline options, project and an optional + * localhost. + */ public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId, @Nullable String localhost) { Credentials credential = pipelineOptions.as(GcpOptions.class).getGcpCredential(); http://git-wip-us.apache.org/repos/asf/beam/blob/5dd84426/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java index c2bc8d2..879e30e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java @@ -31,6 +31,7 @@ import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.DEFAULT_BUND import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.QUERY_BATCH_LIMIT; import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.getEstimatedSizeBytes; import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.makeRequest; +import static 
org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.translateGqlQueryWithLimitCheck; import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.isValidKey; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -51,6 +52,7 @@ import static org.mockito.Mockito.when; import com.google.datastore.v1.CommitRequest; import com.google.datastore.v1.Entity; import com.google.datastore.v1.EntityResult; +import com.google.datastore.v1.GqlQuery; import com.google.datastore.v1.Key; import com.google.datastore.v1.Mutation; import com.google.datastore.v1.PartitionId; @@ -59,14 +61,17 @@ import com.google.datastore.v1.QueryResultBatch; import com.google.datastore.v1.RunQueryRequest; import com.google.datastore.v1.RunQueryResponse; import com.google.datastore.v1.client.Datastore; +import com.google.datastore.v1.client.DatastoreException; import com.google.datastore.v1.client.QuerySplitter; import com.google.protobuf.Int32Value; +import com.google.rpc.Code; import java.util.ArrayList; import java.util.Date; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Set; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DatastoreWriterFn; import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DeleteEntity; import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DeleteEntityFn; @@ -79,7 +84,11 @@ import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.UpsertFn; import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.V1DatastoreFactory; import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Write; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.RunnableOnService; +import 
org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.DoFnTester; import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior; import org.apache.beam.sdk.transforms.PTransform; @@ -88,7 +97,6 @@ import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.POutput; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -111,13 +119,16 @@ public class DatastoreV1Test { private static final String KIND = "testKind"; private static final Query QUERY; private static final String LOCALHOST = "localhost:9955"; + private static final String GQL_QUERY = "SELECT * from " + KIND; private static final V1Options V_1_OPTIONS; + static { Query.Builder q = Query.newBuilder(); q.addKindBuilder().setName(KIND); QUERY = q.build(); - V_1_OPTIONS = V1Options.from(PROJECT_ID, QUERY, NAMESPACE, null); + V_1_OPTIONS = V1Options.from(PROJECT_ID, NAMESPACE, null); } + private DatastoreV1.Read initialRead; @Mock @@ -149,8 +160,17 @@ public class DatastoreV1Test { DatastoreV1.Read read = DatastoreIO.v1().read() .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE); assertEquals(QUERY, read.getQuery()); - assertEquals(PROJECT_ID, read.getProjectId()); - assertEquals(NAMESPACE, read.getNamespace()); + assertEquals(PROJECT_ID, read.getProjectId().get()); + assertEquals(NAMESPACE, read.getNamespace().get()); + } + + @Test + public void testBuildReadWithGqlQuery() throws Exception { + DatastoreV1.Read read = DatastoreIO.v1().read() + .withProjectId(PROJECT_ID).withLiteralGqlQuery(GQL_QUERY).withNamespace(NAMESPACE); + assertEquals(GQL_QUERY, read.getLiteralGqlQuery().get()); + assertEquals(PROJECT_ID, read.getProjectId().get()); + assertEquals(NAMESPACE, read.getNamespace().get()); } /** @@ -158,28 +178,41 @@ public class DatastoreV1Test { */ @Test public void 
testBuildReadAlt() throws Exception { - DatastoreV1.Read read = DatastoreIO.v1().read() - .withProjectId(PROJECT_ID).withNamespace(NAMESPACE).withQuery(QUERY) + DatastoreV1.Read read = DatastoreIO.v1().read() + .withQuery(QUERY).withNamespace(NAMESPACE).withProjectId(PROJECT_ID) .withLocalhost(LOCALHOST); assertEquals(QUERY, read.getQuery()); - assertEquals(PROJECT_ID, read.getProjectId()); - assertEquals(NAMESPACE, read.getNamespace()); + assertEquals(PROJECT_ID, read.getProjectId().get()); + assertEquals(NAMESPACE, read.getNamespace().get()); assertEquals(LOCALHOST, read.getLocalhost()); } @Test public void testReadValidationFailsProject() throws Exception { - DatastoreV1.Read read = DatastoreIO.v1().read().withQuery(QUERY); + DatastoreV1.Read read = DatastoreIO.v1().read().withQuery(QUERY); thrown.expect(NullPointerException.class); - thrown.expectMessage("project"); + thrown.expectMessage("projectId"); read.validate(null); } @Test public void testReadValidationFailsQuery() throws Exception { - DatastoreV1.Read read = DatastoreIO.v1().read().withProjectId(PROJECT_ID); - thrown.expect(NullPointerException.class); - thrown.expectMessage("query"); + DatastoreV1.Read read = DatastoreIO.v1().read().withProjectId(PROJECT_ID); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Either query or gql query ValueProvider should be provided"); + read.validate(null); + } + + @Test + public void testReadValidationFailsQueryAndGqlQuery() throws Exception { + DatastoreV1.Read read = DatastoreIO.v1().read() + .withProjectId(PROJECT_ID) + .withLiteralGqlQuery(GQL_QUERY) + .withQuery(QUERY); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Only one of query or gql query ValueProvider should be provided"); read.validate(null); } @@ -203,14 +236,14 @@ public class DatastoreV1Test { @Test public void testReadValidationSucceedsNamespace() throws Exception { - DatastoreV1.Read read = 
DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY); + DatastoreV1.Read read = DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY); /* Should succeed, as a null namespace is fine. */ read.validate(null); } @Test public void testReadDisplayData() { - DatastoreV1.Read read = DatastoreIO.v1().read() + DatastoreV1.Read read = DatastoreIO.v1().read() .withProjectId(PROJECT_ID) .withQuery(QUERY) .withNamespace(NAMESPACE); @@ -223,10 +256,23 @@ public class DatastoreV1Test { } @Test - @Category(RunnableOnService.class) + public void testReadDisplayDataWithGqlQuery() { + DatastoreV1.Read read = DatastoreIO.v1().read() + .withProjectId(PROJECT_ID) + .withLiteralGqlQuery(GQL_QUERY) + .withNamespace(NAMESPACE); + + DisplayData displayData = DisplayData.from(read); + + assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); + assertThat(displayData, hasDisplayItem("gqlQuery", GQL_QUERY)); + assertThat(displayData, hasDisplayItem("namespace", NAMESPACE)); + } + + @Test public void testSourcePrimitiveDisplayData() { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - PTransform<PBegin, ? 
extends POutput> read = DatastoreIO.v1().read().withProjectId( + PTransform<PBegin, PCollection<Entity>> read = DatastoreIO.v1().read().withProjectId( "myProject").withQuery(Query.newBuilder().build()); Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); @@ -235,32 +281,55 @@ public class DatastoreV1Test { } @Test + public void testSourcePrimitiveDisplayDataWithGqlQuery() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + PTransform<PBegin, PCollection<Entity>> read = DatastoreIO.v1().read().withProjectId( + "myProject").withLiteralGqlQuery(GQL_QUERY); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); + assertThat("DatastoreIO read should include the project in its primitive display data", + displayData, hasItem(hasDisplayItem("projectId"))); + } + + @Test public void testWriteDoesNotAllowNullProject() throws Exception { thrown.expect(NullPointerException.class); thrown.expectMessage("projectId"); + DatastoreIO.v1().write().withProjectId((String) null); + } - DatastoreIO.v1().write().withProjectId(null); + @Test + public void testWriteDoesNotAllowNullProjectValueProvider() throws Exception { + thrown.expect(NullPointerException.class); + thrown.expectMessage("projectId ValueProvider"); + DatastoreIO.v1().write().withProjectId((ValueProvider<String>) null); } @Test public void testWriteValidationFailsWithNoProject() throws Exception { - Write write = DatastoreIO.v1().write(); + Write write = DatastoreIO.v1().write(); + thrown.expect(NullPointerException.class); + thrown.expectMessage("projectId ValueProvider"); + write.validate(null); + } + @Test + public void testWriteValidationFailsWithNoProjectInStaticValueProvider() throws Exception { + Write write = DatastoreIO.v1().write().withProjectId(StaticValueProvider.<String>of(null)); thrown.expect(NullPointerException.class); thrown.expectMessage("projectId"); - write.validate(null); } @Test public void 
testWriteValidationSucceedsWithProject() throws Exception { - Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID); + Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID); write.validate(null); } @Test public void testWriteDisplayData() { - Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID); + Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID); DisplayData displayData = DisplayData.from(write); @@ -271,17 +340,30 @@ public class DatastoreV1Test { public void testDeleteEntityDoesNotAllowNullProject() throws Exception { thrown.expect(NullPointerException.class); thrown.expectMessage("projectId"); + DatastoreIO.v1().deleteEntity().withProjectId((String) null); + } - DatastoreIO.v1().deleteEntity().withProjectId(null); + @Test + public void testDeleteEntityDoesNotAllowNullProjectValueProvider() throws Exception { + thrown.expect(NullPointerException.class); + thrown.expectMessage("projectId ValueProvider"); + DatastoreIO.v1().deleteEntity().withProjectId((ValueProvider<String>) null); } @Test public void testDeleteEntityValidationFailsWithNoProject() throws Exception { DeleteEntity deleteEntity = DatastoreIO.v1().deleteEntity(); + thrown.expect(NullPointerException.class); + thrown.expectMessage("projectId ValueProvider"); + deleteEntity.validate(null); + } + @Test + public void testDeleteEntityValidationFailsWithNoProjectInStaticValueProvider() throws Exception { + DeleteEntity deleteEntity = DatastoreIO.v1().deleteEntity() + .withProjectId(StaticValueProvider.<String>of(null)); thrown.expect(NullPointerException.class); thrown.expectMessage("projectId"); - deleteEntity.validate(null); } @@ -293,7 +375,7 @@ public class DatastoreV1Test { @Test public void testDeleteEntityDisplayData() { - DeleteEntity deleteEntity = DatastoreIO.v1().deleteEntity().withProjectId(PROJECT_ID); + DeleteEntity deleteEntity = DatastoreIO.v1().deleteEntity().withProjectId(PROJECT_ID); DisplayData displayData = 
DisplayData.from(deleteEntity); @@ -304,17 +386,30 @@ public class DatastoreV1Test { public void testDeleteKeyDoesNotAllowNullProject() throws Exception { thrown.expect(NullPointerException.class); thrown.expectMessage("projectId"); + DatastoreIO.v1().deleteKey().withProjectId((String) null); + } - DatastoreIO.v1().deleteKey().withProjectId(null); + @Test + public void testDeleteKeyDoesNotAllowNullProjectValueProvider() throws Exception { + thrown.expect(NullPointerException.class); + thrown.expectMessage("projectId ValueProvider"); + DatastoreIO.v1().deleteKey().withProjectId((ValueProvider<String>) null); } @Test public void testDeleteKeyValidationFailsWithNoProject() throws Exception { DeleteKey deleteKey = DatastoreIO.v1().deleteKey(); + thrown.expect(NullPointerException.class); + thrown.expectMessage("projectId ValueProvider"); + deleteKey.validate(null); + } + @Test + public void testDeleteKeyValidationFailsWithNoProjectInStaticValueProvider() throws Exception { + DeleteKey deleteKey = DatastoreIO.v1().deleteKey().withProjectId( + StaticValueProvider.<String>of(null)); thrown.expect(NullPointerException.class); thrown.expectMessage("projectId"); - deleteKey.validate(null); } @@ -326,7 +421,7 @@ public class DatastoreV1Test { @Test public void testDeleteKeyDisplayData() { - DeleteKey deleteKey = DatastoreIO.v1().deleteKey().withProjectId(PROJECT_ID); + DeleteKey deleteKey = DatastoreIO.v1().deleteKey().withProjectId(PROJECT_ID); DisplayData displayData = DisplayData.from(deleteKey); @@ -345,7 +440,6 @@ public class DatastoreV1Test { displayData, hasItem(hasDisplayItem("projectId"))); assertThat("DatastoreIO write should include the upsertFn in its primitive display data", displayData, hasItem(hasDisplayItem("upsertFn"))); - } @Test @@ -360,7 +454,6 @@ public class DatastoreV1Test { displayData, hasItem(hasDisplayItem("projectId"))); assertThat("DatastoreIO write should include the deleteEntityFn in its primitive display data", displayData, 
hasItem(hasDisplayItem("deleteEntityFn"))); - } @Test @@ -375,7 +468,6 @@ public class DatastoreV1Test { displayData, hasItem(hasDisplayItem("projectId"))); assertThat("DatastoreIO write should include the deleteKeyFn in its primitive display data", displayData, hasItem(hasDisplayItem("deleteKeyFn"))); - } /** @@ -383,7 +475,7 @@ public class DatastoreV1Test { */ @Test public void testBuildWrite() throws Exception { - DatastoreV1.Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID); + DatastoreV1.Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID); assertEquals(PROJECT_ID, write.getProjectId()); } @@ -543,8 +635,8 @@ public class DatastoreV1Test { makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build()); } - DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, null, - mockDatastoreFactory); + DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID), + null, mockDatastoreFactory); DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter); doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); doFnTester.processBundle(mutations); @@ -695,12 +787,92 @@ public class DatastoreV1Test { readFnTest(5 * QUERY_BATCH_LIMIT); } + @Test + public void testTranslateGqlQueryWithLimit() throws Exception { + String gql = "SELECT * from DummyKind LIMIT 10"; + String gqlWithZeroLimit = gql + " LIMIT 0"; + GqlQuery gqlQuery = GqlQuery.newBuilder().setQueryString(gql).setAllowLiterals(true).build(); + GqlQuery gqlQueryWithZeroLimit = GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit) + .setAllowLiterals(true).build(); + RunQueryRequest gqlRequest = makeRequest(gqlQuery, V_1_OPTIONS.getNamespace()); + RunQueryRequest gqlRequestWithZeroLimit = makeRequest(gqlQueryWithZeroLimit, + V_1_OPTIONS.getNamespace()); + when(mockDatastore.runQuery(gqlRequestWithZeroLimit)) + .thenThrow(new DatastoreException("runQuery", Code.INVALID_ARGUMENT, "invalid query", + // dummy + 
new RuntimeException())); + when(mockDatastore.runQuery(gqlRequest)) + .thenReturn(RunQueryResponse.newBuilder().setQuery(QUERY).build()); + assertEquals(translateGqlQueryWithLimitCheck(gql, mockDatastore, V_1_OPTIONS.getNamespace()), + QUERY); + verify(mockDatastore, times(1)).runQuery(gqlRequest); + verify(mockDatastore, times(1)).runQuery(gqlRequestWithZeroLimit); + } + + @Test + public void testTranslateGqlQueryWithNoLimit() throws Exception { + String gql = "SELECT * from DummyKind"; + String gqlWithZeroLimit = gql + " LIMIT 0"; + GqlQuery gqlQueryWithZeroLimit = GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit) + .setAllowLiterals(true).build(); + RunQueryRequest gqlRequestWithZeroLimit = makeRequest(gqlQueryWithZeroLimit, + V_1_OPTIONS.getNamespace()); + when(mockDatastore.runQuery(gqlRequestWithZeroLimit)) + .thenReturn(RunQueryResponse.newBuilder().setQuery(QUERY).build()); + assertEquals(translateGqlQueryWithLimitCheck(gql, mockDatastore, V_1_OPTIONS.getNamespace()), + QUERY); + verify(mockDatastore, times(1)).runQuery(gqlRequestWithZeroLimit); + } + + /** Test options. **/ + public interface RuntimeTestOptions extends PipelineOptions { + ValueProvider<String> getDatastoreProject(); + void setDatastoreProject(ValueProvider<String> value); + + ValueProvider<String> getGqlQuery(); + void setGqlQuery(ValueProvider<String> value); + + ValueProvider<String> getNamespace(); + void setNamespace(ValueProvider<String> value); + } + + /** + * Test to ensure that {@link ValueProvider} values are not accessed at pipeline construction time + * when built with {@link DatastoreV1.Read#withQuery(Query)}. 
+ */ + @Test + public void testRuntimeOptionsNotCalledInApplyQuery() { + RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class); + Pipeline pipeline = TestPipeline.create(options); + pipeline + .apply(DatastoreIO.v1().read() + .withProjectId(options.getDatastoreProject()) + .withQuery(QUERY) + .withNamespace(options.getNamespace())) + .apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject())); + } + + /** + * Test to ensure that {@link ValueProvider} values are not accessed at pipeline construction time + * when built with {@link DatastoreV1.Read#withLiteralGqlQuery(String)}. + */ + @Test + public void testRuntimeOptionsNotCalledInApplyGqlQuery() { + RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class); + Pipeline pipeline = TestPipeline.create(options); + pipeline + .apply(DatastoreIO.v1().read() + .withProjectId(options.getDatastoreProject()) + .withLiteralGqlQuery(options.getGqlQuery())) + .apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject())); + } + /** Helper Methods */ /** A helper function that verifies if all the queries have unique keys. */ private void verifyUniqueKeys(List<KV<Integer, Query>> queries) { Set<Integer> keys = new HashSet<>(); - for (KV<Integer, Query> kv: queries) { + for (KV<Integer, Query> kv : queries) { keys.add(kv.getKey()); } assertEquals(keys.size(), queries.size()); @@ -830,7 +1002,6 @@ public class DatastoreV1Test { return timestampQuery.build(); } - /** Generate dummy query splits. 
*/ private List<Query> splitQuery(Query query, int numSplits) { List<Query> queries = new LinkedList<>(); http://git-wip-us.apache.org/repos/asf/beam/blob/5dd84426/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java index 49a60c6..5b1066a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java @@ -86,7 +86,7 @@ public class SplitQueryFnIT { query.addKindBuilder().setName(kind); SplitQueryFn splitQueryFn = new SplitQueryFn( - V1Options.from(projectId, query.build(), namespace, null), 0); + V1Options.from(projectId, namespace, null), 0); DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn); List<KV<Integer, Query>> queries = doFnTester.processBundle(query.build()); http://git-wip-us.apache.org/repos/asf/beam/blob/5dd84426/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java index 6fe2568..bbfedbe 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java @@ -54,25 +54,29 @@ public class V1ReadIT { private final long numEntities = 1000; 
@Before - public void setup() { + public void setup() throws Exception { PipelineOptionsFactory.register(V1TestOptions.class); options = TestPipeline.testingPipelineOptions().as(V1TestOptions.class); project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); ancestor = UUID.randomUUID().toString(); + // Create entities and write them to datastore + writeEntitiesToDatastore(options, project, ancestor, numEntities); + } + + @After + public void tearDown() throws Exception { + deleteAllEntities(options, project, ancestor); } /** - * An end-to-end test for {@link DatastoreV1.Read}. + * An end-to-end test for {@link DatastoreV1.Read#withQuery(Query)}. * - * <p>Write some test entities to datastore and then run a dataflow pipeline that + * <p>Write some test entities to datastore and then run a pipeline that * reads and counts the total number of entities. Verify that the count matches the * number of entities written. */ @Test public void testE2EV1Read() throws Exception { - // Create entities and write them to datastore - writeEntitiesToDatastore(options, project, ancestor, numEntities); - // Read from datastore Query query = V1TestUtil.makeAncestorKindQuery( options.getKind(), options.getNamespace(), ancestor); @@ -92,9 +96,52 @@ public class V1ReadIT { p.run(); } + @Test + public void testE2EV1ReadWithGQLQueryWithNoLimit() throws Exception { + testE2EV1ReadWithGQLQuery(0); + } + + @Test + public void testE2EV1ReadWithGQLQueryWithLimit() throws Exception { + testE2EV1ReadWithGQLQuery(99); + } + + /** + * An end-to-end test for {@link DatastoreV1.Read#withLiteralGqlQuery(String)}. + * + * <p>Write some test entities to datastore and then run a pipeline that + * reads and counts the total number of entities. Verify that the count matches + * the number of entities written.
+ */ + private void testE2EV1ReadWithGQLQuery(long limit) throws Exception { + String gqlQuery = String.format( + "SELECT * from %s WHERE __key__ HAS ANCESTOR KEY(%s, '%s')", + options.getKind(), options.getKind(), ancestor); + + long expectedNumEntities = numEntities; + if (limit > 0) { + gqlQuery = String.format("%s LIMIT %d", gqlQuery, limit); + expectedNumEntities = limit; + } + + DatastoreV1.Read read = DatastoreIO.v1().read() + .withProjectId(project) + .withLiteralGqlQuery(gqlQuery) + .withNamespace(options.getNamespace()); + + // Count the total number of entities + Pipeline p = Pipeline.create(options); + PCollection<Long> count = p + .apply(read) + .apply(Count.<Entity>globally()); + + PAssert.thatSingleton(count).isEqualTo(expectedNumEntities); + p.run(); + } + // Creates entities and write them to datastore private static void writeEntitiesToDatastore(V1TestOptions options, String project, - String ancestor, long numEntities) throws Exception { + String ancestor, long numEntities) throws Exception { Datastore datastore = getDatastore(options, project); // Write test entities to datastore V1TestWriter writer = new V1TestWriter(datastore, new UpsertMutationBuilder()); @@ -106,9 +153,4 @@ public class V1ReadIT { } writer.close(); } - - @After - public void tearDown() throws Exception { - deleteAllEntities(options, project, ancestor); - } }
