DatastoreIO Sink as ParDo
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a0361ae9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a0361ae9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a0361ae9

Branch: refs/heads/master
Commit: a0361ae99e9e39bb5ff9766508501932416129ec
Parents: a07648b
Author: Vikas Kedigehalli <[email protected]>
Authored: Mon Aug 15 15:28:07 2016 -0700
Committer: Dan Halperin <[email protected]>
Committed: Wed Aug 17 17:45:05 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/datastore/V1Beta3.java      | 376 +++++++------------
 .../beam/sdk/io/gcp/datastore/V1Beta3Test.java  |  88 +++--
 2 files changed, 195 insertions(+), 269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0361ae9/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
index 052feb3..0d2e2cb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
@@ -30,10 +30,6 @@ import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
 
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.io.Sink.WriteOperation;
-import org.apache.beam.sdk.io.Sink.Writer;
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
@@ -167,7 +163,8 @@ public class V1Beta3 {
    * Datastore has a limit of 500 mutations per batch operation, so we flush
    * changes to Datastore every 500 entities.
    */
-  private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
+  @VisibleForTesting
+  static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
 
   /**
    * Returns an empty {@link V1Beta3.Read} builder. Configure the source {@code projectId},
@@ -634,42 +631,8 @@ public class V1Beta3 {
         }
       }
     }
-
-    /**
-     * A wrapper factory class for Datastore singleton classes {@link DatastoreFactory} and
-     * {@link QuerySplitter}
-     *
-     * <p>{@link DatastoreFactory} and {@link QuerySplitter} are not java serializable, hence
-     * wrapping them under this class, which implements {@link Serializable}.
-     */
-    @VisibleForTesting
-    static class V1Beta3DatastoreFactory implements Serializable {
-
-      /** Builds a Datastore client for the given pipeline options and project. */
-      public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
-        DatastoreOptions.Builder builder =
-            new DatastoreOptions.Builder()
-                .projectId(projectId)
-                .initializer(
-                    new RetryHttpRequestInitializer()
-                );
-
-        Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
-        if (credential != null) {
-          builder.credential(credential);
-        }
-
-        return DatastoreFactory.get().create(builder.build());
-      }
-
-      /** Builds a Datastore {@link QuerySplitter}. */
-      public QuerySplitter getQuerySplitter() {
-        return DatastoreHelper.getQuerySplitter();
-      }
-    }
   }
-
   /**
    * Returns an empty {@link V1Beta3.Write} builder. Configure the destination
    * {@code projectId} using {@link V1Beta3.Write#withProjectId}.
@@ -705,8 +668,8 @@ public class V1Beta3 {
 
     @Override
     public PDone apply(PCollection<Entity> input) {
-      return input.apply(
-          org.apache.beam.sdk.io.Write.to(new DatastoreSink(projectId)));
+      input.apply(ParDo.of(new DatastoreWriterFn(projectId)));
+      return PDone.in(input.getPipeline());
     }
 
     @Override
@@ -733,130 +696,127 @@ public class V1Beta3 {
           .addIfNotNull(DisplayData.item("projectId", projectId)
               .withLabel("Output Project"));
     }
-  }
 
-  /**
-   * A {@link org.apache.beam.sdk.io.Sink} that writes data to Datastore.
-   */
-  static class DatastoreSink extends org.apache.beam.sdk.io.Sink<Entity> {
-    final String projectId;
-
-    public DatastoreSink(String projectId) {
-      this.projectId = projectId;
-    }
-
-    @Override
-    public void validate(PipelineOptions options) {
-      checkNotNull(projectId, "projectId");
-    }
-
-    @Override
-    public DatastoreWriteOperation createWriteOperation(PipelineOptions options) {
-      return new DatastoreWriteOperation(this);
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      builder
-          .addIfNotNull(DisplayData.item("projectId", projectId)
-              .withLabel("Output Project"));
-    }
-  }
+    /**
+     * A {@link DoFn} that writes {@link Entity} objects to Cloud Datastore. Entities are written
+     * in batches, where the maximum batch size is {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}.
+     * Entities are committed as upsert mutations (either update if the key already exists, or
+     * insert if it is a new key). If an entity does not have a complete key (i.e., it has no name
+     * or id), the bundle will fail.
+     *
+     * <p>See <a
+     * href="https://cloud.google.com/datastore/docs/concepts/entities">
+     * Datastore: Entities, Properties, and Keys</a> for information about entity keys and entities.
+     *
+     * <p>Commits are non-transactional. If a commit fails because of a conflict over an entity
+     * group, the commit will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES} times).
+     */
+    @VisibleForTesting
+    static class DatastoreWriterFn extends DoFn<Entity, Void> {
+      private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class);
+      private final String projectId;
+      private transient Datastore datastore;
+      private final V1Beta3DatastoreFactory datastoreFactory;
+      // Current batch of entities to be written.
+      private final List<Entity> entities = new ArrayList<>();
+      /**
+       * Since a bundle is written in batches, we should retry the commit of a batch in order to
+       * prevent transient errors from causing the bundle to fail.
+       */
+      private static final int MAX_RETRIES = 5;
 
-  /**
-   * A {@link WriteOperation} that will manage a parallel write to a Datastore sink.
-   */
-  private static class DatastoreWriteOperation
-      extends WriteOperation<Entity, DatastoreWriteResult> {
-    private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriteOperation.class);
+      /**
+       * Initial backoff time for exponential backoff for retry attempts.
+       */
+      private static final int INITIAL_BACKOFF_MILLIS = 5000;
 
-    private final DatastoreSink sink;
+      public DatastoreWriterFn(String projectId) {
+        this(projectId, new V1Beta3DatastoreFactory());
+      }
 
-    public DatastoreWriteOperation(DatastoreSink sink) {
-      this.sink = sink;
-    }
+      @VisibleForTesting
+      DatastoreWriterFn(String projectId, V1Beta3DatastoreFactory datastoreFactory) {
+        this.projectId = checkNotNull(projectId, "projectId");
+        this.datastoreFactory = datastoreFactory;
+      }
 
-    @Override
-    public Coder<DatastoreWriteResult> getWriterResultCoder() {
-      return SerializableCoder.of(DatastoreWriteResult.class);
-    }
+      @StartBundle
+      public void startBundle(Context c) {
+        datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId);
+      }
 
-    @Override
-    public void initialize(PipelineOptions options) throws Exception {}
+      @ProcessElement
+      public void processElement(ProcessContext c) throws Exception {
+        // Verify that the entity to write has a complete key.
+        if (!isValidKey(c.element().getKey())) {
+          throw new IllegalArgumentException(
+              "Entities to be written to the Datastore must have complete keys");
+        }
 
-    /**
-     * Finalizes the write. Logs the number of entities written to the Datastore.
-     */
-    @Override
-    public void finalize(Iterable<DatastoreWriteResult> writerResults, PipelineOptions options)
-        throws Exception {
-      long totalEntities = 0;
-      for (DatastoreWriteResult result : writerResults) {
-        totalEntities += result.entitiesWritten;
+        entities.add(c.element());
+        if (entities.size() >= V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT) {
+          flushBatch();
+        }
       }
-      LOG.info("Wrote {} elements.", totalEntities);
-    }
 
-    @Override
-    public DatastoreWriter createWriter(PipelineOptions options) throws Exception {
-      DatastoreOptions.Builder builder =
-          new DatastoreOptions.Builder()
-              .projectId(sink.projectId)
-              .initializer(new RetryHttpRequestInitializer());
-      Credential credential = options.as(GcpOptions.class).getGcpCredential();
-      if (credential != null) {
-        builder.credential(credential);
+      @FinishBundle
+      public void finishBundle(Context c) throws Exception {
+        if (entities.size() > 0) {
+          flushBatch();
+        }
       }
-      Datastore datastore = DatastoreFactory.get().create(builder.build());
-      return new DatastoreWriter(this, datastore);
-    }
 
+      /**
+       * Writes a batch of entities to Cloud Datastore.
+       *
+       * <p>If a commit fails, it will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES}
+       * times). All entities in the batch will be committed again, even if the commit was
+       * partially successful. If the retry limit is exceeded, the last exception from the
+       * Datastore will be thrown.
+       *
+       * @throws DatastoreException if the commit fails or IOException or InterruptedException if
+       * backing off between retries fails.
+       */
+      private void flushBatch() throws DatastoreException, IOException, InterruptedException {
+        LOG.debug("Writing batch of {} entities", entities.size());
+        Sleeper sleeper = Sleeper.DEFAULT;
+        BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
+
+        while (true) {
+          // Batch upsert entities.
+          try {
+            CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+            for (Entity entity: entities) {
+              commitRequest.addMutations(makeUpsert(entity));
+            }
+            commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+            datastore.commit(commitRequest.build());
+            // Break if the commit threw no exception.
+            break;
+          } catch (DatastoreException exception) {
+            // Only log the code and message for potentially-transient errors. The entire exception
+            // will be propagated upon the last retry.
+            LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
+                exception.getMessage());
+            if (!BackOffUtils.next(sleeper, backoff)) {
+              LOG.error("Aborting after {} retries.", MAX_RETRIES);
+              throw exception;
+            }
+          }
+        }
+        LOG.debug("Successfully wrote {} entities", entities.size());
+        entities.clear();
+      }
 
-    @Override
-    public DatastoreSink getSink() {
-      return sink;
+      @Override
+      public void populateDisplayData(Builder builder) {
+        super.populateDisplayData(builder);
+        builder
+            .addIfNotNull(DisplayData.item("projectId", projectId)
+                .withLabel("Output Project"));
+      }
     }
-  }
-
-  /**
-   * {@link Writer} that writes entities to a Datastore Sink. Entities are written in batches,
-   * where the maximum batch size is {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}. Entities
-   * are committed as upsert mutations (either update if the key already exists, or insert if it is
-   * a new key). If an entity does not have a complete key (i.e., it has no name or id), the bundle
-   * will fail.
-   *
-   * <p>See <a
-   * href="https://cloud.google.com/datastore/docs/concepts/entities#Datastore_Creating_an_entity">
-   * Datastore: Entities, Properties, and Keys</a> for information about entity keys and upsert
-   * mutations.
-   *
-   * <p>Commits are non-transactional. If a commit fails because of a conflict over an entity
-   * group, the commit will be retried (up to {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}
-   * times).
-   *
-   * <p>Visible for testing purposes.
-   */
-  @VisibleForTesting
-  static class DatastoreWriter extends Writer<Entity, DatastoreWriteResult> {
-    private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriter.class);
-    private final DatastoreWriteOperation writeOp;
-    private final Datastore datastore;
-    private long totalWritten = 0;
-
-    // Visible for testing.
-    final List<Entity> entities = new ArrayList<>();
-
-    /**
-     * Since a bundle is written in batches, we should retry the commit of a batch in order to
-     * prevent transient errors from causing the bundle to fail.
-     */
-    private static final int MAX_RETRIES = 5;
-
-    /**
-     * Initial backoff time for exponential backoff for retry attempts.
-     */
-    private static final int INITIAL_BACKOFF_MILLIS = 5000;
 
     /**
      * Returns true if a Datastore key is complete. A key is complete if its last element
@@ -870,100 +830,38 @@ public class V1Beta3 {
       PathElement lastElement = elementList.get(elementList.size() - 1);
       return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
     }
+  }
 
-    DatastoreWriter(DatastoreWriteOperation writeOp, Datastore datastore) {
-      this.writeOp = writeOp;
-      this.datastore = datastore;
-    }
-
-    @Override
-    public void open(String uId) throws Exception {}
-
-    /**
-     * Writes an entity to the Datastore. Writes are batched, up to {@link
-     * V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}. If an entity does not have a complete key, an
-     * {@link IllegalArgumentException} will be thrown.
-     */
-    @Override
-    public void write(Entity value) throws Exception {
-      // Verify that the entity to write has a complete key.
-      if (!isValidKey(value.getKey())) {
-        throw new IllegalArgumentException(
-            "Entities to be written to the Datastore must have complete keys");
-      }
-
-      entities.add(value);
+  /**
+   * A wrapper factory class for Datastore singleton classes {@link DatastoreFactory} and
+   * {@link QuerySplitter}.
+   *
+   * <p>{@link DatastoreFactory} and {@link QuerySplitter} are not Java serializable, hence
+   * wrapping them under this class, which implements {@link Serializable}.
+   */
+  @VisibleForTesting
+  static class V1Beta3DatastoreFactory implements Serializable {
 
-      if (entities.size() >= V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT) {
-        flushBatch();
-      }
-    }
+    /** Builds a Datastore client for the given pipeline options and project. */
+    public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
+      DatastoreOptions.Builder builder =
+          new DatastoreOptions.Builder()
+              .projectId(projectId)
+              .initializer(
+                  new RetryHttpRequestInitializer()
+              );
 
-    /**
-     * Flushes any pending batch writes and returns a DatastoreWriteResult.
-     */
-    @Override
-    public DatastoreWriteResult close() throws Exception {
-      if (entities.size() > 0) {
-        flushBatch();
+      Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
+      if (credential != null) {
+        builder.credential(credential);
       }
-      return new DatastoreWriteResult(totalWritten);
-    }
+      return DatastoreFactory.get().create(builder.build());
     }
 
-    @Override
-    public DatastoreWriteOperation getWriteOperation() {
-      return writeOp;
+    /** Builds a Datastore {@link QuerySplitter}. */
+    public QuerySplitter getQuerySplitter() {
+      return DatastoreHelper.getQuerySplitter();
+    }
   }
 
-    /**
-     * Writes a batch of entities to the Datastore.
-     *
-     * <p>If a commit fails, it will be retried (up to {@link DatastoreWriter#MAX_RETRIES}
-     * times). All entities in the batch will be committed again, even if the commit was partially
-     * successful. If the retry limit is exceeded, the last exception from the Datastore will be
-     * thrown.
-     *
-     * @throws DatastoreException if the commit fails or IOException or InterruptedException if
-     * backing off between retries fails.
-     */
-    private void flushBatch() throws DatastoreException, IOException, InterruptedException {
-      LOG.debug("Writing batch of {} entities", entities.size());
-      Sleeper sleeper = Sleeper.DEFAULT;
-      BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
-
-      while (true) {
-        // Batch upsert entities.
-        try {
-          CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
-          for (Entity entity: entities) {
-            commitRequest.addMutations(makeUpsert(entity));
-          }
-          commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
-          datastore.commit(commitRequest.build());
-          // Break if the commit threw no exception.
-          break;
-        } catch (DatastoreException exception) {
-          // Only log the code and message for potentially-transient errors. The entire exception
-          // will be propagated upon the last retry.
-          LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
-              exception.getMessage());
-          if (!BackOffUtils.next(sleeper, backoff)) {
-            LOG.error("Aborting after {} retries.", MAX_RETRIES);
-            throw exception;
-          }
-        }
-      }
-      totalWritten += entities.size();
-      LOG.debug("Successfully wrote {} entities", entities.size());
-      entities.clear();
-    }
-  }
-
-  private static class DatastoreWriteResult implements Serializable {
-    final long entitiesWritten;
-
-    public DatastoreWriteResult(long recordsWritten) {
-      this.entitiesWritten = recordsWritten;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0361ae9/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
index 9947c60..8fa34da 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.datastore;
 
+import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT;
 import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.DEFAULT_BUNDLE_SIZE_BYTES;
 import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.QUERY_BATCH_LIMIT;
 import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.getEstimatedSizeBytes;
@@ -27,8 +28,8 @@ import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
 import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
-import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -44,11 +45,12 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DatastoreWriter;
 import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.ReadFn;
 import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.SplitQueryFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.V1Beta3DatastoreFactory;
 import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.V1Beta3Options;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.V1Beta3DatastoreFactory;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Write;
+import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Write.DatastoreWriterFn;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.transforms.DoFnTester;
@@ -61,7 +63,7 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.POutput;
 
-import com.google.common.collect.Lists;
+import com.google.datastore.v1beta3.CommitRequest;
 import com.google.datastore.v1beta3.Entity;
 import com.google.datastore.v1beta3.EntityResult;
 import com.google.datastore.v1beta3.Key;
@@ -87,7 +89,6 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -285,33 +286,33 @@ public class V1Beta3Test {
     Key key;
 
     // Complete with name, no ancestor
     key = makeKey("bird", "finch").build();
-    assertTrue(DatastoreWriter.isValidKey(key));
+    assertTrue(Write.isValidKey(key));
 
     // Complete with id, no ancestor
     key = makeKey("bird", 123).build();
-    assertTrue(DatastoreWriter.isValidKey(key));
+    assertTrue(Write.isValidKey(key));
 
     // Incomplete, no ancestor
     key = makeKey("bird").build();
-    assertFalse(DatastoreWriter.isValidKey(key));
+    assertFalse(Write.isValidKey(key));
 
     // Complete with name and ancestor
     key = makeKey("bird", "owl").build();
     key = makeKey(key, "bird", "horned").build();
-    assertTrue(DatastoreWriter.isValidKey(key));
+    assertTrue(Write.isValidKey(key));
 
     // Complete with id and ancestor
     key = makeKey("bird", "owl").build();
     key = makeKey(key, "bird", 123).build();
-    assertTrue(DatastoreWriter.isValidKey(key));
+    assertTrue(Write.isValidKey(key));
 
     // Incomplete with ancestor
     key = makeKey("bird", "owl").build();
     key = makeKey(key, "bird").build();
-    assertFalse(DatastoreWriter.isValidKey(key));
+    assertFalse(Write.isValidKey(key));
 
     key = makeKey().build();
-    assertFalse(DatastoreWriter.isValidKey(key));
+    assertFalse(Write.isValidKey(key));
   }
 
   /**
@@ -321,35 +322,62 @@ public class V1Beta3Test {
   public void testAddEntitiesWithIncompleteKeys() throws Exception {
     Key key = makeKey("bird").build();
     Entity entity = Entity.newBuilder().setKey(key).build();
-    DatastoreWriter writer = new DatastoreWriter(null, mockDatastore);
+    DatastoreWriterFn writer = new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory);
+    DoFnTester<Entity, Void> doFnTester = DoFnTester.of(writer);
+    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("Entities to be written to the Datastore must have complete keys");
 
-    writer.write(entity);
+    doFnTester.processBundle(entity);
+  }
+
+  /** Tests {@link DatastoreWriterFn} with fewer entities than one batch. */
+  @Test
+  public void testDatastoreWriterFnWithOneBatch() throws Exception {
+    datastoreWriterFnTest(100);
+  }
+
+  /** Tests {@link DatastoreWriterFn} with more than one batch of entities, but not an exact multiple. */
+  @Test
+  public void testDatastoreWriterFnWithMultipleBatches() throws Exception {
+    datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 3 + 100);
   }
 
   /**
-   * Test that entities are added to the batch to update.
+   * Tests {@link DatastoreWriterFn} with several batches of entities, using an exact multiple of
+   * the write batch size.
    */
   @Test
-  public void testAddingEntities() throws Exception {
-    List<Entity> expected = Lists.newArrayList(
-        Entity.newBuilder().setKey(makeKey("bird", "jay").build()).build(),
-        Entity.newBuilder().setKey(makeKey("bird", "condor").build()).build(),
-        Entity.newBuilder().setKey(makeKey("bird", "robin").build()).build());
-
-    List<Entity> allEntities = Lists.newArrayList(expected);
-    Collections.shuffle(allEntities);
-
-    DatastoreWriter writer = new DatastoreWriter(null, mockDatastore);
-    writer.open("test_id");
-    for (Entity entity : allEntities) {
-      writer.write(entity);
+  public void testDatastoreWriterFnWithBatchesExactMultiple() throws Exception {
+    datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 2);
+  }
+
+  // A helper method to test DatastoreWriterFn for various batch sizes.
+  private void datastoreWriterFnTest(int numEntities) throws Exception {
+    // Create the requested number of mutations.
+    List<Entity> entities = new ArrayList<>(numEntities);
+    for (int i = 0; i < numEntities; ++i) {
+      entities.add(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build());
     }
-    assertEquals(expected.size(), writer.entities.size());
-    assertThat(writer.entities, containsInAnyOrder(expected.toArray()));
+
+    DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory);
+    DoFnTester<Entity, Void> doFnTester = DoFnTester.of(datastoreWriter);
+    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+    doFnTester.processBundle(entities);
+
+    int start = 0;
+    while (start < numEntities) {
+      int end = Math.min(numEntities, start + DATASTORE_BATCH_UPDATE_LIMIT);
+      CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+      commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+      for (Entity entity: entities.subList(start, end)) {
+        commitRequest.addMutations(makeUpsert(entity));
+      }
+      // Verify all the batch requests were made with the expected entities.
+      verify(mockDatastore, times(1)).commit(commitRequest.build());
+      start = end;
+    }
   }
 
  /**
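For context, the user-facing entry point is unchanged by this commit: a pipeline still hands a PCollection<Entity> to V1Beta3.Write, which now expands to the ParDo.of(new DatastoreWriterFn(projectId)) shown above instead of a Sink. Below is a minimal sketch of driving the new write path end to end. It is illustrative only: the DatastoreIO.v1beta3() accessor is assumed from elsewhere in this package (it is not part of this diff), "my-project-id" is a placeholder, and the DatastoreWriteExample class is made up for the example.

    import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;

    import com.google.datastore.v1beta3.Entity;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class DatastoreWriteExample {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);

        // DatastoreWriterFn rejects entities with incomplete keys, so each
        // element needs a kind plus a name or id in its last path element.
        Entity entity = Entity.newBuilder()
            .setKey(makeKey("bird", "finch").build())
            .build();

        // "my-project-id" is a placeholder; depending on the SDK version, an
        // explicit protobuf coder for Entity may also need to be registered.
        p.apply(Create.of(entity))
            .apply(DatastoreIO.v1beta3().write().withProjectId("my-project-id"));

        p.run();
      }
    }

Compared with the removed Sink/WriteOperation/Writer triple, the DoFn keeps the same batching contract (flush at DATASTORE_BATCH_UPDATE_LIMIT entities, retry each commit up to MAX_RETRIES times with exponential backoff) but manages the batch with @StartBundle/@FinishBundle, so no writer-result coder or finalize step is needed.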