http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java deleted file mode 100644 index 8503b66..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java +++ /dev/null @@ -1,1033 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.sdk.io.gcp.datastore; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Verify.verify; -import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL; -import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING; -import static com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED; -import static com.google.datastore.v1beta3.client.DatastoreHelper.makeDelete; -import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter; -import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder; -import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert; -import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue; - -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.options.GcpOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.Values; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayData.Builder; -import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; -import org.apache.beam.sdk.util.RetryHttpRequestInitializer; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; - -import com.google.api.client.auth.oauth2.Credential; -import 
com.google.api.client.util.BackOff; -import com.google.api.client.util.BackOffUtils; -import com.google.api.client.util.Sleeper; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.MoreObjects; -import com.google.common.collect.ImmutableList; -import com.google.datastore.v1beta3.CommitRequest; -import com.google.datastore.v1beta3.Entity; -import com.google.datastore.v1beta3.EntityResult; -import com.google.datastore.v1beta3.Key; -import com.google.datastore.v1beta3.Key.PathElement; -import com.google.datastore.v1beta3.Mutation; -import com.google.datastore.v1beta3.PartitionId; -import com.google.datastore.v1beta3.Query; -import com.google.datastore.v1beta3.QueryResultBatch; -import com.google.datastore.v1beta3.RunQueryRequest; -import com.google.datastore.v1beta3.RunQueryResponse; -import com.google.datastore.v1beta3.client.Datastore; -import com.google.datastore.v1beta3.client.DatastoreException; -import com.google.datastore.v1beta3.client.DatastoreFactory; -import com.google.datastore.v1beta3.client.DatastoreHelper; -import com.google.datastore.v1beta3.client.DatastoreOptions; -import com.google.datastore.v1beta3.client.QuerySplitter; -import com.google.protobuf.Int32Value; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.NoSuchElementException; -import javax.annotation.Nullable; - -/** - * <p>{@link V1Beta3} provides an API to Read, Write and Delete {@link PCollection PCollections} of - * <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> version v1beta3 - * {@link Entity} objects. - * - * <p>This API currently requires an authentication workaround. 
To use {@link V1Beta3}, users - * must use the {@code gcloud} command line tool to get credentials for Datastore: - * <pre> - * $ gcloud auth login - * </pre> - * - * <p>To read a {@link PCollection} from a query to Datastore, use {@link V1Beta3#read} and - * its methods {@link V1Beta3.Read#withProjectId} and {@link V1Beta3.Read#withQuery} to - * specify the project to query and the query to read from. You can optionally provide a namespace - * to query within using {@link V1Beta3.Read#withNamespace}. You could also optionally specify - * how many splits you want for the query using {@link V1Beta3.Read#withNumQuerySplits}. - * - * <p>For example: - * - * <pre> {@code - * // Read a query from Datastore - * PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); - * Query query = ...; - * String projectId = "..."; - * - * Pipeline p = Pipeline.create(options); - * PCollection<Entity> entities = p.apply( - * DatastoreIO.v1beta3().read() - * .withProjectId(projectId) - * .withQuery(query)); - * } </pre> - * - * <p><b>Note:</b> Normally, a Cloud Dataflow job will read from Cloud Datastore in parallel across - * many workers. However, when the {@link Query} is configured with a limit using - * {@link com.google.datastore.v1beta3.Query.Builder#setLimit(Int32Value)}, then - * all returned results will be read by a single Dataflow worker in order to ensure correct data. 
- * - * <p>To write a {@link PCollection} to Datastore, use {@link V1Beta3#write}, - * specifying the Cloud Datastore project to write to: - * - * <pre> {@code - * PCollection<Entity> entities = ...; - * entities.apply(DatastoreIO.v1beta3().write().withProjectId(projectId)); - * p.run(); - * } </pre> - * - * <p>To delete a {@link PCollection} of {@link Entity Entities} from Datastore, use - * {@link V1Beta3#deleteEntity()}, specifying the Cloud Datastore project to delete from: - * - * <pre> {@code - * PCollection<Entity> entities = ...; - * entities.apply(DatastoreIO.v1beta3().deleteEntity().withProjectId(projectId)); - * p.run(); - * } </pre> - * - * <p>To delete entities associated with a {@link PCollection} of {@link Key Keys} from Datastore, - * use {@link V1Beta3#deleteKey}, specifying the Cloud Datastore project to delete from: - * - * <pre> {@code - * PCollection<Key> keys = ...; - * keys.apply(DatastoreIO.v1beta3().deleteKey().withProjectId(projectId)); - * p.run(); - * } </pre> - * - * <p>{@link Entity Entities} in the {@code PCollection} to be written or deleted must have complete - * {@link Key Keys}. Complete {@code Keys} specify either the {@code name} or the {@code id} of - * the {@code Entity}, whereas incomplete {@code Keys} specify neither. A {@code namespace} other - * than the project's default may be used by specifying it in the {@code Entity} {@code Keys}. - * - * <pre>{@code - * Key.Builder keyBuilder = DatastoreHelper.makeKey(...); - * keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace); - * }</pre> - * - * <p>{@code Entities} will be committed as upsert (update or insert) or delete mutations. Please - * read <a href="https://cloud.google.com/datastore/docs/concepts/entities">Entities, Properties, - * and Keys</a> for more information about {@code Entity} keys. - * - * <h3>Permissions</h3> - * Permission requirements depend on the {@code PipelineRunner} that is used to execute the - * Dataflow job.
Please refer to the documentation of corresponding {@code PipelineRunner}s for - * more details. - * - * <p>Please see <a href="https://cloud.google.com/datastore/docs/activate">Cloud Datastore Sign Up - * </a>for security and permission related information specific to Datastore. - * - * @see org.apache.beam.sdk.runners.PipelineRunner - */ -@Experimental(Experimental.Kind.SOURCE_SINK) -public class V1Beta3 { - - // A package-private constructor to prevent direct instantiation from outside of this package - V1Beta3() {} - - /** - * Datastore has a limit of 500 mutations per batch operation, so we flush - * changes to Datastore every 500 entities. - */ - @VisibleForTesting - static final int DATASTORE_BATCH_UPDATE_LIMIT = 500; - - /** - * Returns an empty {@link V1Beta3.Read} builder. Configure the source {@code projectId}, - * {@code query}, and optionally {@code namespace} and {@code numQuerySplits} using - * {@link V1Beta3.Read#withProjectId}, {@link V1Beta3.Read#withQuery}, - * {@link V1Beta3.Read#withNamespace}, {@link V1Beta3.Read#withNumQuerySplits}. - */ - public V1Beta3.Read read() { - return new V1Beta3.Read(null, null, null, 0); - } - - /** - * A {@link PTransform} that reads the result rows of a Datastore query as {@code Entity} - * objects. - * - * @see DatastoreIO - */ - public static class Read extends PTransform<PBegin, PCollection<Entity>> { - private static final Logger LOG = LoggerFactory.getLogger(Read.class); - - /** An upper bound on the number of splits for a query. */ - public static final int NUM_QUERY_SPLITS_MAX = 50000; - - /** A lower bound on the number of splits for a query. */ - static final int NUM_QUERY_SPLITS_MIN = 12; - - /** Default bundle size of 64MB. */ - static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024; - - /** - * Maximum number of results to request per query. - * - * <p>Must be set, or it may result in an I/O error when querying Cloud Datastore. 
- */ - static final int QUERY_BATCH_LIMIT = 500; - - @Nullable - private final String projectId; - - @Nullable - private final Query query; - - @Nullable - private final String namespace; - - private final int numQuerySplits; - - /** - * Computes the number of splits to be performed on the given query by querying the estimated - * size from Datastore. - */ - static int getEstimatedNumSplits(Datastore datastore, Query query, @Nullable String namespace) { - int numSplits; - try { - long estimatedSizeBytes = getEstimatedSizeBytes(datastore, query, namespace); - numSplits = (int) Math.min(NUM_QUERY_SPLITS_MAX, - Math.round(((double) estimatedSizeBytes) / DEFAULT_BUNDLE_SIZE_BYTES)); - } catch (Exception e) { - LOG.warn("Failed the fetch estimatedSizeBytes for query: {}", query, e); - // Fallback in case estimated size is unavailable. - numSplits = NUM_QUERY_SPLITS_MIN; - } - return Math.max(numSplits, NUM_QUERY_SPLITS_MIN); - } - - /** - * Get the estimated size of the data returned by the given query. - * - * <p>Datastore provides no way to get a good estimate of how large the result of a query - * entity kind being queried, using the __Stat_Kind__ system table, assuming exactly 1 kind - * is specified in the query. - * - * <p>See https://cloud.google.com/datastore/docs/concepts/stats. 
- */ - static long getEstimatedSizeBytes(Datastore datastore, Query query, @Nullable String namespace) - throws DatastoreException { - String ourKind = query.getKind(0).getName(); - Query.Builder queryBuilder = Query.newBuilder(); - if (namespace == null) { - queryBuilder.addKindBuilder().setName("__Stat_Kind__"); - } else { - queryBuilder.addKindBuilder().setName("__Ns_Stat_Kind__"); - } - queryBuilder.setFilter(makeFilter("kind_name", EQUAL, makeValue(ourKind).build())); - - // Get the latest statistics - queryBuilder.addOrder(makeOrder("timestamp", DESCENDING)); - queryBuilder.setLimit(Int32Value.newBuilder().setValue(1)); - - RunQueryRequest request = makeRequest(queryBuilder.build(), namespace); - - long now = System.currentTimeMillis(); - RunQueryResponse response = datastore.runQuery(request); - LOG.debug("Query for per-kind statistics took {}ms", System.currentTimeMillis() - now); - - QueryResultBatch batch = response.getBatch(); - if (batch.getEntityResultsCount() == 0) { - throw new NoSuchElementException( - "Datastore statistics for kind " + ourKind + " unavailable"); - } - Entity entity = batch.getEntityResults(0).getEntity(); - return entity.getProperties().get("entity_bytes").getIntegerValue(); - } - - /** Builds a {@link RunQueryRequest} from the {@code query} and {@code namespace}. */ - static RunQueryRequest makeRequest(Query query, @Nullable String namespace) { - RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query); - if (namespace != null) { - requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace); - } - return requestBuilder.build(); - } - - /** - * A helper function to get the split queries, taking into account the optional - * {@code namespace}. 
- */ - private static List<Query> splitQuery(Query query, @Nullable String namespace, - Datastore datastore, QuerySplitter querySplitter, int numSplits) throws DatastoreException { - // If namespace is set, include it in the split request so splits are calculated accordingly. - PartitionId.Builder partitionBuilder = PartitionId.newBuilder(); - if (namespace != null) { - partitionBuilder.setNamespaceId(namespace); - } - - return querySplitter.getSplits(query, partitionBuilder.build(), numSplits, datastore); - } - - /** - * Note that only {@code namespace} is really {@code @Nullable}. The other parameters may be - * {@code null} as a matter of build order, but if they are {@code null} at instantiation time, - * an error will be thrown. - */ - private Read(@Nullable String projectId, @Nullable Query query, @Nullable String namespace, - int numQuerySplits) { - this.projectId = projectId; - this.query = query; - this.namespace = namespace; - this.numQuerySplits = numQuerySplits; - } - - /** - * Returns a new {@link V1Beta3.Read} that reads from the Datastore for the specified project. - */ - public V1Beta3.Read withProjectId(String projectId) { - checkNotNull(projectId, "projectId"); - return new V1Beta3.Read(projectId, query, namespace, numQuerySplits); - } - - /** - * Returns a new {@link V1Beta3.Read} that reads the results of the specified query. - * - * <p><b>Note:</b> Normally, {@code DatastoreIO} will read from Cloud Datastore in parallel - * across many workers. However, when the {@link Query} is configured with a limit using - * {@link Query.Builder#setLimit}, then all results will be read by a single worker in order - * to ensure correct results. 
- */ - public V1Beta3.Read withQuery(Query query) { - checkNotNull(query, "query"); - checkArgument(!query.hasLimit() || query.getLimit().getValue() > 0, - "Invalid query limit %s: must be positive", query.getLimit().getValue()); - return new V1Beta3.Read(projectId, query, namespace, numQuerySplits); - } - - /** - * Returns a new {@link V1Beta3.Read} that reads from the given namespace. - */ - public V1Beta3.Read withNamespace(String namespace) { - return new V1Beta3.Read(projectId, query, namespace, numQuerySplits); - } - - /** - * Returns a new {@link V1Beta3.Read} that reads by splitting the given {@code query} into - * {@code numQuerySplits}. - * - * <p>The semantics for the query splitting is defined below: - * <ul> - * <li>Any value less than or equal to 0 will be ignored, and the number of splits will be - * chosen dynamically at runtime based on the query data size. - * <li>Any value greater than {@link Read#NUM_QUERY_SPLITS_MAX} will be capped at - * {@code NUM_QUERY_SPLITS_MAX}. - * <li>If the {@code query} has a user limit set, then {@code numQuerySplits} will be - * ignored and no split will be performed. - * <li>Under certain cases Cloud Datastore is unable to split query to the requested number of - * splits. In such cases we just use whatever the Datastore returns. - * </ul> - */ - public V1Beta3.Read withNumQuerySplits(int numQuerySplits) { - return new V1Beta3.Read(projectId, query, namespace, - Math.min(Math.max(numQuerySplits, 0), NUM_QUERY_SPLITS_MAX)); - } - - @Nullable - public Query getQuery() { - return query; - } - - @Nullable - public String getProjectId() { - return projectId; - } - - @Nullable - public String getNamespace() { - return namespace; - } - - - /** - * {@inheritDoc} - */ - @Override - public PCollection<Entity> apply(PBegin input) { - V1Beta3Options v1Beta3Options = V1Beta3Options.from(getProjectId(), getQuery(), - getNamespace()); - - /* - * This composite transform involves the following steps: - * 1. 
Create a singleton of the user provided {@code query} and apply a {@link ParDo} that - * splits the query into {@code numQuerySplits} and assign each split query a unique - * {@code Integer} as the key. The resulting output is of the type - * {@code PCollection<KV<Integer, Query>>}. - * - * If the value of {@code numQuerySplits} is less than or equal to 0, then the number of - * splits will be computed dynamically based on the size of the data for the {@code query}. - * - * 2. The resulting {@code PCollection} is sharded using a {@link GroupByKey} operation. The - * queries are extracted from they {@code KV<Integer, Iterable<Query>>} and flattened to - * output a {@code PCollection<Query>}. - * - * 3. In the third step, a {@code ParDo} reads entities for each query and outputs - * a {@code PCollection<Entity>}. - */ - PCollection<KV<Integer, Query>> queries = input - .apply(Create.of(query)) - .apply(ParDo.of(new SplitQueryFn(v1Beta3Options, numQuerySplits))); - - PCollection<Query> shardedQueries = queries - .apply(GroupByKey.<Integer, Query>create()) - .apply(Values.<Iterable<Query>>create()) - .apply(Flatten.<Query>iterables()); - - PCollection<Entity> entities = shardedQueries - .apply(ParDo.of(new ReadFn(v1Beta3Options))); - - return entities; - } - - @Override - public void validate(PBegin input) { - checkNotNull(projectId, "projectId"); - checkNotNull(query, "query"); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder - .addIfNotNull(DisplayData.item("projectId", projectId) - .withLabel("ProjectId")) - .addIfNotNull(DisplayData.item("namespace", namespace) - .withLabel("Namespace")) - .addIfNotNull(DisplayData.item("query", query.toString()) - .withLabel("Query")); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("projectId", projectId) - .add("query", query) - .add("namespace", namespace) - .toString(); - } - - /** - * A class 
for v1beta3 Datastore related options. - */ - @VisibleForTesting - static class V1Beta3Options implements Serializable { - private final Query query; - private final String projectId; - @Nullable - private final String namespace; - - private V1Beta3Options(String projectId, Query query, @Nullable String namespace) { - this.projectId = checkNotNull(projectId, "projectId"); - this.query = checkNotNull(query, "query"); - this.namespace = namespace; - } - - public static V1Beta3Options from(String projectId, Query query, @Nullable String namespace) { - return new V1Beta3Options(projectId, query, namespace); - } - - public Query getQuery() { - return query; - } - - public String getProjectId() { - return projectId; - } - - @Nullable - public String getNamespace() { - return namespace; - } - } - - /** - * A {@link DoFn} that splits a given query into multiple sub-queries, assigns them unique - * keys and outputs them as {@link KV}. - */ - @VisibleForTesting - static class SplitQueryFn extends DoFn<Query, KV<Integer, Query>> { - private final V1Beta3Options options; - // number of splits to make for a given query - private final int numSplits; - - private final V1Beta3DatastoreFactory datastoreFactory; - // Datastore client - private transient Datastore datastore; - // Query splitter - private transient QuerySplitter querySplitter; - - public SplitQueryFn(V1Beta3Options options, int numSplits) { - this(options, numSplits, new V1Beta3DatastoreFactory()); - } - - @VisibleForTesting - SplitQueryFn(V1Beta3Options options, int numSplits, - V1Beta3DatastoreFactory datastoreFactory) { - this.options = options; - this.numSplits = numSplits; - this.datastoreFactory = datastoreFactory; - } - - @StartBundle - public void startBundle(Context c) throws Exception { - datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.projectId); - querySplitter = datastoreFactory.getQuerySplitter(); - } - - @ProcessElement - public void processElement(ProcessContext c) throws 
Exception { - int key = 1; - Query query = c.element(); - - // If query has a user set limit, then do not split. - if (query.hasLimit()) { - c.output(KV.of(key, query)); - return; - } - - int estimatedNumSplits; - // Compute the estimated numSplits if numSplits is not specified by the user. - if (numSplits <= 0) { - estimatedNumSplits = getEstimatedNumSplits(datastore, query, options.getNamespace()); - } else { - estimatedNumSplits = numSplits; - } - - List<Query> querySplits; - try { - querySplits = splitQuery(query, options.getNamespace(), datastore, querySplitter, - estimatedNumSplits); - } catch (Exception e) { - LOG.warn("Unable to parallelize the given query: {}", query, e); - querySplits = ImmutableList.of(query); - } - - // assign unique keys to query splits. - for (Query subquery : querySplits) { - c.output(KV.of(key++, subquery)); - } - } - - @Override - public void populateDisplayData(Builder builder) { - super.populateDisplayData(builder); - builder - .addIfNotNull(DisplayData.item("projectId", options.getProjectId()) - .withLabel("ProjectId")) - .addIfNotNull(DisplayData.item("namespace", options.getNamespace()) - .withLabel("Namespace")) - .addIfNotNull(DisplayData.item("query", options.getQuery().toString()) - .withLabel("Query")); - } - } - - /** - * A {@link DoFn} that reads entities from Datastore for each query. 
- */ - @VisibleForTesting - static class ReadFn extends DoFn<Query, Entity> { - private final V1Beta3Options options; - private final V1Beta3DatastoreFactory datastoreFactory; - // Datastore client - private transient Datastore datastore; - - public ReadFn(V1Beta3Options options) { - this(options, new V1Beta3DatastoreFactory()); - } - - @VisibleForTesting - ReadFn(V1Beta3Options options, V1Beta3DatastoreFactory datastoreFactory) { - this.options = options; - this.datastoreFactory = datastoreFactory; - } - - @StartBundle - public void startBundle(Context c) throws Exception { - datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId()); - } - - /** Read and output entities for the given query. */ - @ProcessElement - public void processElement(ProcessContext context) throws Exception { - Query query = context.element(); - String namespace = options.getNamespace(); - int userLimit = query.hasLimit() - ? query.getLimit().getValue() : Integer.MAX_VALUE; - - boolean moreResults = true; - QueryResultBatch currentBatch = null; - - while (moreResults) { - Query.Builder queryBuilder = query.toBuilder().clone(); - queryBuilder.setLimit(Int32Value.newBuilder().setValue( - Math.min(userLimit, QUERY_BATCH_LIMIT))); - - if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) { - queryBuilder.setStartCursor(currentBatch.getEndCursor()); - } - - RunQueryRequest request = makeRequest(queryBuilder.build(), namespace); - RunQueryResponse response = datastore.runQuery(request); - - currentBatch = response.getBatch(); - - // MORE_RESULTS_AFTER_LIMIT is not implemented yet: - // https://groups.google.com/forum/#!topic/gcd-discuss/iNs6M1jA2Vw, so - // use result count to determine if more results might exist. 
- int numFetch = currentBatch.getEntityResultsCount(); - if (query.hasLimit()) { - verify(userLimit >= numFetch, - "Expected userLimit %s >= numFetch %s, because query limit %s must be <= userLimit", - userLimit, numFetch, query.getLimit()); - userLimit -= numFetch; - } - - // output all the entities from the current batch. - for (EntityResult entityResult : currentBatch.getEntityResultsList()) { - context.output(entityResult.getEntity()); - } - - // Check if we have more entities to be read. - moreResults = - // User-limit does not exist (so userLimit == MAX_VALUE) and/or has not been satisfied - (userLimit > 0) - // All indications from the API are that there are/may be more results. - && ((numFetch == QUERY_BATCH_LIMIT) - || (currentBatch.getMoreResults() == NOT_FINISHED)); - } - } - } - } - - /** - * Returns an empty {@link V1Beta3.Write} builder. Configure the destination - * {@code projectId} using {@link V1Beta3.Write#withProjectId}. - */ - public Write write() { - return new Write(null); - } - - /** - * Returns an empty {@link DeleteEntity} builder. Configure the destination - * {@code projectId} using {@link DeleteEntity#withProjectId}. - */ - public DeleteEntity deleteEntity() { - return new DeleteEntity(null); - } - - /** - * Returns an empty {@link DeleteKey} builder. Configure the destination - * {@code projectId} using {@link DeleteKey#withProjectId}. - */ - public DeleteKey deleteKey() { - return new DeleteKey(null); - } - - /** - * A {@link PTransform} that writes {@link Entity} objects to Cloud Datastore. - * - * @see DatastoreIO - */ - public static class Write extends Mutate<Entity> { - /** - * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if - * it is {@code null} at instantiation time, an error will be thrown. - */ - Write(@Nullable String projectId) { - super(projectId, new UpsertFn()); - } - - /** - * Returns a new {@link Write} that writes to the Cloud Datastore for the specified project. 
- */ - public Write withProjectId(String projectId) { - checkNotNull(projectId, "projectId"); - return new Write(projectId); - } - } - - /** - * A {@link PTransform} that deletes {@link Entity Entities} from Cloud Datastore. - * - * @see DatastoreIO - */ - public static class DeleteEntity extends Mutate<Entity> { - /** - * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if - * it is {@code null} at instantiation time, an error will be thrown. - */ - DeleteEntity(@Nullable String projectId) { - super(projectId, new DeleteEntityFn()); - } - - /** - * Returns a new {@link DeleteEntity} that deletes entities from the Cloud Datastore for the - * specified project. - */ - public DeleteEntity withProjectId(String projectId) { - checkNotNull(projectId, "projectId"); - return new DeleteEntity(projectId); - } - } - - /** - * A {@link PTransform} that deletes {@link Entity Entities} associated with the given - * {@link Key Keys} from Cloud Datastore. - * - * @see DatastoreIO - */ - public static class DeleteKey extends Mutate<Key> { - /** - * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if - * it is {@code null} at instantiation time, an error will be thrown. - */ - DeleteKey(@Nullable String projectId) { - super(projectId, new DeleteKeyFn()); - } - - /** - * Returns a new {@link DeleteKey} that deletes entities from the Cloud Datastore for the - * specified project. - */ - public DeleteKey withProjectId(String projectId) { - checkNotNull(projectId, "projectId"); - return new DeleteKey(projectId); - } - } - - /** - * A {@link PTransform} that writes mutations to Cloud Datastore. - * - * <p>It requires a {@link DoFn} that tranforms an object of type {@code T} to a {@link Mutation}. 
- * {@code T} is usually either an {@link Entity} or a {@link Key} - * <b>Note:</b> Only idempotent Cloud Datastore mutation operations (upsert and delete) should - * be used by the {@code DoFn} provided, as the commits are retried when failures occur. - */ - private abstract static class Mutate<T> extends PTransform<PCollection<T>, PDone> { - @Nullable - private final String projectId; - /** A function that transforms each {@code T} into a mutation. */ - private final SimpleFunction<T, Mutation> mutationFn; - - /** - * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if - * it is {@code null} at instantiation time, an error will be thrown. - */ - Mutate(@Nullable String projectId, SimpleFunction<T, Mutation> mutationFn) { - this.projectId = projectId; - this.mutationFn = checkNotNull(mutationFn); - } - - @Override - public PDone apply(PCollection<T> input) { - input.apply("Convert to Mutation", MapElements.via(mutationFn)) - .apply("Write Mutation to Datastore", ParDo.of(new DatastoreWriterFn(projectId))); - - return PDone.in(input.getPipeline()); - } - - @Override - public void validate(PCollection<T> input) { - checkNotNull(projectId, "projectId"); - checkNotNull(mutationFn, "mutationFn"); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("projectId", projectId) - .add("mutationFn", mutationFn.getClass().getName()) - .toString(); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder - .addIfNotNull(DisplayData.item("projectId", projectId) - .withLabel("Output Project")) - .include(mutationFn); - } - - public String getProjectId() { - return projectId; - } - } - - /** - * {@link DoFn} that writes {@link Mutation}s to Cloud Datastore. Mutations are written in - * batches, where the maximum batch size is {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}. 
- * - * <p>See <a - * href="https://cloud.google.com/datastore/docs/concepts/entities"> - * Datastore: Entities, Properties, and Keys</a> for information about entity keys and mutations. - * - * <p>Commits are non-transactional. If a commit fails because of a conflict over an entity - * group, the commit will be retried (up to {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT} - * times). This means that the mutation operation should be idempotent. Thus, the writer should - * only be used for {code upsert} and {@code delete} mutation operations, as these are the only - * two Cloud Datastore mutations that are idempotent. - */ - @VisibleForTesting - static class DatastoreWriterFn extends DoFn<Mutation, Void> { - private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class); - private final String projectId; - private transient Datastore datastore; - private final V1Beta3DatastoreFactory datastoreFactory; - // Current batch of mutations to be written. - private final List<Mutation> mutations = new ArrayList<>(); - /** - * Since a bundle is written in batches, we should retry the commit of a batch in order to - * prevent transient errors from causing the bundle to fail. - */ - private static final int MAX_RETRIES = 5; - - /** - * Initial backoff time for exponential backoff for retry attempts. 
- */ - private static final int INITIAL_BACKOFF_MILLIS = 5000; - - DatastoreWriterFn(String projectId) { - this(projectId, new V1Beta3DatastoreFactory()); - } - - @VisibleForTesting - DatastoreWriterFn(String projectId, V1Beta3DatastoreFactory datastoreFactory) { - this.projectId = checkNotNull(projectId, "projectId"); - this.datastoreFactory = datastoreFactory; - } - - @StartBundle - public void startBundle(Context c) { - datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId); - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - mutations.add(c.element()); - if (mutations.size() >= V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT) { - flushBatch(); - } - } - - @FinishBundle - public void finishBundle(Context c) throws Exception { - if (mutations.size() > 0) { - flushBatch(); - } - } - - /** - * Writes a batch of mutations to Cloud Datastore. - * - * <p>If a commit fails, it will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES} - * times). All mutations in the batch will be committed again, even if the commit was partially - * successful. If the retry limit is exceeded, the last exception from the Datastore will be - * thrown. - * - * @throws DatastoreException if the commit fails or IOException or InterruptedException if - * backing off between retries fails. - */ - private void flushBatch() throws DatastoreException, IOException, InterruptedException { - LOG.debug("Writing batch of {} mutations", mutations.size()); - Sleeper sleeper = Sleeper.DEFAULT; - BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS); - - while (true) { - // Batch upsert entities. - try { - CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); - commitRequest.addAllMutations(mutations); - commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); - datastore.commit(commitRequest.build()); - // Break if the commit threw no exception. 
- break; - } catch (DatastoreException exception) { - // Only log the code and message for potentially-transient errors. The entire exception - // will be propagated upon the last retry. - LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(), - exception.getMessage()); - if (!BackOffUtils.next(sleeper, backoff)) { - LOG.error("Aborting after {} retries.", MAX_RETRIES); - throw exception; - } - } - } - LOG.debug("Successfully wrote {} mutations", mutations.size()); - mutations.clear(); - } - - @Override - public void populateDisplayData(Builder builder) { - super.populateDisplayData(builder); - builder - .addIfNotNull(DisplayData.item("projectId", projectId) - .withLabel("Output Project")); - } - } - - /** - * Returns true if a Datastore key is complete. A key is complete if its last element - * has either an id or a name. - */ - static boolean isValidKey(Key key) { - List<PathElement> elementList = key.getPathList(); - if (elementList.isEmpty()) { - return false; - } - PathElement lastElement = elementList.get(elementList.size() - 1); - return (lastElement.getId() != 0 || !lastElement.getName().isEmpty()); - } - - /** - * A function that constructs an upsert {@link Mutation} from an {@link Entity}. - */ - @VisibleForTesting - static class UpsertFn extends SimpleFunction<Entity, Mutation> { - @Override - public Mutation apply(Entity entity) { - // Verify that the entity to write has a complete key. - checkArgument(isValidKey(entity.getKey()), - "Entities to be written to the Datastore must have complete keys:\n%s", entity); - - return makeUpsert(entity).build(); - } - - @Override - public void populateDisplayData(Builder builder) { - builder.add(DisplayData.item("upsertFn", this.getClass()) - .withLabel("Create Upsert Mutation")); - } - } - - /** - * A function that constructs a delete {@link Mutation} from an {@link Entity}. 
- */ - @VisibleForTesting - static class DeleteEntityFn extends SimpleFunction<Entity, Mutation> { - @Override - public Mutation apply(Entity entity) { - // Verify that the entity to delete has a complete key. - checkArgument(isValidKey(entity.getKey()), - "Entities to be deleted from the Datastore must have complete keys:\n%s", entity); - - return makeDelete(entity.getKey()).build(); - } - - @Override - public void populateDisplayData(Builder builder) { - builder.add(DisplayData.item("deleteEntityFn", this.getClass()) - .withLabel("Create Delete Mutation")); - } - } - - /** - * A function that constructs a delete {@link Mutation} from a {@link Key}. - */ - @VisibleForTesting - static class DeleteKeyFn extends SimpleFunction<Key, Mutation> { - @Override - public Mutation apply(Key key) { - // Verify that the entity to delete has a complete key. - checkArgument(isValidKey(key), - "Keys to be deleted from the Datastore must be complete:\n%s", key); - - return makeDelete(key).build(); - } - - @Override - public void populateDisplayData(Builder builder) { - builder.add(DisplayData.item("deleteKeyFn", this.getClass()) - .withLabel("Create Delete Mutation")); - } - } - - /** - * A wrapper factory class for Datastore singleton classes {@link DatastoreFactory} and - * {@link QuerySplitter} - * - * <p>{@link DatastoreFactory} and {@link QuerySplitter} are not java serializable, hence - * wrapping them under this class, which implements {@link Serializable}. - */ - @VisibleForTesting - static class V1Beta3DatastoreFactory implements Serializable { - - /** Builds a Datastore client for the given pipeline options and project. 
*/ - public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) { - DatastoreOptions.Builder builder = - new DatastoreOptions.Builder() - .projectId(projectId) - .initializer( - new RetryHttpRequestInitializer() - ); - - Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential(); - if (credential != null) { - builder.credential(credential); - } - - return DatastoreFactory.get().create(builder.build()); - } - - /** Builds a Datastore {@link QuerySplitter}. */ - public QuerySplitter getQuerySplitter() { - return DatastoreHelper.getQuerySplitter(); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java new file mode 100644 index 0000000..31b5da4 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java @@ -0,0 +1,792 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.datastore; + +import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_LIMIT; +import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.DEFAULT_BUNDLE_SIZE_BYTES; +import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.QUERY_BATCH_LIMIT; +import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.getEstimatedSizeBytes; +import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.makeRequest; +import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.isValidKey; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static com.google.datastore.v1.PropertyFilter.Operator.EQUAL; +import static com.google.datastore.v1.PropertyOrder.Direction.DESCENDING; +import static com.google.datastore.v1.client.DatastoreHelper.makeDelete; +import static com.google.datastore.v1.client.DatastoreHelper.makeFilter; +import static com.google.datastore.v1.client.DatastoreHelper.makeKey; +import static com.google.datastore.v1.client.DatastoreHelper.makeOrder; +import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert; +import static com.google.datastore.v1.client.DatastoreHelper.makeValue; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +import 
org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DatastoreWriterFn; +import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DeleteEntity; +import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DeleteEntityFn; +import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DeleteKey; +import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DeleteKeyFn; +import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.ReadFn; +import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.SplitQueryFn; +import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.V1Options; +import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.UpsertFn; +import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.V1DatastoreFactory; +import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Write; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.testing.RunnableOnService; +import org.apache.beam.sdk.transforms.DoFnTester; +import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.POutput; + +import com.google.datastore.v1.CommitRequest; +import com.google.datastore.v1.Entity; +import com.google.datastore.v1.EntityResult; +import com.google.datastore.v1.Key; +import com.google.datastore.v1.Mutation; +import com.google.datastore.v1.PartitionId; +import com.google.datastore.v1.Query; +import com.google.datastore.v1.QueryResultBatch; +import com.google.datastore.v1.RunQueryRequest; +import com.google.datastore.v1.RunQueryResponse; +import com.google.datastore.v1.client.Datastore; +import com.google.datastore.v1.client.QuerySplitter; +import com.google.protobuf.Int32Value; + +import org.junit.Before; +import org.junit.Rule; 
+import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +/** + * Tests for {@link DatastoreV1}. + */ +@RunWith(JUnit4.class) +public class DatastoreV1Test { + private static final String PROJECT_ID = "testProject"; + private static final String NAMESPACE = "testNamespace"; + private static final String KIND = "testKind"; + private static final Query QUERY; + private static final V1Options V_1_OPTIONS; + static { + Query.Builder q = Query.newBuilder(); + q.addKindBuilder().setName(KIND); + QUERY = q.build(); + V_1_OPTIONS = V1Options.from(PROJECT_ID, QUERY, NAMESPACE); + } + private DatastoreV1.Read initialRead; + + @Mock + Datastore mockDatastore; + @Mock + QuerySplitter mockQuerySplitter; + @Mock + V1DatastoreFactory mockDatastoreFactory; + + @Rule + public final ExpectedException thrown = ExpectedException.none(); + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + initialRead = DatastoreIO.v1().read() + .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE); + + when(mockDatastoreFactory.getDatastore(any(PipelineOptions.class), any(String.class))) + .thenReturn(mockDatastore); + when(mockDatastoreFactory.getQuerySplitter()) + .thenReturn(mockQuerySplitter); + } + + @Test + public void testBuildRead() throws Exception { + DatastoreV1.Read read = DatastoreIO.v1().read() + .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE); + assertEquals(QUERY, read.getQuery()); + assertEquals(PROJECT_ID, read.getProjectId()); + assertEquals(NAMESPACE, read.getNamespace()); + } + + /** + * {@link #testBuildRead} 
but constructed in a different order. + */ + @Test + public void testBuildReadAlt() throws Exception { + DatastoreV1.Read read = DatastoreIO.v1().read() + .withProjectId(PROJECT_ID).withNamespace(NAMESPACE).withQuery(QUERY); + assertEquals(QUERY, read.getQuery()); + assertEquals(PROJECT_ID, read.getProjectId()); + assertEquals(NAMESPACE, read.getNamespace()); + } + + @Test + public void testReadValidationFailsProject() throws Exception { + DatastoreV1.Read read = DatastoreIO.v1().read().withQuery(QUERY); + thrown.expect(NullPointerException.class); + thrown.expectMessage("project"); + read.validate(null); + } + + @Test + public void testReadValidationFailsQuery() throws Exception { + DatastoreV1.Read read = DatastoreIO.v1().read().withProjectId(PROJECT_ID); + thrown.expect(NullPointerException.class); + thrown.expectMessage("query"); + read.validate(null); + } + + @Test + public void testReadValidationFailsQueryLimitZero() throws Exception { + Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build(); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid query limit 0: must be positive"); + + DatastoreIO.v1().read().withQuery(invalidLimit); + } + + @Test + public void testReadValidationFailsQueryLimitNegative() throws Exception { + Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build(); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid query limit -5: must be positive"); + + DatastoreIO.v1().read().withQuery(invalidLimit); + } + + @Test + public void testReadValidationSucceedsNamespace() throws Exception { + DatastoreV1.Read read = DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY); + /* Should succeed, as a null namespace is fine. 
*/ + read.validate(null); + } + + @Test + public void testReadDisplayData() { + DatastoreV1.Read read = DatastoreIO.v1().read() + .withProjectId(PROJECT_ID) + .withQuery(QUERY) + .withNamespace(NAMESPACE); + + DisplayData displayData = DisplayData.from(read); + + assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); + assertThat(displayData, hasDisplayItem("query", QUERY.toString())); + assertThat(displayData, hasDisplayItem("namespace", NAMESPACE)); + } + + @Test + @Category(RunnableOnService.class) + public void testSourcePrimitiveDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + PTransform<PBegin, ? extends POutput> read = DatastoreIO.v1().read().withProjectId( + "myProject").withQuery(Query.newBuilder().build()); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); + assertThat("DatastoreIO read should include the project in its primitive display data", + displayData, hasItem(hasDisplayItem("projectId"))); + } + + @Test + public void testWriteDoesNotAllowNullProject() throws Exception { + thrown.expect(NullPointerException.class); + thrown.expectMessage("projectId"); + + DatastoreIO.v1().write().withProjectId(null); + } + + @Test + public void testWriteValidationFailsWithNoProject() throws Exception { + Write write = DatastoreIO.v1().write(); + + thrown.expect(NullPointerException.class); + thrown.expectMessage("projectId"); + + write.validate(null); + } + + @Test + public void testWriteValidationSucceedsWithProject() throws Exception { + Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID); + write.validate(null); + } + + @Test + public void testWriteDisplayData() { + Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID); + + DisplayData displayData = DisplayData.from(write); + + assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); + } + + @Test + public void testDeleteEntityDoesNotAllowNullProject() throws Exception { + 
thrown.expect(NullPointerException.class); + thrown.expectMessage("projectId"); + + DatastoreIO.v1().deleteEntity().withProjectId(null); + } + + @Test + public void testDeleteEntityValidationFailsWithNoProject() throws Exception { + DeleteEntity deleteEntity = DatastoreIO.v1().deleteEntity(); + + thrown.expect(NullPointerException.class); + thrown.expectMessage("projectId"); + + deleteEntity.validate(null); + } + + @Test + public void testDeleteEntityValidationSucceedsWithProject() throws Exception { + DeleteEntity deleteEntity = DatastoreIO.v1().deleteEntity().withProjectId(PROJECT_ID); + deleteEntity.validate(null); + } + + @Test + public void testDeleteEntityDisplayData() { + DeleteEntity deleteEntity = DatastoreIO.v1().deleteEntity().withProjectId(PROJECT_ID); + + DisplayData displayData = DisplayData.from(deleteEntity); + + assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); + } + + @Test + public void testDeleteKeyDoesNotAllowNullProject() throws Exception { + thrown.expect(NullPointerException.class); + thrown.expectMessage("projectId"); + + DatastoreIO.v1().deleteKey().withProjectId(null); + } + + @Test + public void testDeleteKeyValidationFailsWithNoProject() throws Exception { + DeleteKey deleteKey = DatastoreIO.v1().deleteKey(); + + thrown.expect(NullPointerException.class); + thrown.expectMessage("projectId"); + + deleteKey.validate(null); + } + + @Test + public void testDeleteKeyValidationSucceedsWithProject() throws Exception { + DeleteKey deleteKey = DatastoreIO.v1().deleteKey().withProjectId(PROJECT_ID); + deleteKey.validate(null); + } + + @Test + public void testDeleteKeyDisplayData() { + DeleteKey deleteKey = DatastoreIO.v1().deleteKey().withProjectId(PROJECT_ID); + + DisplayData displayData = DisplayData.from(deleteKey); + + assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); + } + + @Test + @Category(RunnableOnService.class) + public void testWritePrimitiveDisplayData() { + DisplayDataEvaluator evaluator = 
DisplayDataEvaluator.create(); + PTransform<PCollection<Entity>, ?> write = + DatastoreIO.v1().write().withProjectId("myProject"); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat("DatastoreIO write should include the project in its primitive display data", + displayData, hasItem(hasDisplayItem("projectId"))); + assertThat("DatastoreIO write should include the upsertFn in its primitive display data", + displayData, hasItem(hasDisplayItem("upsertFn"))); + + } + + @Test + @Category(RunnableOnService.class) + public void testDeleteEntityPrimitiveDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + PTransform<PCollection<Entity>, ?> write = + DatastoreIO.v1().deleteEntity().withProjectId("myProject"); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat("DatastoreIO write should include the project in its primitive display data", + displayData, hasItem(hasDisplayItem("projectId"))); + assertThat("DatastoreIO write should include the deleteEntityFn in its primitive display data", + displayData, hasItem(hasDisplayItem("deleteEntityFn"))); + + } + + @Test + @Category(RunnableOnService.class) + public void testDeleteKeyPrimitiveDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + PTransform<PCollection<Key>, ?> write = + DatastoreIO.v1().deleteKey().withProjectId("myProject"); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat("DatastoreIO write should include the project in its primitive display data", + displayData, hasItem(hasDisplayItem("projectId"))); + assertThat("DatastoreIO write should include the deleteKeyFn in its primitive display data", + displayData, hasItem(hasDisplayItem("deleteKeyFn"))); + + } + + /** + * Test building a Write using builder methods. 
+ */ + @Test + public void testBuildWrite() throws Exception { + DatastoreV1.Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID); + assertEquals(PROJECT_ID, write.getProjectId()); + } + + /** + * Test the detection of complete and incomplete keys. + */ + @Test + public void testHasNameOrId() { + Key key; + // Complete with name, no ancestor + key = makeKey("bird", "finch").build(); + assertTrue(isValidKey(key)); + + // Complete with id, no ancestor + key = makeKey("bird", 123).build(); + assertTrue(isValidKey(key)); + + // Incomplete, no ancestor + key = makeKey("bird").build(); + assertFalse(isValidKey(key)); + + // Complete with name and ancestor + key = makeKey("bird", "owl").build(); + key = makeKey(key, "bird", "horned").build(); + assertTrue(isValidKey(key)); + + // Complete with id and ancestor + key = makeKey("bird", "owl").build(); + key = makeKey(key, "bird", 123).build(); + assertTrue(isValidKey(key)); + + // Incomplete with ancestor + key = makeKey("bird", "owl").build(); + key = makeKey(key, "bird").build(); + assertFalse(isValidKey(key)); + + key = makeKey().build(); + assertFalse(isValidKey(key)); + } + + /** + * Test that entities with incomplete keys cannot be updated. + */ + @Test + public void testAddEntitiesWithIncompleteKeys() throws Exception { + Key key = makeKey("bird").build(); + Entity entity = Entity.newBuilder().setKey(key).build(); + UpsertFn upsertFn = new UpsertFn(); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Entities to be written to the Datastore must have complete keys"); + + upsertFn.apply(entity); + } + + @Test + /** + * Test that entities with valid keys are transformed to upsert mutations. 
+ */ + public void testAddEntities() throws Exception { + Key key = makeKey("bird", "finch").build(); + Entity entity = Entity.newBuilder().setKey(key).build(); + UpsertFn upsertFn = new UpsertFn(); + + Mutation exceptedMutation = makeUpsert(entity).build(); + assertEquals(upsertFn.apply(entity), exceptedMutation); + } + + /** + * Test that entities with incomplete keys cannot be deleted. + */ + @Test + public void testDeleteEntitiesWithIncompleteKeys() throws Exception { + Key key = makeKey("bird").build(); + Entity entity = Entity.newBuilder().setKey(key).build(); + DeleteEntityFn deleteEntityFn = new DeleteEntityFn(); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Entities to be deleted from the Datastore must have complete keys"); + + deleteEntityFn.apply(entity); + } + + /** + * Test that entities with valid keys are transformed to delete mutations. + */ + @Test + public void testDeleteEntities() throws Exception { + Key key = makeKey("bird", "finch").build(); + Entity entity = Entity.newBuilder().setKey(key).build(); + DeleteEntityFn deleteEntityFn = new DeleteEntityFn(); + + Mutation exceptedMutation = makeDelete(entity.getKey()).build(); + assertEquals(deleteEntityFn.apply(entity), exceptedMutation); + } + + /** + * Test that incomplete keys cannot be deleted. + */ + @Test + public void testDeleteIncompleteKeys() throws Exception { + Key key = makeKey("bird").build(); + DeleteKeyFn deleteKeyFn = new DeleteKeyFn(); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Keys to be deleted from the Datastore must be complete"); + + deleteKeyFn.apply(key); + } + + /** + * Test that valid keys are transformed to delete mutations. 
+ */ + @Test + public void testDeleteKeys() throws Exception { + Key key = makeKey("bird", "finch").build(); + DeleteKeyFn deleteKeyFn = new DeleteKeyFn(); + + Mutation exceptedMutation = makeDelete(key).build(); + assertEquals(deleteKeyFn.apply(key), exceptedMutation); + } + + @Test + public void testDatastoreWriteFnDisplayData() { + DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID); + DisplayData displayData = DisplayData.from(datastoreWriter); + assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); + } + + /** Tests {@link DatastoreWriterFn} with entities less than one batch. */ + @Test + public void testDatatoreWriterFnWithOneBatch() throws Exception { + datastoreWriterFnTest(100); + } + + /** Tests {@link DatastoreWriterFn} with entities of more than one batches, but not a multiple. */ + @Test + public void testDatatoreWriterFnWithMultipleBatches() throws Exception { + datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 3 + 100); + } + + /** + * Tests {@link DatastoreWriterFn} with entities of several batches, using an exact multiple of + * write batch size. + */ + @Test + public void testDatatoreWriterFnWithBatchesExactMultiple() throws Exception { + datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 2); + } + + // A helper method to test DatastoreWriterFn for various batch sizes. + private void datastoreWriterFnTest(int numMutations) throws Exception { + // Create the requested number of mutations. 
+ List<Mutation> mutations = new ArrayList<>(numMutations); + for (int i = 0; i < numMutations; ++i) { + mutations.add( + makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build()); + } + + DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory); + DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter); + doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); + doFnTester.processBundle(mutations); + + int start = 0; + while (start < numMutations) { + int end = Math.min(numMutations, start + DATASTORE_BATCH_UPDATE_LIMIT); + CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); + commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); + commitRequest.addAllMutations(mutations.subList(start, end)); + // Verify all the batch requests were made with the expected mutations. + verify(mockDatastore, times(1)).commit(commitRequest.build()); + start = end; + } + } + + /** + * Tests {@link DatastoreV1.Read#getEstimatedSizeBytes} to fetch and return estimated size for a + * query. + */ + @Test + public void testEstimatedSizeBytes() throws Exception { + long entityBytes = 100L; + // Per Kind statistics request and response + RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE); + RunQueryResponse statResponse = makeStatKindResponse(entityBytes); + + when(mockDatastore.runQuery(statRequest)) + .thenReturn(statResponse); + + assertEquals(entityBytes, getEstimatedSizeBytes(mockDatastore, QUERY, NAMESPACE)); + verify(mockDatastore, times(1)).runQuery(statRequest); + } + + /** + * Tests {@link SplitQueryFn} when number of query splits is specified. 
+ */ + @Test + public void testSplitQueryFnWithNumSplits() throws Exception { + int numSplits = 100; + when(mockQuerySplitter.getSplits( + eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class))) + .thenReturn(splitQuery(QUERY, numSplits)); + + SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, numSplits, mockDatastoreFactory); + DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn); + /** + * Although Datastore client is marked transient in {@link SplitQueryFn}, when injected through + * mock factory using a when clause for unit testing purposes, it is not serializable + * because it doesn't have a no-arg constructor. Thus disabling the cloning to prevent the + * doFn from being serialized. + */ + doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); + List<KV<Integer, Query>> queries = doFnTester.processBundle(QUERY); + + assertEquals(queries.size(), numSplits); + verifyUniqueKeys(queries); + verify(mockQuerySplitter, times(1)).getSplits( + eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class)); + verifyZeroInteractions(mockDatastore); + } + + /** + * Tests {@link SplitQueryFn} when no query splits is specified. 
+ */ + @Test + public void testSplitQueryFnWithoutNumSplits() throws Exception { + // Force SplitQueryFn to compute the number of query splits + int numSplits = 0; + int expectedNumSplits = 20; + long entityBytes = expectedNumSplits * DEFAULT_BUNDLE_SIZE_BYTES; + + // Per Kind statistics request and response + RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE); + RunQueryResponse statResponse = makeStatKindResponse(entityBytes); + + when(mockDatastore.runQuery(statRequest)) + .thenReturn(statResponse); + when(mockQuerySplitter.getSplits( + eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class))) + .thenReturn(splitQuery(QUERY, expectedNumSplits)); + + SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, numSplits, mockDatastoreFactory); + DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn); + doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); + List<KV<Integer, Query>> queries = doFnTester.processBundle(QUERY); + + assertEquals(queries.size(), expectedNumSplits); + verifyUniqueKeys(queries); + verify(mockQuerySplitter, times(1)).getSplits( + eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class)); + verify(mockDatastore, times(1)).runQuery(statRequest); + } + + /** + * Tests {@link DatastoreV1.Read.SplitQueryFn} when the query has a user specified limit. 
+ */ + @Test + public void testSplitQueryFnWithQueryLimit() throws Exception { + Query queryWithLimit = QUERY.toBuilder().clone() + .setLimit(Int32Value.newBuilder().setValue(1)) + .build(); + + SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, 10, mockDatastoreFactory); + DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn); + doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); + List<KV<Integer, Query>> queries = doFnTester.processBundle(queryWithLimit); + + assertEquals(queries.size(), 1); + verifyUniqueKeys(queries); + verifyNoMoreInteractions(mockDatastore); + verifyNoMoreInteractions(mockQuerySplitter); + } + + /** Tests {@link ReadFn} with a query limit less than one batch. */ + @Test + public void testReadFnWithOneBatch() throws Exception { + readFnTest(5); + } + + /** Tests {@link ReadFn} with a query limit more than one batch, and not a multiple. */ + @Test + public void testReadFnWithMultipleBatches() throws Exception { + readFnTest(QUERY_BATCH_LIMIT + 5); + } + + /** Tests {@link ReadFn} for several batches, using an exact multiple of batch size results. */ + @Test + public void testReadFnWithBatchesExactMultiple() throws Exception { + readFnTest(5 * QUERY_BATCH_LIMIT); + } + + /** Helper Methods */ + + /** A helper function that verifies if all the queries have unique keys. */ + private void verifyUniqueKeys(List<KV<Integer, Query>> queries) { + Set<Integer> keys = new HashSet<>(); + for (KV<Integer, Query> kv: queries) { + keys.add(kv.getKey()); + } + assertEquals(keys.size(), queries.size()); + } + + /** + * A helper function that creates mock {@link Entity} results in response to a query. Always + * indicates that more results are available, unless the batch is limited to fewer than + * {@link DatastoreV1.Read#QUERY_BATCH_LIMIT} results. + */ + private static RunQueryResponse mockResponseForQuery(Query q) { + // Every query DatastoreV1 sends should have a limit. 
+ assertTrue(q.hasLimit()); + + // The limit should be in the range [1, QUERY_BATCH_LIMIT] + int limit = q.getLimit().getValue(); + assertThat(limit, greaterThanOrEqualTo(1)); + assertThat(limit, lessThanOrEqualTo(QUERY_BATCH_LIMIT)); + + // Create the requested number of entities. + List<EntityResult> entities = new ArrayList<>(limit); + for (int i = 0; i < limit; ++i) { + entities.add( + EntityResult.newBuilder() + .setEntity(Entity.newBuilder().setKey(makeKey("key" + i, i + 1))) + .build()); + } + + // Fill out the other parameters on the returned result batch. + RunQueryResponse.Builder ret = RunQueryResponse.newBuilder(); + ret.getBatchBuilder() + .addAllEntityResults(entities) + .setEntityResultType(EntityResult.ResultType.FULL) + .setMoreResults( + limit == QUERY_BATCH_LIMIT + ? QueryResultBatch.MoreResultsType.NOT_FINISHED + : QueryResultBatch.MoreResultsType.NO_MORE_RESULTS); + + return ret.build(); + } + + /** Helper function to run a test reading from a {@link ReadFn}. */ + private void readFnTest(int numEntities) throws Exception { + // An empty query to read entities. + Query query = Query.newBuilder().setLimit( + Int32Value.newBuilder().setValue(numEntities)).build(); + + // Use mockResponseForQuery to generate results. + when(mockDatastore.runQuery(any(RunQueryRequest.class))) + .thenAnswer(new Answer<RunQueryResponse>() { + @Override + public RunQueryResponse answer(InvocationOnMock invocationOnMock) throws Throwable { + Query q = ((RunQueryRequest) invocationOnMock.getArguments()[0]).getQuery(); + return mockResponseForQuery(q); + } + }); + + ReadFn readFn = new ReadFn(V_1_OPTIONS, mockDatastoreFactory); + DoFnTester<Query, Entity> doFnTester = DoFnTester.of(readFn); + /** + * Although Datastore client is marked transient in {@link ReadFn}, when injected through + * mock factory using a when clause for unit testing purposes, it is not serializable + * because it doesn't have a no-arg constructor. 
Thus disabling the cloning to prevent the + * test object from being serialized. + */ + doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); + List<Entity> entities = doFnTester.processBundle(query); + + int expectedNumCallsToRunQuery = (int) Math.ceil((double) numEntities / QUERY_BATCH_LIMIT); + verify(mockDatastore, times(expectedNumCallsToRunQuery)).runQuery(any(RunQueryRequest.class)); + // Validate the number of results. + assertEquals(numEntities, entities.size()); + } + + /** Builds a per-kind statistics response with the given entity size. */ + private static RunQueryResponse makeStatKindResponse(long entitySizeInBytes) { + RunQueryResponse.Builder timestampResponse = RunQueryResponse.newBuilder(); + Entity.Builder entity = Entity.newBuilder(); + entity.setKey(makeKey("dummyKind", "dummyId")); + entity.getMutableProperties().put("entity_bytes", makeValue(entitySizeInBytes).build()); + EntityResult.Builder entityResult = EntityResult.newBuilder(); + entityResult.setEntity(entity); + QueryResultBatch.Builder batch = QueryResultBatch.newBuilder(); + batch.addEntityResults(entityResult); + timestampResponse.setBatch(batch); + return timestampResponse.build(); + } + + /** Builds a per-kind statistics query for the given timestamp and namespace. */ + private static Query makeStatKindQuery(String namespace) { + Query.Builder statQuery = Query.newBuilder(); + if (namespace == null) { + statQuery.addKindBuilder().setName("__Stat_Kind__"); + } else { + statQuery.addKindBuilder().setName("__Ns_Stat_Kind__"); + } + statQuery.setFilter(makeFilter("kind_name", EQUAL, makeValue(KIND)).build()); + statQuery.addOrder(makeOrder("timestamp", DESCENDING)); + statQuery.setLimit(Int32Value.newBuilder().setValue(1)); + return statQuery.build(); + } + + /** Generate dummy query splits. 
*/ + private List<Query> splitQuery(Query query, int numSplits) { + List<Query> queries = new LinkedList<>(); + for (int i = 0; i < numSplits; i++) { + queries.add(query.toBuilder().clone().build()); + } + return queries; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3ReadIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3ReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3ReadIT.java deleted file mode 100644 index ddb6d81..0000000 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3ReadIT.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.sdk.io.gcp.datastore; - -import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.deleteAllEntities; -import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.getDatastore; -import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.makeAncestorKey; -import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.makeEntity; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.UpsertMutationBuilder; -import org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.V1Beta3TestWriter; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.values.PCollection; - -import com.google.datastore.v1beta3.Entity; -import com.google.datastore.v1beta3.Key; -import com.google.datastore.v1beta3.Query; -import com.google.datastore.v1beta3.client.Datastore; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.UUID; - -/** - * End-to-end tests for Datastore V1Beta3.Read. - */ -@RunWith(JUnit4.class) -public class V1Beta3ReadIT { - private V1Beta3TestOptions options; - private String ancestor; - private final long numEntities = 1000; - - @Before - public void setup() { - PipelineOptionsFactory.register(V1Beta3TestOptions.class); - options = TestPipeline.testingPipelineOptions().as(V1Beta3TestOptions.class); - ancestor = UUID.randomUUID().toString(); - } - - /** - * An end-to-end test for {@link V1Beta3.Read}. - * - * Write some test entities to datastore and then run a dataflow pipeline that - * reads and counts the total number of entities. Verify that the count matches the - * number of entities written. 
- */ - @Test - public void testE2EV1Beta3Read() throws Exception { - // Create entities and write them to datastore - writeEntitiesToDatastore(options, ancestor, numEntities); - - // Read from datastore - Query query = V1Beta3TestUtil.makeAncestorKindQuery( - options.getKind(), options.getNamespace(), ancestor); - - V1Beta3.Read read = DatastoreIO.v1beta3().read() - .withProjectId(options.getProject()) - .withQuery(query) - .withNamespace(options.getNamespace()); - - // Count the total number of entities - Pipeline p = Pipeline.create(options); - PCollection<Long> count = p - .apply(read) - .apply(Count.<Entity>globally()); - - PAssert.thatSingleton(count).isEqualTo(numEntities); - p.run(); - } - - // Creates entities and write them to datastore - private static void writeEntitiesToDatastore(V1Beta3TestOptions options, String ancestor, - long numEntities) throws Exception { - Datastore datastore = getDatastore(options, options.getProject()); - // Write test entities to datastore - V1Beta3TestWriter writer = new V1Beta3TestWriter(datastore, new UpsertMutationBuilder()); - Key ancestorKey = makeAncestorKey(options.getNamespace(), options.getKind(), ancestor); - - for (long i = 0; i < numEntities; i++) { - Entity entity = makeEntity(i, ancestorKey, options.getKind(), options.getNamespace()); - writer.write(entity); - } - writer.close(); - } - - @After - public void tearDown() throws Exception { - deleteAllEntities(options, ancestor); - } -}
