Repository: incubator-beam Updated Branches: refs/heads/master 6645dcd4a -> 78fda4513
Datastore Sink support for writing Mutations This generalizes Write to Write and Delete cleanly. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/95330658 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/95330658 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/95330658 Branch: refs/heads/master Commit: 953306584073044c41bcfdc4ea5e14870ddea5e4 Parents: 6645dcd Author: Vikas Kedigehalli <[email protected]> Authored: Wed Aug 17 18:19:52 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Fri Aug 19 15:41:53 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/datastore/V1Beta3.java | 436 +++++++++++++------ .../beam/sdk/io/gcp/datastore/V1Beta3Test.java | 237 ++++++++-- 2 files changed, 508 insertions(+), 165 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/95330658/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java index 0d2e2cb..8503b66 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java @@ -24,6 +24,7 @@ import static com.google.common.base.Verify.verify; import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL; import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING; import static com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED; +import 
static com.google.datastore.v1beta3.client.DatastoreHelper.makeDelete; import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter; import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder; import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert; @@ -36,8 +37,10 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; @@ -60,6 +63,7 @@ import com.google.datastore.v1beta3.Entity; import com.google.datastore.v1beta3.EntityResult; import com.google.datastore.v1beta3.Key; import com.google.datastore.v1beta3.Key.PathElement; +import com.google.datastore.v1beta3.Mutation; import com.google.datastore.v1beta3.PartitionId; import com.google.datastore.v1beta3.Query; import com.google.datastore.v1beta3.QueryResultBatch; @@ -84,7 +88,7 @@ import java.util.NoSuchElementException; import javax.annotation.Nullable; /** - * <p>{@link V1Beta3} provides an API to Read and Write {@link PCollection PCollections} of + * <p>{@link V1Beta3} provides an API to Read, Write and Delete {@link PCollection PCollections} of * <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> version v1beta3 * {@link Entity} objects. 
* @@ -129,7 +133,25 @@ import javax.annotation.Nullable; * p.run(); * } </pre> * - * <p>{@link Entity Entities} in the {@code PCollection} to be written must have complete + * <p>To delete a {@link PCollection} of {@link Entity Entities} from Datastore, use + * {@link V1Beta3#deleteEntity()}, specifying the Cloud Datastore project to write to: + * + * <pre> {@code + * PCollection<Entity> entities = ...; + * entities.apply(DatastoreIO.v1beta3().deleteEntity().withProjectId(projectId)); + * p.run(); + * } </pre> + * + * <p>To delete entities associated with a {@link PCollection} of {@link Key Keys} from Datastore, + * use {@link V1Beta3#deleteKey()}, specifying the Cloud Datastore project to write to: + * + * <pre> {@code + * PCollection<Key> keys = ...; + * keys.apply(DatastoreIO.v1beta3().deleteKey().withProjectId(projectId)); + * p.run(); + * } </pre> + * + * <p>{@link Entity Entities} in the {@code PCollection} to be written or deleted must have complete * {@link Key Keys}. Complete {@code Keys} specify the {@code name} and {@code id} of the * {@code Entity}, where incomplete {@code Keys} do not. A {@code namespace} other than * {@code projectId} default may be used by specifying it in the {@code Entity} {@code Keys}. @@ -139,9 +161,9 @@ import javax.annotation.Nullable; * keyBuilder.getPartitionIdBuilder().setNamespace(namespace); * }</pre> * - * <p>{@code Entities} will be committed as upsert (update or insert) mutations. Please read - * <a href="https://cloud.google.com/datastore/docs/concepts/entities">Entities, Properties, and - * Keys</a> for more information about {@code Entity} keys. + * <p>{@code Entities} will be committed as upsert (update or insert) or delete mutations. Please + * read <a href="https://cloud.google.com/datastore/docs/concepts/entities">Entities, Properties, + * and Keys</a> for more information about {@code Entity} keys.
* * <p><h3>Permissions</h3> * Permission requirements depend on the {@code PipelineRunner} that is used to execute the @@ -642,20 +664,33 @@ public class V1Beta3 { } /** + * Returns an empty {@link DeleteEntity} builder. Configure the destination + * {@code projectId} using {@link DeleteEntity#withProjectId}. + */ + public DeleteEntity deleteEntity() { + return new DeleteEntity(null); + } + + /** + * Returns an empty {@link DeleteKey} builder. Configure the destination + * {@code projectId} using {@link DeleteKey#withProjectId}. + */ + public DeleteKey deleteKey() { + return new DeleteKey(null); + } + + /** * A {@link PTransform} that writes {@link Entity} objects to Cloud Datastore. * * @see DatastoreIO */ - public static class Write extends PTransform<PCollection<Entity>, PDone> { - @Nullable - private final String projectId; - + public static class Write extends Mutate<Entity> { /** * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if * it is {@code null} at instantiation time, an error will be thrown. */ - public Write(@Nullable String projectId) { - this.projectId = projectId; + Write(@Nullable String projectId) { + super(projectId, new UpsertFn()); } /** @@ -665,27 +700,99 @@ public class V1Beta3 { checkNotNull(projectId, "projectId"); return new Write(projectId); } + } - @Override - public PDone apply(PCollection<Entity> input) { - input.apply(ParDo.of(new DatastoreWriterFn(projectId))); - return PDone.in(input.getPipeline()); + /** + * A {@link PTransform} that deletes {@link Entity Entities} from Cloud Datastore. + * + * @see DatastoreIO + */ + public static class DeleteEntity extends Mutate<Entity> { + /** + * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if + * it is {@code null} at instantiation time, an error will be thrown. 
*/ + DeleteEntity(@Nullable String projectId) { + super(projectId, new DeleteEntityFn()); } - @Override - public void validate(PCollection<Entity> input) { + /** + * Returns a new {@link DeleteEntity} that deletes entities from the Cloud Datastore for the + * specified project. + */ + public DeleteEntity withProjectId(String projectId) { + checkNotNull(projectId, "projectId"); + return new DeleteEntity(projectId); + } + } + + /** + * A {@link PTransform} that deletes {@link Entity Entities} associated with the given + * {@link Key Keys} from Cloud Datastore. + * + * @see DatastoreIO + */ + public static class DeleteKey extends Mutate<Key> { + /** + * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if + * it is {@code null} at instantiation time, an error will be thrown. + */ + DeleteKey(@Nullable String projectId) { + super(projectId, new DeleteKeyFn()); + } + + /** + * Returns a new {@link DeleteKey} that deletes entities from the Cloud Datastore for the + * specified project. + */ + public DeleteKey withProjectId(String projectId) { checkNotNull(projectId, "projectId"); + return new DeleteKey(projectId); } + } + /** + * A {@link PTransform} that writes mutations to Cloud Datastore. + * + * <p>It requires a {@link SimpleFunction} that transforms an object of type {@code T} to a + * {@link Mutation}. {@code T} is usually either an {@link Entity} or a {@link Key}. + * <b>Note:</b> Only idempotent Cloud Datastore mutation operations (upsert and delete) should + * be used by the function provided, as the commits are retried when failures occur. + */ + private abstract static class Mutate<T> extends PTransform<PCollection<T>, PDone> { @Nullable - public String getProjectId() { - return projectId; + private final String projectId; + /** A function that transforms each {@code T} into a mutation.
*/ + private final SimpleFunction<T, Mutation> mutationFn; + + /** + * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if + * it is {@code null} at instantiation time, an error will be thrown. + */ + Mutate(@Nullable String projectId, SimpleFunction<T, Mutation> mutationFn) { + this.projectId = projectId; + this.mutationFn = checkNotNull(mutationFn); + } + + @Override + public PDone apply(PCollection<T> input) { + input.apply("Convert to Mutation", MapElements.via(mutationFn)) + .apply("Write Mutation to Datastore", ParDo.of(new DatastoreWriterFn(projectId))); + + return PDone.in(input.getPipeline()); + } + + @Override + public void validate(PCollection<T> input) { + checkNotNull(projectId, "projectId"); + checkNotNull(mutationFn, "mutationFn"); } @Override public String toString() { return MoreObjects.toStringHelper(getClass()) .add("projectId", projectId) + .add("mutationFn", mutationFn.getClass().getName()) .toString(); } @@ -694,141 +801,200 @@ public class V1Beta3 { super.populateDisplayData(builder); builder .addIfNotNull(DisplayData.item("projectId", projectId) - .withLabel("Output Project")); + .withLabel("Output Project")) + .include(mutationFn); } + public String getProjectId() { + return projectId; + } + } + + /** + * {@link DoFn} that writes {@link Mutation}s to Cloud Datastore. Mutations are written in + * batches, where the maximum batch size is {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}. + * + * <p>See <a + * href="https://cloud.google.com/datastore/docs/concepts/entities"> + * Datastore: Entities, Properties, and Keys</a> for information about entity keys and mutations. + * + * <p>Commits are non-transactional. If a commit fails (for example, because of a conflict over + * an entity group), the commit will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES} + * times). This means that the mutation operation should be idempotent.
Thus, the writer should + * only be used for {@code upsert} and {@code delete} mutation operations, as these are the only + * two Cloud Datastore mutations that are idempotent. + */ + @VisibleForTesting + static class DatastoreWriterFn extends DoFn<Mutation, Void> { + private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class); + private final String projectId; + private transient Datastore datastore; + private final V1Beta3DatastoreFactory datastoreFactory; + // Current batch of mutations to be written. + private final List<Mutation> mutations = new ArrayList<>(); + /** + * Since a bundle is written in batches, we should retry the commit of a batch in order to + * prevent transient errors from causing the bundle to fail. + */ + private static final int MAX_RETRIES = 5; + /** - * A {@link DoFn} that writes {@link Entity} objects to Cloud Datastore. Entities are written in - * batches, where the maximum batch size is {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}. - * Entities are committed as upsert mutations (either update if the key already exists, or - * insert if it is a new key). If an entity does not have a complete key (i.e., it has no name - * or id), the bundle will fail. - * - * <p>See <a - * href="https://cloud.google.com/datastore/docs/concepts/entities"> - * Datastore: Entities, Properties, and Keys</a> for information about entity keys and entities. - * - * <p>Commits are non-transactional. If a commit fails because of a conflict over an entity - * group, the commit will be retried (up to {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT} - * times). + * Initial backoff time for exponential backoff for retry attempts.
*/ - @VisibleForTesting - static class DatastoreWriterFn extends DoFn<Entity, Void> { - private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class); - private final String projectId; - private transient Datastore datastore; - private final V1Beta3DatastoreFactory datastoreFactory; - // Current batch of entities to be written. - private final List<Entity> entities = new ArrayList<>(); - /** - * Since a bundle is written in batches, we should retry the commit of a batch in order to - * prevent transient errors from causing the bundle to fail. - */ - private static final int MAX_RETRIES = 5; + private static final int INITIAL_BACKOFF_MILLIS = 5000; - /** - * Initial backoff time for exponential backoff for retry attempts. - */ - private static final int INITIAL_BACKOFF_MILLIS = 5000; + DatastoreWriterFn(String projectId) { + this(projectId, new V1Beta3DatastoreFactory()); + } - public DatastoreWriterFn(String projectId) { - this(projectId, new V1Beta3DatastoreFactory()); - } + @VisibleForTesting + DatastoreWriterFn(String projectId, V1Beta3DatastoreFactory datastoreFactory) { + this.projectId = checkNotNull(projectId, "projectId"); + this.datastoreFactory = datastoreFactory; + } - @VisibleForTesting - DatastoreWriterFn(String projectId, V1Beta3DatastoreFactory datastoreFactory) { - this.projectId = checkNotNull(projectId, "projectId"); - this.datastoreFactory = datastoreFactory; - } + @StartBundle + public void startBundle(Context c) { + datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId); + } - @StartBundle - public void startBundle(Context c) { - datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId); + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + mutations.add(c.element()); + if (mutations.size() >= V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT) { + flushBatch(); } + } - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - // Verify 
that the entity to write has a complete key. - if (!isValidKey(c.element().getKey())) { - throw new IllegalArgumentException( - "Entities to be written to the Datastore must have complete keys"); - } - - entities.add(c.element()); - if (entities.size() >= V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT) { - flushBatch(); - } + @FinishBundle + public void finishBundle(Context c) throws Exception { + if (mutations.size() > 0) { + flushBatch(); } + } - @FinishBundle - public void finishBundle(Context c) throws Exception { - if (entities.size() > 0) { - flushBatch(); - } - } + /** + * Writes a batch of mutations to Cloud Datastore. + * + * <p>If a commit fails, it will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES} + * times). All mutations in the batch will be committed again, even if the commit was partially + * successful. If the retry limit is exceeded, the last exception from the Datastore will be + * thrown. + * + * @throws DatastoreException if the commit fails or IOException or InterruptedException if + * backing off between retries fails. + */ + private void flushBatch() throws DatastoreException, IOException, InterruptedException { + LOG.debug("Writing batch of {} mutations", mutations.size()); + Sleeper sleeper = Sleeper.DEFAULT; + BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS); - /** - * Writes a batch of entities to Cloud Datastore. - * - * <p>If a commit fails, it will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES} - * times). All entities in the batch will be committed again, even if the commit was partially - * successful. If the retry limit is exceeded, the last exception from the Datastore will be - * thrown. - * - * @throws DatastoreException if the commit fails or IOException or InterruptedException if - * backing off between retries fails. 
*/ - private void flushBatch() throws DatastoreException, IOException, InterruptedException { - LOG.debug("Writing batch of {} entities", entities.size()); - Sleeper sleeper = Sleeper.DEFAULT; - BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS); - - while (true) { - // Batch upsert entities. - try { - CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); - for (Entity entity: entities) { - commitRequest.addMutations(makeUpsert(entity)); - } - commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); - datastore.commit(commitRequest.build()); - // Break if the commit threw no exception. - break; - } catch (DatastoreException exception) { - // Only log the code and message for potentially-transient errors. The entire exception - // will be propagated upon the last retry. - LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(), - exception.getMessage()); - if (!BackOffUtils.next(sleeper, backoff)) { - LOG.error("Aborting after {} retries.", MAX_RETRIES); - throw exception; - } + while (true) { + // Commit the current batch of mutations. + try { + CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); + commitRequest.addAllMutations(mutations); + commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); + datastore.commit(commitRequest.build()); + // Break if the commit threw no exception. + break; + } catch (DatastoreException exception) { + // Only log the code and message for potentially-transient errors. The entire exception + // will be propagated upon the last retry.
+ LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(), + exception.getMessage()); + if (!BackOffUtils.next(sleeper, backoff)) { + LOG.error("Aborting after {} retries.", MAX_RETRIES); + throw exception; } } - LOG.debug("Successfully wrote {} entities", entities.size()); - entities.clear(); } + LOG.debug("Successfully wrote {} mutations", mutations.size()); + mutations.clear(); + } - @Override - public void populateDisplayData(Builder builder) { - super.populateDisplayData(builder); - builder - .addIfNotNull(DisplayData.item("projectId", projectId) - .withLabel("Output Project")); - } + @Override + public void populateDisplayData(Builder builder) { + super.populateDisplayData(builder); + builder + .addIfNotNull(DisplayData.item("projectId", projectId) + .withLabel("Output Project")); } + } - /** - * Returns true if a Datastore key is complete. A key is complete if its last element - * has either an id or a name. - */ - static boolean isValidKey(Key key) { - List<PathElement> elementList = key.getPathList(); - if (elementList.isEmpty()) { - return false; - } - PathElement lastElement = elementList.get(elementList.size() - 1); - return (lastElement.getId() != 0 || !lastElement.getName().isEmpty()); + /** + * Returns true if a Datastore key is complete. A key is complete if its last element + * has either an id or a name. + */ + static boolean isValidKey(Key key) { + List<PathElement> elementList = key.getPathList(); + if (elementList.isEmpty()) { + return false; + } + PathElement lastElement = elementList.get(elementList.size() - 1); + return (lastElement.getId() != 0 || !lastElement.getName().isEmpty()); + } + + /** + * A function that constructs an upsert {@link Mutation} from an {@link Entity}. + */ + @VisibleForTesting + static class UpsertFn extends SimpleFunction<Entity, Mutation> { + @Override + public Mutation apply(Entity entity) { + // Verify that the entity to write has a complete key. 
+ checkArgument(isValidKey(entity.getKey()), + "Entities to be written to the Datastore must have complete keys:\n%s", entity); + + return makeUpsert(entity).build(); + } + + @Override + public void populateDisplayData(Builder builder) { + builder.add(DisplayData.item("upsertFn", this.getClass()) + .withLabel("Create Upsert Mutation")); + } + } + + /** + * A function that constructs a delete {@link Mutation} from an {@link Entity}. + */ + @VisibleForTesting + static class DeleteEntityFn extends SimpleFunction<Entity, Mutation> { + @Override + public Mutation apply(Entity entity) { + // Verify that the entity to delete has a complete key. + checkArgument(isValidKey(entity.getKey()), + "Entities to be deleted from the Datastore must have complete keys:\n%s", entity); + + return makeDelete(entity.getKey()).build(); + } + + @Override + public void populateDisplayData(Builder builder) { + builder.add(DisplayData.item("deleteEntityFn", this.getClass()) + .withLabel("Create Delete Mutation")); + } + } + + /** + * A function that constructs a delete {@link Mutation} from a {@link Key}. + */ + @VisibleForTesting + static class DeleteKeyFn extends SimpleFunction<Key, Mutation> { + @Override + public Mutation apply(Key key) { + // Verify that the key to delete is complete.
+ checkArgument(isValidKey(key), + "Keys to be deleted from the Datastore must be complete:\n%s", key); + + return makeDelete(key).build(); + } + + @Override + public void populateDisplayData(Builder builder) { + builder.add(DisplayData.item("deleteKeyFn", this.getClass()) + .withLabel("Create Delete Mutation")); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/95330658/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java index 8fa34da..b0c6c18 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java @@ -22,9 +22,11 @@ import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.DEFAULT_BUNDLE_S import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.QUERY_BATCH_LIMIT; import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.getEstimatedSizeBytes; import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.makeRequest; +import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.isValidKey; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL; import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING; +import static com.google.datastore.v1beta3.client.DatastoreHelper.makeDelete; import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter; import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey; import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder; 
@@ -45,12 +47,17 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; +import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DatastoreWriterFn; +import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteEntity; +import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteEntityFn; +import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteKey; +import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteKeyFn; import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.ReadFn; import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.SplitQueryFn; import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.V1Beta3Options; +import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.UpsertFn; import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.V1Beta3DatastoreFactory; import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Write; -import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Write.DatastoreWriterFn; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.transforms.DoFnTester; @@ -67,6 +74,7 @@ import com.google.datastore.v1beta3.CommitRequest; import com.google.datastore.v1beta3.Entity; import com.google.datastore.v1beta3.EntityResult; import com.google.datastore.v1beta3.Key; +import com.google.datastore.v1beta3.Mutation; import com.google.datastore.v1beta3.PartitionId; import com.google.datastore.v1beta3.Query; import com.google.datastore.v1beta3.QueryResultBatch; @@ -233,7 +241,7 @@ public class V1Beta3Test { @Test public void testWriteValidationFailsWithNoProject() throws Exception { - V1Beta3.Write write = DatastoreIO.v1beta3().write(); + Write write = DatastoreIO.v1beta3().write(); thrown.expect(NullPointerException.class); thrown.expectMessage("projectId"); @@ -242,15 +250,14 @@ public class V1Beta3Test { } @Test - public void testSinkValidationSucceedsWithProject() throws Exception { - 
V1Beta3.Write write = DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID); + public void testWriteValidationSucceedsWithProject() throws Exception { + Write write = DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID); write.validate(null); } @Test public void testWriteDisplayData() { - V1Beta3.Write write = DatastoreIO.v1beta3().write() - .withProjectId(PROJECT_ID); + Write write = DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID); DisplayData displayData = DisplayData.from(write); @@ -258,8 +265,74 @@ public class V1Beta3Test { } @Test + public void testDeleteEntityDoesNotAllowNullProject() throws Exception { + thrown.expect(NullPointerException.class); + thrown.expectMessage("projectId"); + + DatastoreIO.v1beta3().deleteEntity().withProjectId(null); + } + + @Test + public void testDeleteEntityValidationFailsWithNoProject() throws Exception { + DeleteEntity deleteEntity = DatastoreIO.v1beta3().deleteEntity(); + + thrown.expect(NullPointerException.class); + thrown.expectMessage("projectId"); + + deleteEntity.validate(null); + } + + @Test + public void testDeleteEntityValidationSucceedsWithProject() throws Exception { + DeleteEntity deleteEntity = DatastoreIO.v1beta3().deleteEntity().withProjectId(PROJECT_ID); + deleteEntity.validate(null); + } + + @Test + public void testDeleteEntityDisplayData() { + DeleteEntity deleteEntity = DatastoreIO.v1beta3().deleteEntity().withProjectId(PROJECT_ID); + + DisplayData displayData = DisplayData.from(deleteEntity); + + assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); + } + + @Test + public void testDeleteKeyDoesNotAllowNullProject() throws Exception { + thrown.expect(NullPointerException.class); + thrown.expectMessage("projectId"); + + DatastoreIO.v1beta3().deleteKey().withProjectId(null); + } + + @Test + public void testDeleteKeyValidationFailsWithNoProject() throws Exception { + DeleteKey deleteKey = DatastoreIO.v1beta3().deleteKey(); + + thrown.expect(NullPointerException.class); + 
thrown.expectMessage("projectId"); + + deleteKey.validate(null); + } + + @Test + public void testDeleteKeyValidationSucceedsWithProject() throws Exception { + DeleteKey deleteKey = DatastoreIO.v1beta3().deleteKey().withProjectId(PROJECT_ID); + deleteKey.validate(null); + } + + @Test + public void testDeleteKeyDisplayData() { + DeleteKey deleteKey = DatastoreIO.v1beta3().deleteKey().withProjectId(PROJECT_ID); + + DisplayData displayData = DisplayData.from(deleteKey); + + assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); + } + + @Test @Category(RunnableOnService.class) - public void testSinkPrimitiveDisplayData() { + public void testWritePrimitiveDisplayData() { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); PTransform<PCollection<Entity>, ?> write = DatastoreIO.v1beta3().write().withProjectId("myProject"); @@ -267,6 +340,39 @@ public class V1Beta3Test { Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); assertThat("DatastoreIO write should include the project in its primitive display data", displayData, hasItem(hasDisplayItem("projectId"))); + assertThat("DatastoreIO write should include the upsertFn in its primitive display data", + displayData, hasItem(hasDisplayItem("upsertFn"))); + + } + + @Test + @Category(RunnableOnService.class) + public void testDeleteEntityPrimitiveDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + PTransform<PCollection<Entity>, ?> write = + DatastoreIO.v1beta3().deleteEntity().withProjectId("myProject"); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat("DatastoreIO write should include the project in its primitive display data", + displayData, hasItem(hasDisplayItem("projectId"))); + assertThat("DatastoreIO write should include the deleteEntityFn in its primitive display data", + displayData, hasItem(hasDisplayItem("deleteEntityFn"))); + + } + + @Test + @Category(RunnableOnService.class) + 
public void testDeleteKeyPrimitiveDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + PTransform<PCollection<Key>, ?> write = + DatastoreIO.v1beta3().deleteKey().withProjectId("myProject"); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat("DatastoreIO write should include the project in its primitive display data", + displayData, hasItem(hasDisplayItem("projectId"))); + assertThat("DatastoreIO write should include the deleteKeyFn in its primitive display data", + displayData, hasItem(hasDisplayItem("deleteKeyFn"))); + } /** @@ -286,33 +392,33 @@ public class V1Beta3Test { Key key; // Complete with name, no ancestor key = makeKey("bird", "finch").build(); - assertTrue(Write.isValidKey(key)); + assertTrue(isValidKey(key)); // Complete with id, no ancestor key = makeKey("bird", 123).build(); - assertTrue(Write.isValidKey(key)); + assertTrue(isValidKey(key)); // Incomplete, no ancestor key = makeKey("bird").build(); - assertFalse(Write.isValidKey(key)); + assertFalse(isValidKey(key)); // Complete with name and ancestor key = makeKey("bird", "owl").build(); key = makeKey(key, "bird", "horned").build(); - assertTrue(Write.isValidKey(key)); + assertTrue(isValidKey(key)); // Complete with id and ancestor key = makeKey("bird", "owl").build(); key = makeKey(key, "bird", 123).build(); - assertTrue(Write.isValidKey(key)); + assertTrue(isValidKey(key)); // Incomplete with ancestor key = makeKey("bird", "owl").build(); key = makeKey(key, "bird").build(); - assertFalse(Write.isValidKey(key)); + assertFalse(isValidKey(key)); key = makeKey().build(); - assertFalse(Write.isValidKey(key)); + assertFalse(isValidKey(key)); } /** @@ -322,14 +428,86 @@ public class V1Beta3Test { public void testAddEntitiesWithIncompleteKeys() throws Exception { Key key = makeKey("bird").build(); Entity entity = Entity.newBuilder().setKey(key).build(); - DatastoreWriterFn writer = new DatastoreWriterFn(PROJECT_ID, 
mockDatastoreFactory); - DoFnTester<Entity, Void> doFnTester = DoFnTester.of(writer); - doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); + UpsertFn upsertFn = new UpsertFn(); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("Entities to be written to the Datastore must have complete keys"); - doFnTester.processBundle(entity); + upsertFn.apply(entity); + } + + @Test + /** + * Test that entities with valid keys are transformed to upsert mutations. + */ + public void testAddEntities() throws Exception { + Key key = makeKey("bird", "finch").build(); + Entity entity = Entity.newBuilder().setKey(key).build(); + UpsertFn upsertFn = new UpsertFn(); + + Mutation exceptedMutation = makeUpsert(entity).build(); + assertEquals(upsertFn.apply(entity), exceptedMutation); + } + + /** + * Test that entities with incomplete keys cannot be deleted. + */ + @Test + public void testDeleteEntitiesWithIncompleteKeys() throws Exception { + Key key = makeKey("bird").build(); + Entity entity = Entity.newBuilder().setKey(key).build(); + DeleteEntityFn deleteEntityFn = new DeleteEntityFn(); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Entities to be deleted from the Datastore must have complete keys"); + + deleteEntityFn.apply(entity); + } + + /** + * Test that entities with valid keys are transformed to delete mutations. + */ + @Test + public void testDeleteEntities() throws Exception { + Key key = makeKey("bird", "finch").build(); + Entity entity = Entity.newBuilder().setKey(key).build(); + DeleteEntityFn deleteEntityFn = new DeleteEntityFn(); + + Mutation exceptedMutation = makeDelete(entity.getKey()).build(); + assertEquals(deleteEntityFn.apply(entity), exceptedMutation); + } + + /** + * Test that incomplete keys cannot be deleted. 
+ */ + @Test + public void testDeleteIncompleteKeys() throws Exception { + Key key = makeKey("bird").build(); + DeleteKeyFn deleteKeyFn = new DeleteKeyFn(); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Keys to be deleted from the Datastore must be complete"); + + deleteKeyFn.apply(key); + } + + /** + * Test that valid keys are transformed to delete mutations. + */ + @Test + public void testDeleteKeys() throws Exception { + Key key = makeKey("bird", "finch").build(); + DeleteKeyFn deleteKeyFn = new DeleteKeyFn(); + + Mutation exceptedMutation = makeDelete(key).build(); + assertEquals(deleteKeyFn.apply(key), exceptedMutation); + } + + @Test + public void testDatastoreWriteFnDisplayData() { + DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID); + DisplayData displayData = DisplayData.from(datastoreWriter); + assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); } /** Tests {@link DatastoreWriterFn} with entities less than one batch. */ @@ -354,27 +532,26 @@ public class V1Beta3Test { } // A helper method to test DatastoreWriterFn for various batch sizes. - private void datastoreWriterFnTest(int numEntities) throws Exception { + private void datastoreWriterFnTest(int numMutations) throws Exception { // Create the requested number of mutations. 
- List<Entity> entities = new ArrayList<>(numEntities); - for (int i = 0; i < numEntities; ++i) { - entities.add(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()); + List<Mutation> mutations = new ArrayList<>(numMutations); + for (int i = 0; i < numMutations; ++i) { + mutations.add( + makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build()); } DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory); - DoFnTester<Entity, Void> doFnTester = DoFnTester.of(datastoreWriter); + DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter); doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); - doFnTester.processBundle(entities); + doFnTester.processBundle(mutations); int start = 0; - while (start < numEntities) { - int end = Math.min(numEntities, start + DATASTORE_BATCH_UPDATE_LIMIT); + while (start < numMutations) { + int end = Math.min(numMutations, start + DATASTORE_BATCH_UPDATE_LIMIT); CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); - for (Entity entity: entities.subList(start, end)) { - commitRequest.addMutations(makeUpsert(entity)); - } - // Verify all the batch requests were made with the expected entities. + commitRequest.addAllMutations(mutations.subList(start, end)); + // Verify all the batch requests were made with the expected mutations. verify(mockDatastore, times(1)).commit(commitRequest.build()); start = end; }
