Repository: beam Updated Branches: refs/heads/master ce00d2466 -> 9a6baefcd
End-to-end test for large entity writes. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dcf40564 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dcf40564 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dcf40564 Branch: refs/heads/master Commit: dcf405644e2b51303e1d2c12592fe82ee01eb32f Parents: de95c7f Author: Colin Phipps <[email protected]> Authored: Tue May 9 09:40:50 2017 +0000 Committer: Eugene Kirpichov <[email protected]> Committed: Fri May 19 13:11:17 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/datastore/V1ReadIT.java | 2 +- .../beam/sdk/io/gcp/datastore/V1TestUtil.java | 15 ++++++-- .../beam/sdk/io/gcp/datastore/V1WriteIT.java | 36 ++++++++++++++++++-- 3 files changed, 47 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/dcf40564/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java index ec7fa8f..22945f5 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java @@ -148,7 +148,7 @@ public class V1ReadIT { Key ancestorKey = makeAncestorKey(options.getNamespace(), options.getKind(), ancestor); for (long i = 0; i < numEntities; i++) { - Entity entity = makeEntity(i, ancestorKey, options.getKind(), options.getNamespace()); + Entity entity = makeEntity(i, ancestorKey, options.getKind(), options.getNamespace(), 0); writer.write(entity); } writer.close(); http://git-wip-us.apache.org/repos/asf/beam/blob/dcf40564/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java index dc91638..5e618df 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java @@ -92,8 +92,10 @@ class V1TestUtil { /** * Build an entity for the given ancestorKey, kind, namespace and value. + * @param largePropertySize if greater than 0, add an unindexed property of the given size. */ - static Entity makeEntity(Long value, Key ancestorKey, String kind, @Nullable String namespace) { + static Entity makeEntity(Long value, Key ancestorKey, String kind, @Nullable String namespace, + int largePropertySize) { Entity.Builder entityBuilder = Entity.newBuilder(); Key.Builder keyBuilder = makeKey(ancestorKey, kind, UUID.randomUUID().toString()); // NOTE: Namespace is not inherited between keys created with DatastoreHelper.makeKey, so @@ -105,6 +107,10 @@ class V1TestUtil { entityBuilder.setKey(keyBuilder.build()); entityBuilder.putProperties("value", makeValue(value).build()); + if (largePropertySize > 0) { + entityBuilder.putProperties("unindexed_value", makeValue(new String( + new char[largePropertySize]).replace("\0", "A")).setExcludeFromIndexes(true).build()); + } return entityBuilder.build(); } @@ -115,18 +121,21 @@ class V1TestUtil { private final String kind; @Nullable private final String namespace; + private final int largePropertySize; private Key ancestorKey; - CreateEntityFn(String kind, @Nullable String namespace, String ancestor) { + CreateEntityFn(String kind, @Nullable String namespace, String ancestor, + int largePropertySize) { this.kind = kind; this.namespace = namespace; + this.largePropertySize = largePropertySize; // Build the ancestor key for all created entities once, including the namespace. ancestorKey = makeAncestorKey(namespace, kind, ancestor); } @ProcessElement public void processElement(ProcessContext c) throws Exception { - c.output(makeEntity(c.element(), ancestorKey, kind, namespace)); + c.output(makeEntity(c.element(), ancestorKey, kind, namespace, largePropertySize)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/dcf40564/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java index 82e4d64..4a874fd 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java @@ -67,8 +67,7 @@ public class V1WriteIT { // Write to datastore p.apply(GenerateSequence.from(0).to(numEntities)) - .apply(ParDo.of(new CreateEntityFn( - options.getKind(), options.getNamespace(), ancestor))) + .apply(ParDo.of(new CreateEntityFn(options.getKind(), options.getNamespace(), ancestor, 0))) .apply(DatastoreIO.v1().write().withProjectId(project)); p.run(); @@ -79,6 +78,39 @@ public class V1WriteIT { assertEquals(numEntitiesWritten, numEntities); } + /** + * An end-to-end test for {@link DatastoreV1.Write}. + * + * <p>Write some large test entities to Cloud Datastore, to test that a batch is flushed when + * the byte size limit is reached. Read and count all the entities. Verify that the count matches + * the number of entities written. + */ + @Test + public void testE2EV1WriteWithLargeEntities() throws Exception { + Pipeline p = Pipeline.create(options); + + /* + * Datastore has a limit of 1MB per entity, and 10MB per write RPC. If each entity is around + * 1MB in size, then we hit the limit on the size of the write long before we hit the limit on + * the number of entities per writes. + */ + final int rawPropertySize = 900_000; + final int numLargeEntities = 100; + + // Write to datastore + p.apply(GenerateSequence.from(0).to(numLargeEntities)) + .apply(ParDo.of(new CreateEntityFn( + options.getKind(), options.getNamespace(), ancestor, rawPropertySize))) + .apply(DatastoreIO.v1().write().withProjectId(project)); + + p.run(); + + // Count number of entities written to datastore. + long numEntitiesWritten = countEntities(options, project, ancestor); + + assertEquals(numEntitiesWritten, numLargeEntities); + } + @After public void tearDown() throws Exception { deleteAllEntities(options, project, ancestor);
