http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java deleted file mode 100644 index b0c6c18..0000000 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java +++ /dev/null @@ -1,792 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io.gcp.datastore; - -import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT; -import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.DEFAULT_BUNDLE_SIZE_BYTES; -import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.QUERY_BATCH_LIMIT; -import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.getEstimatedSizeBytes; -import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.makeRequest; -import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.isValidKey; -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; -import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL; -import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING; -import static com.google.datastore.v1beta3.client.DatastoreHelper.makeDelete; -import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter; -import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey; -import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder; -import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert; -import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.lessThanOrEqualTo; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; - -import 
org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DatastoreWriterFn; -import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteEntity; -import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteEntityFn; -import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteKey; -import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteKeyFn; -import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.ReadFn; -import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.SplitQueryFn; -import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.V1Beta3Options; -import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.UpsertFn; -import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.V1Beta3DatastoreFactory; -import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Write; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.testing.RunnableOnService; -import org.apache.beam.sdk.transforms.DoFnTester; -import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.POutput; - -import com.google.datastore.v1beta3.CommitRequest; -import com.google.datastore.v1beta3.Entity; -import com.google.datastore.v1beta3.EntityResult; -import com.google.datastore.v1beta3.Key; -import com.google.datastore.v1beta3.Mutation; -import com.google.datastore.v1beta3.PartitionId; -import com.google.datastore.v1beta3.Query; -import com.google.datastore.v1beta3.QueryResultBatch; -import com.google.datastore.v1beta3.RunQueryRequest; -import com.google.datastore.v1beta3.RunQueryResponse; -import com.google.datastore.v1beta3.client.Datastore; -import com.google.datastore.v1beta3.client.QuerySplitter; -import com.google.protobuf.Int32Value; - -import 
org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; - -/** - * Tests for {@link V1Beta3}. - */ -@RunWith(JUnit4.class) -public class V1Beta3Test { - private static final String PROJECT_ID = "testProject"; - private static final String NAMESPACE = "testNamespace"; - private static final String KIND = "testKind"; - private static final Query QUERY; - private static final V1Beta3Options v1Beta3Options; - static { - Query.Builder q = Query.newBuilder(); - q.addKindBuilder().setName(KIND); - QUERY = q.build(); - v1Beta3Options = V1Beta3Options.from(PROJECT_ID, QUERY, NAMESPACE); - } - private V1Beta3.Read initialRead; - - @Mock - Datastore mockDatastore; - @Mock - QuerySplitter mockQuerySplitter; - @Mock - V1Beta3DatastoreFactory mockDatastoreFactory; - - @Rule - public final ExpectedException thrown = ExpectedException.none(); - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - - initialRead = DatastoreIO.v1beta3().read() - .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE); - - when(mockDatastoreFactory.getDatastore(any(PipelineOptions.class), any(String.class))) - .thenReturn(mockDatastore); - when(mockDatastoreFactory.getQuerySplitter()) - .thenReturn(mockQuerySplitter); - } - - @Test - public void testBuildRead() throws Exception { - V1Beta3.Read read = DatastoreIO.v1beta3().read() - .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE); - assertEquals(QUERY, read.getQuery()); - assertEquals(PROJECT_ID, read.getProjectId()); - assertEquals(NAMESPACE, 
read.getNamespace()); - } - - /** - * {@link #testBuildRead} but constructed in a different order. - */ - @Test - public void testBuildReadAlt() throws Exception { - V1Beta3.Read read = DatastoreIO.v1beta3().read() - .withProjectId(PROJECT_ID).withNamespace(NAMESPACE).withQuery(QUERY); - assertEquals(QUERY, read.getQuery()); - assertEquals(PROJECT_ID, read.getProjectId()); - assertEquals(NAMESPACE, read.getNamespace()); - } - - @Test - public void testReadValidationFailsProject() throws Exception { - V1Beta3.Read read = DatastoreIO.v1beta3().read().withQuery(QUERY); - thrown.expect(NullPointerException.class); - thrown.expectMessage("project"); - read.validate(null); - } - - @Test - public void testReadValidationFailsQuery() throws Exception { - V1Beta3.Read read = DatastoreIO.v1beta3().read().withProjectId(PROJECT_ID); - thrown.expect(NullPointerException.class); - thrown.expectMessage("query"); - read.validate(null); - } - - @Test - public void testReadValidationFailsQueryLimitZero() throws Exception { - Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build(); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Invalid query limit 0: must be positive"); - - DatastoreIO.v1beta3().read().withQuery(invalidLimit); - } - - @Test - public void testReadValidationFailsQueryLimitNegative() throws Exception { - Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build(); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Invalid query limit -5: must be positive"); - - DatastoreIO.v1beta3().read().withQuery(invalidLimit); - } - - @Test - public void testReadValidationSucceedsNamespace() throws Exception { - V1Beta3.Read read = DatastoreIO.v1beta3().read().withProjectId(PROJECT_ID).withQuery(QUERY); - /* Should succeed, as a null namespace is fine. 
*/ - read.validate(null); - } - - @Test - public void testReadDisplayData() { - V1Beta3.Read read = DatastoreIO.v1beta3().read() - .withProjectId(PROJECT_ID) - .withQuery(QUERY) - .withNamespace(NAMESPACE); - - DisplayData displayData = DisplayData.from(read); - - assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); - assertThat(displayData, hasDisplayItem("query", QUERY.toString())); - assertThat(displayData, hasDisplayItem("namespace", NAMESPACE)); - } - - @Test - @Category(RunnableOnService.class) - public void testSourcePrimitiveDisplayData() { - DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - PTransform<PBegin, ? extends POutput> read = DatastoreIO.v1beta3().read().withProjectId( - "myProject").withQuery(Query.newBuilder().build()); - - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); - assertThat("DatastoreIO read should include the project in its primitive display data", - displayData, hasItem(hasDisplayItem("projectId"))); - } - - @Test - public void testWriteDoesNotAllowNullProject() throws Exception { - thrown.expect(NullPointerException.class); - thrown.expectMessage("projectId"); - - DatastoreIO.v1beta3().write().withProjectId(null); - } - - @Test - public void testWriteValidationFailsWithNoProject() throws Exception { - Write write = DatastoreIO.v1beta3().write(); - - thrown.expect(NullPointerException.class); - thrown.expectMessage("projectId"); - - write.validate(null); - } - - @Test - public void testWriteValidationSucceedsWithProject() throws Exception { - Write write = DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID); - write.validate(null); - } - - @Test - public void testWriteDisplayData() { - Write write = DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID); - - DisplayData displayData = DisplayData.from(write); - - assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); - } - - @Test - public void testDeleteEntityDoesNotAllowNullProject() throws 
Exception { - thrown.expect(NullPointerException.class); - thrown.expectMessage("projectId"); - - DatastoreIO.v1beta3().deleteEntity().withProjectId(null); - } - - @Test - public void testDeleteEntityValidationFailsWithNoProject() throws Exception { - DeleteEntity deleteEntity = DatastoreIO.v1beta3().deleteEntity(); - - thrown.expect(NullPointerException.class); - thrown.expectMessage("projectId"); - - deleteEntity.validate(null); - } - - @Test - public void testDeleteEntityValidationSucceedsWithProject() throws Exception { - DeleteEntity deleteEntity = DatastoreIO.v1beta3().deleteEntity().withProjectId(PROJECT_ID); - deleteEntity.validate(null); - } - - @Test - public void testDeleteEntityDisplayData() { - DeleteEntity deleteEntity = DatastoreIO.v1beta3().deleteEntity().withProjectId(PROJECT_ID); - - DisplayData displayData = DisplayData.from(deleteEntity); - - assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); - } - - @Test - public void testDeleteKeyDoesNotAllowNullProject() throws Exception { - thrown.expect(NullPointerException.class); - thrown.expectMessage("projectId"); - - DatastoreIO.v1beta3().deleteKey().withProjectId(null); - } - - @Test - public void testDeleteKeyValidationFailsWithNoProject() throws Exception { - DeleteKey deleteKey = DatastoreIO.v1beta3().deleteKey(); - - thrown.expect(NullPointerException.class); - thrown.expectMessage("projectId"); - - deleteKey.validate(null); - } - - @Test - public void testDeleteKeyValidationSucceedsWithProject() throws Exception { - DeleteKey deleteKey = DatastoreIO.v1beta3().deleteKey().withProjectId(PROJECT_ID); - deleteKey.validate(null); - } - - @Test - public void testDeleteKeyDisplayData() { - DeleteKey deleteKey = DatastoreIO.v1beta3().deleteKey().withProjectId(PROJECT_ID); - - DisplayData displayData = DisplayData.from(deleteKey); - - assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); - } - - @Test - @Category(RunnableOnService.class) - public void 
testWritePrimitiveDisplayData() { - DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - PTransform<PCollection<Entity>, ?> write = - DatastoreIO.v1beta3().write().withProjectId("myProject"); - - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); - assertThat("DatastoreIO write should include the project in its primitive display data", - displayData, hasItem(hasDisplayItem("projectId"))); - assertThat("DatastoreIO write should include the upsertFn in its primitive display data", - displayData, hasItem(hasDisplayItem("upsertFn"))); - - } - - @Test - @Category(RunnableOnService.class) - public void testDeleteEntityPrimitiveDisplayData() { - DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - PTransform<PCollection<Entity>, ?> write = - DatastoreIO.v1beta3().deleteEntity().withProjectId("myProject"); - - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); - assertThat("DatastoreIO write should include the project in its primitive display data", - displayData, hasItem(hasDisplayItem("projectId"))); - assertThat("DatastoreIO write should include the deleteEntityFn in its primitive display data", - displayData, hasItem(hasDisplayItem("deleteEntityFn"))); - - } - - @Test - @Category(RunnableOnService.class) - public void testDeleteKeyPrimitiveDisplayData() { - DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - PTransform<PCollection<Key>, ?> write = - DatastoreIO.v1beta3().deleteKey().withProjectId("myProject"); - - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); - assertThat("DatastoreIO write should include the project in its primitive display data", - displayData, hasItem(hasDisplayItem("projectId"))); - assertThat("DatastoreIO write should include the deleteKeyFn in its primitive display data", - displayData, hasItem(hasDisplayItem("deleteKeyFn"))); - - } - - /** - * Test building a Write using builder methods. 
- */ - @Test - public void testBuildWrite() throws Exception { - V1Beta3.Write write = DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID); - assertEquals(PROJECT_ID, write.getProjectId()); - } - - /** - * Test the detection of complete and incomplete keys. - */ - @Test - public void testHasNameOrId() { - Key key; - // Complete with name, no ancestor - key = makeKey("bird", "finch").build(); - assertTrue(isValidKey(key)); - - // Complete with id, no ancestor - key = makeKey("bird", 123).build(); - assertTrue(isValidKey(key)); - - // Incomplete, no ancestor - key = makeKey("bird").build(); - assertFalse(isValidKey(key)); - - // Complete with name and ancestor - key = makeKey("bird", "owl").build(); - key = makeKey(key, "bird", "horned").build(); - assertTrue(isValidKey(key)); - - // Complete with id and ancestor - key = makeKey("bird", "owl").build(); - key = makeKey(key, "bird", 123).build(); - assertTrue(isValidKey(key)); - - // Incomplete with ancestor - key = makeKey("bird", "owl").build(); - key = makeKey(key, "bird").build(); - assertFalse(isValidKey(key)); - - key = makeKey().build(); - assertFalse(isValidKey(key)); - } - - /** - * Test that entities with incomplete keys cannot be updated. - */ - @Test - public void testAddEntitiesWithIncompleteKeys() throws Exception { - Key key = makeKey("bird").build(); - Entity entity = Entity.newBuilder().setKey(key).build(); - UpsertFn upsertFn = new UpsertFn(); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Entities to be written to the Datastore must have complete keys"); - - upsertFn.apply(entity); - } - - @Test - /** - * Test that entities with valid keys are transformed to upsert mutations. 
- */ - public void testAddEntities() throws Exception { - Key key = makeKey("bird", "finch").build(); - Entity entity = Entity.newBuilder().setKey(key).build(); - UpsertFn upsertFn = new UpsertFn(); - - Mutation exceptedMutation = makeUpsert(entity).build(); - assertEquals(upsertFn.apply(entity), exceptedMutation); - } - - /** - * Test that entities with incomplete keys cannot be deleted. - */ - @Test - public void testDeleteEntitiesWithIncompleteKeys() throws Exception { - Key key = makeKey("bird").build(); - Entity entity = Entity.newBuilder().setKey(key).build(); - DeleteEntityFn deleteEntityFn = new DeleteEntityFn(); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Entities to be deleted from the Datastore must have complete keys"); - - deleteEntityFn.apply(entity); - } - - /** - * Test that entities with valid keys are transformed to delete mutations. - */ - @Test - public void testDeleteEntities() throws Exception { - Key key = makeKey("bird", "finch").build(); - Entity entity = Entity.newBuilder().setKey(key).build(); - DeleteEntityFn deleteEntityFn = new DeleteEntityFn(); - - Mutation exceptedMutation = makeDelete(entity.getKey()).build(); - assertEquals(deleteEntityFn.apply(entity), exceptedMutation); - } - - /** - * Test that incomplete keys cannot be deleted. - */ - @Test - public void testDeleteIncompleteKeys() throws Exception { - Key key = makeKey("bird").build(); - DeleteKeyFn deleteKeyFn = new DeleteKeyFn(); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Keys to be deleted from the Datastore must be complete"); - - deleteKeyFn.apply(key); - } - - /** - * Test that valid keys are transformed to delete mutations. 
- */ - @Test - public void testDeleteKeys() throws Exception { - Key key = makeKey("bird", "finch").build(); - DeleteKeyFn deleteKeyFn = new DeleteKeyFn(); - - Mutation exceptedMutation = makeDelete(key).build(); - assertEquals(deleteKeyFn.apply(key), exceptedMutation); - } - - @Test - public void testDatastoreWriteFnDisplayData() { - DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID); - DisplayData displayData = DisplayData.from(datastoreWriter); - assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); - } - - /** Tests {@link DatastoreWriterFn} with entities less than one batch. */ - @Test - public void testDatatoreWriterFnWithOneBatch() throws Exception { - datastoreWriterFnTest(100); - } - - /** Tests {@link DatastoreWriterFn} with entities of more than one batches, but not a multiple. */ - @Test - public void testDatatoreWriterFnWithMultipleBatches() throws Exception { - datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 3 + 100); - } - - /** - * Tests {@link DatastoreWriterFn} with entities of several batches, using an exact multiple of - * write batch size. - */ - @Test - public void testDatatoreWriterFnWithBatchesExactMultiple() throws Exception { - datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 2); - } - - // A helper method to test DatastoreWriterFn for various batch sizes. - private void datastoreWriterFnTest(int numMutations) throws Exception { - // Create the requested number of mutations. 
- List<Mutation> mutations = new ArrayList<>(numMutations); - for (int i = 0; i < numMutations; ++i) { - mutations.add( - makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build()); - } - - DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory); - DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter); - doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); - doFnTester.processBundle(mutations); - - int start = 0; - while (start < numMutations) { - int end = Math.min(numMutations, start + DATASTORE_BATCH_UPDATE_LIMIT); - CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); - commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); - commitRequest.addAllMutations(mutations.subList(start, end)); - // Verify all the batch requests were made with the expected mutations. - verify(mockDatastore, times(1)).commit(commitRequest.build()); - start = end; - } - } - - /** - * Tests {@link V1Beta3.Read#getEstimatedSizeBytes} to fetch and return estimated size for a - * query. - */ - @Test - public void testEstimatedSizeBytes() throws Exception { - long entityBytes = 100L; - // Per Kind statistics request and response - RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE); - RunQueryResponse statResponse = makeStatKindResponse(entityBytes); - - when(mockDatastore.runQuery(statRequest)) - .thenReturn(statResponse); - - assertEquals(entityBytes, getEstimatedSizeBytes(mockDatastore, QUERY, NAMESPACE)); - verify(mockDatastore, times(1)).runQuery(statRequest); - } - - /** - * Tests {@link SplitQueryFn} when number of query splits is specified. 
- */ - @Test - public void testSplitQueryFnWithNumSplits() throws Exception { - int numSplits = 100; - when(mockQuerySplitter.getSplits( - eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class))) - .thenReturn(splitQuery(QUERY, numSplits)); - - SplitQueryFn splitQueryFn = new SplitQueryFn(v1Beta3Options, numSplits, mockDatastoreFactory); - DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn); - /** - * Although Datastore client is marked transient in {@link SplitQueryFn}, when injected through - * mock factory using a when clause for unit testing purposes, it is not serializable - * because it doesn't have a no-arg constructor. Thus disabling the cloning to prevent the - * doFn from being serialized. - */ - doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); - List<KV<Integer, Query>> queries = doFnTester.processBundle(QUERY); - - assertEquals(queries.size(), numSplits); - verifyUniqueKeys(queries); - verify(mockQuerySplitter, times(1)).getSplits( - eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class)); - verifyZeroInteractions(mockDatastore); - } - - /** - * Tests {@link SplitQueryFn} when no query splits is specified. 
- */ - @Test - public void testSplitQueryFnWithoutNumSplits() throws Exception { - // Force SplitQueryFn to compute the number of query splits - int numSplits = 0; - int expectedNumSplits = 20; - long entityBytes = expectedNumSplits * DEFAULT_BUNDLE_SIZE_BYTES; - - // Per Kind statistics request and response - RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE); - RunQueryResponse statResponse = makeStatKindResponse(entityBytes); - - when(mockDatastore.runQuery(statRequest)) - .thenReturn(statResponse); - when(mockQuerySplitter.getSplits( - eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class))) - .thenReturn(splitQuery(QUERY, expectedNumSplits)); - - SplitQueryFn splitQueryFn = new SplitQueryFn(v1Beta3Options, numSplits, mockDatastoreFactory); - DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn); - doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); - List<KV<Integer, Query>> queries = doFnTester.processBundle(QUERY); - - assertEquals(queries.size(), expectedNumSplits); - verifyUniqueKeys(queries); - verify(mockQuerySplitter, times(1)).getSplits( - eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class)); - verify(mockDatastore, times(1)).runQuery(statRequest); - } - - /** - * Tests {@link V1Beta3.Read.SplitQueryFn} when the query has a user specified limit. 
- */ - @Test - public void testSplitQueryFnWithQueryLimit() throws Exception { - Query queryWithLimit = QUERY.toBuilder().clone() - .setLimit(Int32Value.newBuilder().setValue(1)) - .build(); - - SplitQueryFn splitQueryFn = new SplitQueryFn(v1Beta3Options, 10, mockDatastoreFactory); - DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn); - doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); - List<KV<Integer, Query>> queries = doFnTester.processBundle(queryWithLimit); - - assertEquals(queries.size(), 1); - verifyUniqueKeys(queries); - verifyNoMoreInteractions(mockDatastore); - verifyNoMoreInteractions(mockQuerySplitter); - } - - /** Tests {@link ReadFn} with a query limit less than one batch. */ - @Test - public void testReadFnWithOneBatch() throws Exception { - readFnTest(5); - } - - /** Tests {@link ReadFn} with a query limit more than one batch, and not a multiple. */ - @Test - public void testReadFnWithMultipleBatches() throws Exception { - readFnTest(QUERY_BATCH_LIMIT + 5); - } - - /** Tests {@link ReadFn} for several batches, using an exact multiple of batch size results. */ - @Test - public void testReadFnWithBatchesExactMultiple() throws Exception { - readFnTest(5 * QUERY_BATCH_LIMIT); - } - - /** Helper Methods */ - - /** A helper function that verifies if all the queries have unique keys. */ - private void verifyUniqueKeys(List<KV<Integer, Query>> queries) { - Set<Integer> keys = new HashSet<>(); - for (KV<Integer, Query> kv: queries) { - keys.add(kv.getKey()); - } - assertEquals(keys.size(), queries.size()); - } - - /** - * A helper function that creates mock {@link Entity} results in response to a query. Always - * indicates that more results are available, unless the batch is limited to fewer than - * {@link V1Beta3.Read#QUERY_BATCH_LIMIT} results. - */ - private static RunQueryResponse mockResponseForQuery(Query q) { - // Every query V1Beta3 sends should have a limit. 
- assertTrue(q.hasLimit()); - - // The limit should be in the range [1, QUERY_BATCH_LIMIT] - int limit = q.getLimit().getValue(); - assertThat(limit, greaterThanOrEqualTo(1)); - assertThat(limit, lessThanOrEqualTo(QUERY_BATCH_LIMIT)); - - // Create the requested number of entities. - List<EntityResult> entities = new ArrayList<>(limit); - for (int i = 0; i < limit; ++i) { - entities.add( - EntityResult.newBuilder() - .setEntity(Entity.newBuilder().setKey(makeKey("key" + i, i + 1))) - .build()); - } - - // Fill out the other parameters on the returned result batch. - RunQueryResponse.Builder ret = RunQueryResponse.newBuilder(); - ret.getBatchBuilder() - .addAllEntityResults(entities) - .setEntityResultType(EntityResult.ResultType.FULL) - .setMoreResults( - limit == QUERY_BATCH_LIMIT - ? QueryResultBatch.MoreResultsType.NOT_FINISHED - : QueryResultBatch.MoreResultsType.NO_MORE_RESULTS); - - return ret.build(); - } - - /** Helper function to run a test reading from a {@link ReadFn}. */ - private void readFnTest(int numEntities) throws Exception { - // An empty query to read entities. - Query query = Query.newBuilder().setLimit( - Int32Value.newBuilder().setValue(numEntities)).build(); - - // Use mockResponseForQuery to generate results. - when(mockDatastore.runQuery(any(RunQueryRequest.class))) - .thenAnswer(new Answer<RunQueryResponse>() { - @Override - public RunQueryResponse answer(InvocationOnMock invocationOnMock) throws Throwable { - Query q = ((RunQueryRequest) invocationOnMock.getArguments()[0]).getQuery(); - return mockResponseForQuery(q); - } - }); - - ReadFn readFn = new ReadFn(v1Beta3Options, mockDatastoreFactory); - DoFnTester<Query, Entity> doFnTester = DoFnTester.of(readFn); - /** - * Although Datastore client is marked transient in {@link ReadFn}, when injected through - * mock factory using a when clause for unit testing purposes, it is not serializable - * because it doesn't have a no-arg constructor. 
Thus disabling the cloning to prevent the - * test object from being serialized. - */ - doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); - List<Entity> entities = doFnTester.processBundle(query); - - int expectedNumCallsToRunQuery = (int) Math.ceil((double) numEntities / QUERY_BATCH_LIMIT); - verify(mockDatastore, times(expectedNumCallsToRunQuery)).runQuery(any(RunQueryRequest.class)); - // Validate the number of results. - assertEquals(numEntities, entities.size()); - } - - /** Builds a per-kind statistics response with the given entity size. */ - private static RunQueryResponse makeStatKindResponse(long entitySizeInBytes) { - RunQueryResponse.Builder timestampResponse = RunQueryResponse.newBuilder(); - Entity.Builder entity = Entity.newBuilder(); - entity.setKey(makeKey("dummyKind", "dummyId")); - entity.getMutableProperties().put("entity_bytes", makeValue(entitySizeInBytes).build()); - EntityResult.Builder entityResult = EntityResult.newBuilder(); - entityResult.setEntity(entity); - QueryResultBatch.Builder batch = QueryResultBatch.newBuilder(); - batch.addEntityResults(entityResult); - timestampResponse.setBatch(batch); - return timestampResponse.build(); - } - - /** Builds a per-kind statistics query for the given timestamp and namespace. */ - private static Query makeStatKindQuery(String namespace) { - Query.Builder statQuery = Query.newBuilder(); - if (namespace == null) { - statQuery.addKindBuilder().setName("__Stat_Kind__"); - } else { - statQuery.addKindBuilder().setName("__Ns_Stat_Kind__"); - } - statQuery.setFilter(makeFilter("kind_name", EQUAL, makeValue(KIND)).build()); - statQuery.addOrder(makeOrder("timestamp", DESCENDING)); - statQuery.setLimit(Int32Value.newBuilder().setValue(1)); - return statQuery.build(); - } - - /** Generate dummy query splits. 
*/ - private List<Query> splitQuery(Query query, int numSplits) { - List<Query> queries = new LinkedList<>(); - for (int i = 0; i < numSplits; i++) { - queries.add(query.toBuilder().clone().build()); - } - return queries; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestOptions.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestOptions.java deleted file mode 100644 index 099ebe0..0000000 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestOptions.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.io.gcp.datastore; - -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.testing.TestPipelineOptions; - -import javax.annotation.Nullable; - -/** - * V1Beta3 Datastore related pipeline options. 
- */ -public interface V1Beta3TestOptions extends TestPipelineOptions { - @Description("Project ID to read from datastore") - @Default.String("apache-beam-testing") - String getProject(); - void setProject(String value); - - @Description("Datastore Entity kind") - @Default.String("beam-test") - String getKind(); - void setKind(String value); - - @Description("Datastore Namespace") - String getNamespace(); - void setNamespace(@Nullable String value); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java deleted file mode 100644 index 7eaf23e..0000000 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java +++ /dev/null @@ -1,382 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.sdk.io.gcp.datastore; - -import static com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED; -import static com.google.datastore.v1beta3.client.DatastoreHelper.makeDelete; -import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter; -import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey; -import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert; -import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue; - -import org.apache.beam.sdk.options.GcpOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; -import org.apache.beam.sdk.util.RetryHttpRequestInitializer; - -import com.google.api.client.auth.oauth2.Credential; -import com.google.api.client.util.BackOff; -import com.google.api.client.util.BackOffUtils; -import com.google.api.client.util.Sleeper; -import com.google.datastore.v1beta3.CommitRequest; -import com.google.datastore.v1beta3.Entity; -import com.google.datastore.v1beta3.EntityResult; -import com.google.datastore.v1beta3.Key; -import com.google.datastore.v1beta3.Key.PathElement; -import com.google.datastore.v1beta3.Mutation; -import com.google.datastore.v1beta3.PropertyFilter; -import com.google.datastore.v1beta3.Query; -import com.google.datastore.v1beta3.QueryResultBatch; -import com.google.datastore.v1beta3.RunQueryRequest; -import com.google.datastore.v1beta3.RunQueryResponse; -import com.google.datastore.v1beta3.client.Datastore; -import com.google.datastore.v1beta3.client.DatastoreException; -import com.google.datastore.v1beta3.client.DatastoreFactory; -import com.google.datastore.v1beta3.client.DatastoreOptions; -import com.google.protobuf.Int32Value; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import 
java.util.List; -import java.util.UUID; -import javax.annotation.Nullable; - -class V1Beta3TestUtil { - private static final Logger LOG = LoggerFactory.getLogger(V1Beta3TestUtil.class); - - /** - * A helper function to create the ancestor key for all created and queried entities. - */ - static Key makeAncestorKey(@Nullable String namespace, String kind, String ancestor) { - Key.Builder keyBuilder = makeKey(kind, ancestor); - if (namespace != null) { - keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace); - } - return keyBuilder.build(); - } - - /** - * Build a datastore ancestor query for the specified kind, namespace and ancestor. - */ - static Query makeAncestorKindQuery(String kind, @Nullable String namespace, String ancestor) { - Query.Builder q = Query.newBuilder(); - q.addKindBuilder().setName(kind); - q.setFilter(makeFilter( - "__key__", - PropertyFilter.Operator.HAS_ANCESTOR, - makeValue(makeAncestorKey(namespace, kind, ancestor)))); - return q.build(); - } - - /** - * Build an entity for the given ancestorKey, kind, namespace and value. - */ - static Entity makeEntity(Long value, Key ancestorKey, String kind, @Nullable String namespace) { - Entity.Builder entityBuilder = Entity.newBuilder(); - Key.Builder keyBuilder = makeKey(ancestorKey, kind, UUID.randomUUID().toString()); - // NOTE: Namespace is not inherited between keys created with DatastoreHelper.makeKey, so - // we must set the namespace on keyBuilder. TODO: Once partitionId inheritance is added, - // we can simplify this code. - if (namespace != null) { - keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace); - } - - entityBuilder.setKey(keyBuilder.build()); - entityBuilder.getMutableProperties().put("value", makeValue(value).build()); - return entityBuilder.build(); - } - - /** - * A DoFn that creates entity for a long number. 
- */ - static class CreateEntityFn extends DoFn<Long, Entity> { - private final String kind; - @Nullable - private final String namespace; - private Key ancestorKey; - - CreateEntityFn(String kind, @Nullable String namespace, String ancestor) { - this.kind = kind; - this.namespace = namespace; - // Build the ancestor key for all created entities once, including the namespace. - ancestorKey = makeAncestorKey(namespace, kind, ancestor); - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - c.output(makeEntity(c.element(), ancestorKey, kind, namespace)); - } - } - - /** - * Build a new datastore client. - */ - static Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) { - DatastoreOptions.Builder builder = - new DatastoreOptions.Builder() - .projectId(projectId) - .initializer( - new RetryHttpRequestInitializer() - ); - - Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential(); - if (credential != null) { - builder.credential(credential); - } - return DatastoreFactory.get().create(builder.build()); - } - - /** - * Build a datastore query request. - */ - private static RunQueryRequest makeRequest(Query query, @Nullable String namespace) { - RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query); - if (namespace != null) { - requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace); - } - return requestBuilder.build(); - } - - /** - * Delete all entities with the given ancestor. 
- */ - static void deleteAllEntities(V1Beta3TestOptions options, String ancestor) throws Exception { - Datastore datastore = getDatastore(options, options.getProject()); - Query query = V1Beta3TestUtil.makeAncestorKindQuery( - options.getKind(), options.getNamespace(), ancestor); - - V1Beta3TestReader reader = new V1Beta3TestReader(datastore, query, options.getNamespace()); - V1Beta3TestWriter writer = new V1Beta3TestWriter(datastore, new DeleteMutationBuilder()); - - long numEntities = 0; - while (reader.advance()) { - Entity entity = reader.getCurrent(); - numEntities++; - writer.write(entity); - } - - writer.close(); - LOG.info("Successfully deleted {} entities", numEntities); - } - - /** - * Returns the total number of entities for the given datastore. - */ - static long countEntities(V1Beta3TestOptions options, String ancestor) throws Exception { - // Read from datastore. - Datastore datastore = V1Beta3TestUtil.getDatastore(options, options.getProject()); - Query query = V1Beta3TestUtil.makeAncestorKindQuery( - options.getKind(), options.getNamespace(), ancestor); - - V1Beta3TestReader reader = new V1Beta3TestReader(datastore, query, options.getNamespace()); - - long numEntitiesRead = 0; - while (reader.advance()) { - reader.getCurrent(); - numEntitiesRead++; - } - return numEntitiesRead; - } - - /** - * An interface to represent any datastore mutation operation. - * Mutation operations include insert, delete, upsert, update. - */ - interface MutationBuilder { - Mutation.Builder apply(Entity entity); - } - - /** - *A MutationBuilder that performs upsert operation. - */ - static class UpsertMutationBuilder implements MutationBuilder { - public Mutation.Builder apply(Entity entity) { - return makeUpsert(entity); - } - } - - /** - * A MutationBuilder that performs delete operation. 
- */ - static class DeleteMutationBuilder implements MutationBuilder { - public Mutation.Builder apply(Entity entity) { - return makeDelete(entity.getKey()); - } - } - - /** - * A helper class to write entities to datastore. - */ - static class V1Beta3TestWriter { - private static final Logger LOG = LoggerFactory.getLogger(V1Beta3TestWriter.class); - // Limits the number of entities updated per batch - private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500; - // Number of times to retry on update failure - private static final int MAX_RETRIES = 5; - //Initial backoff time for exponential backoff for retry attempts. - private static final int INITIAL_BACKOFF_MILLIS = 5000; - - // Returns true if a Datastore key is complete. A key is complete if its last element - // has either an id or a name. - static boolean isValidKey(Key key) { - List<PathElement> elementList = key.getPathList(); - if (elementList.isEmpty()) { - return false; - } - PathElement lastElement = elementList.get(elementList.size() - 1); - return (lastElement.getId() != 0 || !lastElement.getName().isEmpty()); - } - - private final Datastore datastore; - private final MutationBuilder mutationBuilder; - private final List<Entity> entities = new ArrayList<>(); - - V1Beta3TestWriter(Datastore datastore, MutationBuilder mutationBuilder) { - this.datastore = datastore; - this.mutationBuilder = mutationBuilder; - } - - void write(Entity value) throws Exception { - // Verify that the entity to write has a complete key. 
- if (!isValidKey(value.getKey())) { - throw new IllegalArgumentException( - "Entities to be written to the Datastore must have complete keys"); - } - - entities.add(value); - - if (entities.size() >= DATASTORE_BATCH_UPDATE_LIMIT) { - flushBatch(); - } - } - - void close() throws Exception { - // flush any remaining entities - if (entities.size() > 0) { - flushBatch(); - } - } - - // commit the list of entities to datastore - private void flushBatch() throws DatastoreException, IOException, InterruptedException { - LOG.info("Writing batch of {} entities", entities.size()); - Sleeper sleeper = Sleeper.DEFAULT; - BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS); - - while (true) { - // Batch mutate entities. - try { - CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); - for (Entity entity: entities) { - commitRequest.addMutations(mutationBuilder.apply(entity)); - } - commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); - datastore.commit(commitRequest.build()); - // Break if the commit threw no exception. - break; - } catch (DatastoreException exception) { - LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(), - exception.getMessage()); - if (!BackOffUtils.next(sleeper, backoff)) { - LOG.error("Aborting after {} retries.", MAX_RETRIES); - throw exception; - } - } - } - LOG.info("Successfully wrote {} entities", entities.size()); - entities.clear(); - } - } - - /** - * A helper class to read entities from datastore. 
- */ - static class V1Beta3TestReader { - private static final int QUERY_BATCH_LIMIT = 500; - private final Datastore datastore; - private final Query query; - @Nullable - private final String namespace; - private boolean moreResults; - private java.util.Iterator<EntityResult> entities; - // Current batch of query results - private QueryResultBatch currentBatch; - private Entity currentEntity; - - V1Beta3TestReader(Datastore datastore, Query query, @Nullable String namespace) { - this.datastore = datastore; - this.query = query; - this.namespace = namespace; - } - - Entity getCurrent() { - return currentEntity; - } - - boolean advance() throws IOException { - if (entities == null || (!entities.hasNext() && moreResults)) { - try { - entities = getIteratorAndMoveCursor(); - } catch (DatastoreException e) { - throw new IOException(e); - } - } - - if (entities == null || !entities.hasNext()) { - currentEntity = null; - return false; - } - - currentEntity = entities.next().getEntity(); - return true; - } - - // Read the next batch of query results. - private Iterator<EntityResult> getIteratorAndMoveCursor() throws DatastoreException { - Query.Builder query = this.query.toBuilder().clone(); - query.setLimit(Int32Value.newBuilder().setValue(QUERY_BATCH_LIMIT)); - if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) { - query.setStartCursor(currentBatch.getEndCursor()); - } - - RunQueryRequest request = makeRequest(query.build(), namespace); - RunQueryResponse response = datastore.runQuery(request); - - currentBatch = response.getBatch(); - - int numFetch = currentBatch.getEntityResultsCount(); - // All indications from the API are that there are/may be more results. - moreResults = ((numFetch == QUERY_BATCH_LIMIT) - || (currentBatch.getMoreResults() == NOT_FINISHED)); - - // May receive a batch of 0 results if the number of records is a multiple - // of the request limit. 
- if (numFetch == 0) { - return null; - } - - return currentBatch.getEntityResultsList().iterator(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3WriteIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3WriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3WriteIT.java deleted file mode 100644 index 782065f..0000000 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3WriteIT.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.sdk.io.gcp.datastore; - -import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.countEntities; -import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.deleteAllEntities; -import static org.junit.Assert.assertEquals; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.CountingInput; -import org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.CreateEntityFn; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.ParDo; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.UUID; - -/** - * End-to-end tests for Datastore V1Beta3.Write. - */ -@RunWith(JUnit4.class) -public class V1Beta3WriteIT { - private V1Beta3TestOptions options; - private String ancestor; - private final long numEntities = 1000; - - @Before - public void setup() { - PipelineOptionsFactory.register(V1Beta3TestOptions.class); - options = TestPipeline.testingPipelineOptions().as(V1Beta3TestOptions.class); - ancestor = UUID.randomUUID().toString(); - } - - /** - * An end-to-end test for {@link V1Beta3.Write}. - * - * Write some test entities to datastore through a dataflow pipeline. - * Read and count all the entities. Verify that the count matches the - * number of entities written. - */ - @Test - public void testE2EV1Beta3Write() throws Exception { - Pipeline p = Pipeline.create(options); - - // Write to datastore - p.apply(CountingInput.upTo(numEntities)) - .apply(ParDo.of(new CreateEntityFn( - options.getKind(), options.getNamespace(), ancestor))) - .apply(DatastoreIO.v1beta3().write().withProjectId(options.getProject())); - - p.run(); - - // Count number of entities written to datastore. 
- long numEntitiesWritten = countEntities(options, ancestor); - - assertEquals(numEntitiesWritten, numEntities); - } - - @After - public void tearDown() throws Exception { - deleteAllEntities(options, ancestor); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java new file mode 100644 index 0000000..8fedc77 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.io.gcp.datastore; + +import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.deleteAllEntities; +import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.getDatastore; +import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.makeAncestorKey; +import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.makeEntity; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.UpsertMutationBuilder; +import org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.V1TestWriter; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.values.PCollection; + +import com.google.datastore.v1.Entity; +import com.google.datastore.v1.Key; +import com.google.datastore.v1.Query; +import com.google.datastore.v1.client.Datastore; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.UUID; + +/** + * End-to-end tests for Datastore DatastoreV1.Read. + */ +@RunWith(JUnit4.class) +public class V1ReadIT { + private V1TestOptions options; + private String ancestor; + private final long numEntities = 1000; + + @Before + public void setup() { + PipelineOptionsFactory.register(V1TestOptions.class); + options = TestPipeline.testingPipelineOptions().as(V1TestOptions.class); + ancestor = UUID.randomUUID().toString(); + } + + /** + * An end-to-end test for {@link DatastoreV1.Read}. + * + * Write some test entities to datastore and then run a dataflow pipeline that + * reads and counts the total number of entities. Verify that the count matches the + * number of entities written. 
+ */ + @Test + public void testE2EV1Read() throws Exception { + // Create entities and write them to datastore + writeEntitiesToDatastore(options, ancestor, numEntities); + + // Read from datastore + Query query = V1TestUtil.makeAncestorKindQuery( + options.getKind(), options.getNamespace(), ancestor); + + DatastoreV1.Read read = DatastoreIO.v1().read() + .withProjectId(options.getProject()) + .withQuery(query) + .withNamespace(options.getNamespace()); + + // Count the total number of entities + Pipeline p = Pipeline.create(options); + PCollection<Long> count = p + .apply(read) + .apply(Count.<Entity>globally()); + + PAssert.thatSingleton(count).isEqualTo(numEntities); + p.run(); + } + + // Creates entities and write them to datastore + private static void writeEntitiesToDatastore(V1TestOptions options, String ancestor, + long numEntities) throws Exception { + Datastore datastore = getDatastore(options, options.getProject()); + // Write test entities to datastore + V1TestWriter writer = new V1TestWriter(datastore, new UpsertMutationBuilder()); + Key ancestorKey = makeAncestorKey(options.getNamespace(), options.getKind(), ancestor); + + for (long i = 0; i < numEntities; i++) { + Entity entity = makeEntity(i, ancestorKey, options.getKind(), options.getNamespace()); + writer.write(entity); + } + writer.close(); + } + + @After + public void tearDown() throws Exception { + deleteAllEntities(options, ancestor); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestOptions.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestOptions.java new file mode 100644 index 0000000..360855f --- /dev/null +++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestOptions.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.datastore; + +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.testing.TestPipelineOptions; + +import javax.annotation.Nullable; + +/** + * DatastoreV1 Datastore related pipeline options. 
+ */ +public interface V1TestOptions extends TestPipelineOptions { + @Description("Project ID to read from datastore") + @Default.String("apache-beam-testing") + String getProject(); + void setProject(String value); + + @Description("Datastore Entity kind") + @Default.String("beam-test") + String getKind(); + void setKind(String value); + + @Description("Datastore Namespace") + String getNamespace(); + void setNamespace(@Nullable String value); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java new file mode 100644 index 0000000..1e323ec --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static com.google.datastore.v1.QueryResultBatch.MoreResultsType.NOT_FINISHED;
+import static com.google.datastore.v1.client.DatastoreHelper.makeDelete;
+import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
+import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.datastore.v1.CommitRequest;
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.EntityResult;
+import com.google.datastore.v1.Key;
+import com.google.datastore.v1.Key.PathElement;
+import com.google.datastore.v1.Mutation;
+import com.google.datastore.v1.PropertyFilter;
+import com.google.datastore.v1.Query;
+import com.google.datastore.v1.QueryResultBatch;
+import com.google.datastore.v1.RunQueryRequest;
+import com.google.datastore.v1.RunQueryResponse;
+import com.google.datastore.v1.client.Datastore;
+import com.google.datastore.v1.client.DatastoreException;
+import com.google.datastore.v1.client.DatastoreFactory;
+import com.google.datastore.v1.client.DatastoreOptions;
+import com.google.protobuf.Int32Value;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import javax.annotation.Nullable;
+
+class V1TestUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(V1TestUtil.class);
+
+  /**
+   * Create the ancestor key shared by all created and queried entities (namespace optional).
+   */
+  static Key makeAncestorKey(@Nullable String namespace, String kind, String ancestor) {
+    Key.Builder keyBuilder = makeKey(kind, ancestor);
+    if (namespace != null) {
+      keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
+    }
+    return keyBuilder.build();
+  }
+
+  /**
+   * Build a datastore ancestor query for the specified kind, namespace and ancestor.
+   */
+  static Query makeAncestorKindQuery(String kind, @Nullable String namespace, String ancestor) {
+    Query.Builder q = Query.newBuilder();
+    q.addKindBuilder().setName(kind);
+    q.setFilter(makeFilter(
+        "__key__",
+        PropertyFilter.Operator.HAS_ANCESTOR,
+        makeValue(makeAncestorKey(namespace, kind, ancestor))));
+    return q.build();
+  }
+
+  /**
+   * Build an entity with a random UUID name under ancestorKey whose "value" property is value.
+   */
+  static Entity makeEntity(Long value, Key ancestorKey, String kind, @Nullable String namespace) {
+    Entity.Builder entityBuilder = Entity.newBuilder();
+    Key.Builder keyBuilder = makeKey(ancestorKey, kind, UUID.randomUUID().toString());
+    // NOTE: Namespace is not inherited between keys created with DatastoreHelper.makeKey, so
+    // we must set the namespace on keyBuilder. TODO: Once partitionId inheritance is added,
+    // we can simplify this code.
+    if (namespace != null) {
+      keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
+    }
+
+    entityBuilder.setKey(keyBuilder.build());
+    entityBuilder.getMutableProperties().put("value", makeValue(value).build());
+    return entityBuilder.build();
+  }
+
+  /**
+   * A {@link DoFn} that wraps each input long in a new entity under a shared ancestor key.
+   */
+  static class CreateEntityFn extends DoFn<Long, Entity> {
+    private final String kind;
+    @Nullable
+    private final String namespace;
+    private final Key ancestorKey;
+
+    CreateEntityFn(String kind, @Nullable String namespace, String ancestor) {
+      this.kind = kind;
+      this.namespace = namespace;
+      // Build the ancestor key for all created entities once, including the namespace.
+      ancestorKey = makeAncestorKey(namespace, kind, ancestor);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(makeEntity(c.element(), ancestorKey, kind, namespace));
+    }
+  }
+
+  /**
+   * Build a new datastore client for projectId, using the options' GCP credential if present.
+   */
+  static Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
+    DatastoreOptions.Builder builder =
+        new DatastoreOptions.Builder()
+            .projectId(projectId)
+            .initializer(
+                new RetryHttpRequestInitializer()
+            );
+
+    Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
+    if (credential != null) {
+      builder.credential(credential);
+    }
+    return DatastoreFactory.get().create(builder.build());
+  }
+
+  /**
+   * Build a datastore query request, scoped to the namespace when one is given.
+   */
+  private static RunQueryRequest makeRequest(Query query, @Nullable String namespace) {
+    RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query);
+    if (namespace != null) {
+      requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
+    }
+    return requestBuilder.build();
+  }
+
+  /**
+   * Delete all entities of the configured kind and namespace with the given ancestor.
+   */
+  static void deleteAllEntities(V1TestOptions options, String ancestor) throws Exception {
+    Datastore datastore = getDatastore(options, options.getProject());
+    Query query = V1TestUtil.makeAncestorKindQuery(
+        options.getKind(), options.getNamespace(), ancestor);
+
+    V1TestReader reader = new V1TestReader(datastore, query, options.getNamespace());
+    V1TestWriter writer = new V1TestWriter(datastore, new DeleteMutationBuilder());
+
+    long numEntities = 0;
+    while (reader.advance()) {
+      Entity entity = reader.getCurrent();
+      numEntities++;
+      writer.write(entity);
+    }
+
+    writer.close();
+    LOG.info("Successfully deleted {} entities", numEntities);
+  }
+
+  /**
+   * Returns the number of entities of the configured kind and namespace under the given ancestor.
+   */
+  static long countEntities(V1TestOptions options, String ancestor) throws Exception {
+    // Read from datastore.
+    Datastore datastore = V1TestUtil.getDatastore(options, options.getProject());
+    Query query = V1TestUtil.makeAncestorKindQuery(
+        options.getKind(), options.getNamespace(), ancestor);
+
+    V1TestReader reader = new V1TestReader(datastore, query, options.getNamespace());
+
+    long numEntitiesRead = 0;
+    while (reader.advance()) {
+      reader.getCurrent();
+      numEntitiesRead++;
+    }
+    return numEntitiesRead;
+  }
+
+  /**
+   * An interface to represent any datastore mutation operation.
+   * Mutation operations include insert, delete, upsert, update.
+   */
+  interface MutationBuilder {
+    Mutation.Builder apply(Entity entity);
+  }
+
+  /**
+   * A MutationBuilder that performs upsert operation.
+   */
+  static class UpsertMutationBuilder implements MutationBuilder {
+    public Mutation.Builder apply(Entity entity) {
+      return makeUpsert(entity);
+    }
+  }
+
+  /**
+   * A MutationBuilder that performs delete operation.
+   */
+  static class DeleteMutationBuilder implements MutationBuilder {
+    public Mutation.Builder apply(Entity entity) {
+      return makeDelete(entity.getKey());
+    }
+  }
+
+  /**
+   * A helper class that applies mutations to datastore in batches, retrying failed commits.
+   */
+  static class V1TestWriter {
+    private static final Logger LOG = LoggerFactory.getLogger(V1TestWriter.class);
+    // Limits the number of entities updated per batch
+    private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
+    // Number of times to retry on update failure
+    private static final int MAX_RETRIES = 5;
+    // Initial backoff time for exponential backoff for retry attempts.
+    private static final int INITIAL_BACKOFF_MILLIS = 5000;
+
+    // Returns true if a Datastore key is complete. A key is complete if its last element
+    // has either an id or a name.
+    static boolean isValidKey(Key key) {
+      List<PathElement> elementList = key.getPathList();
+      if (elementList.isEmpty()) {
+        return false;
+      }
+      PathElement lastElement = elementList.get(elementList.size() - 1);
+      return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
+    }
+
+    private final Datastore datastore;
+    private final MutationBuilder mutationBuilder;
+    private final List<Entity> entities = new ArrayList<>();
+
+    V1TestWriter(Datastore datastore, MutationBuilder mutationBuilder) {
+      this.datastore = datastore;
+      this.mutationBuilder = mutationBuilder;
+    }
+
+    void write(Entity value) throws Exception {
+      // Verify that the entity to write has a complete key.
+      if (!isValidKey(value.getKey())) {
+        throw new IllegalArgumentException(
+            "Entities to be written to the Datastore must have complete keys");
+      }
+
+      entities.add(value);
+
+      if (entities.size() >= DATASTORE_BATCH_UPDATE_LIMIT) {
+        flushBatch();
+      }
+    }
+
+    void close() throws Exception {
+      // flush any remaining entities
+      if (!entities.isEmpty()) {
+        flushBatch();
+      }
+    }
+
+    // Commit the buffered entities to datastore, retrying with backoff, then clear the buffer.
+    private void flushBatch() throws DatastoreException, IOException, InterruptedException {
+      LOG.info("Writing batch of {} entities", entities.size());
+      Sleeper sleeper = Sleeper.DEFAULT;
+      BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
+
+      // The mutations do not change between attempts, so assemble the request once up front.
+      CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+      for (Entity entity : entities) {
+        commitRequest.addMutations(mutationBuilder.apply(entity));
+      }
+      commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+      while (true) {
+        try {
+          datastore.commit(commitRequest.build());
+          // Break if the commit threw no exception.
+          break;
+        } catch (DatastoreException exception) {
+          LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
+              exception.getMessage());
+          if (!BackOffUtils.next(sleeper, backoff)) {
+            LOG.error("Aborting after {} retries.", MAX_RETRIES);
+            throw exception;
+          }
+        }
+      }
+      LOG.info("Successfully wrote {} entities", entities.size());
+      entities.clear();
+    }
+  }
+
+  /**
+   * A helper class that pages through the results of a datastore query, one batch at a time.
+   */
+  static class V1TestReader {
+    private static final int QUERY_BATCH_LIMIT = 500;
+    private final Datastore datastore;
+    private final Query query;
+    @Nullable
+    private final String namespace;
+    private boolean moreResults;
+    private Iterator<EntityResult> entities;
+    // Current batch of query results
+    private QueryResultBatch currentBatch;
+    private Entity currentEntity;
+
+    V1TestReader(Datastore datastore, Query query, @Nullable String namespace) {
+      this.datastore = datastore;
+      this.query = query;
+      this.namespace = namespace;
+    }
+
+    Entity getCurrent() {
+      return currentEntity;
+    }
+
+    boolean advance() throws IOException {
+      if (entities == null || (!entities.hasNext() && moreResults)) {
+        try {
+          entities = getIteratorAndMoveCursor();
+        } catch (DatastoreException e) {
+          throw new IOException(e);
+        }
+      }
+
+      if (entities == null || !entities.hasNext()) {
+        currentEntity = null;
+        return false;
+      }
+
+      currentEntity = entities.next().getEntity();
+      return true;
+    }
+
+    // Read the next batch of query results; returns null when the batch is empty.
+    private Iterator<EntityResult> getIteratorAndMoveCursor() throws DatastoreException {
+      Query.Builder queryBuilder = this.query.toBuilder();
+      queryBuilder.setLimit(Int32Value.newBuilder().setValue(QUERY_BATCH_LIMIT));
+      if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
+        queryBuilder.setStartCursor(currentBatch.getEndCursor());
+      }
+
+      RunQueryRequest request = makeRequest(queryBuilder.build(), namespace);
+      RunQueryResponse response = datastore.runQuery(request);
+
+      currentBatch = response.getBatch();
+
+      int numFetch = currentBatch.getEntityResultsCount();
+      // All indications from the API are that there are/may be more results.
+      moreResults = ((numFetch == QUERY_BATCH_LIMIT)
+          || (currentBatch.getMoreResults() == NOT_FINISHED));
+
+      // May receive a batch of 0 results if the number of records is a multiple
+      // of the request limit.
+      if (numFetch == 0) {
+        return null;
+      }
+
+      return currentBatch.getEntityResultsList().iterator();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
new file mode 100644
index 0000000..b97c05c
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.countEntities;
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.deleteAllEntities;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.CreateEntityFn;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.ParDo;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.UUID;
+
+/**
+ * End-to-end tests for {@link DatastoreV1.Write}.
+ */
+@RunWith(JUnit4.class)
+public class V1WriteIT {
+  private V1TestOptions options;
+  private String ancestor;
+  private final long numEntities = 1000;
+
+  @Before
+  public void setup() {
+    PipelineOptionsFactory.register(V1TestOptions.class);
+    options = TestPipeline.testingPipelineOptions().as(V1TestOptions.class);
+    ancestor = UUID.randomUUID().toString();
+  }
+
+  /**
+   * An end-to-end test for {@link DatastoreV1.Write}.
+   *
+   * <p>Write some test entities to datastore through a pipeline, under a random ancestor.
+   * Read and count all the entities. Verify that the count matches the
+   * number of entities written.
+   */
+  @Test
+  public void testE2EV1Write() throws Exception {
+    Pipeline p = Pipeline.create(options);
+
+    // Write to datastore
+    p.apply(CountingInput.upTo(numEntities))
+        .apply(ParDo.of(new CreateEntityFn(
+            options.getKind(), options.getNamespace(), ancestor)))
+        .apply(DatastoreIO.v1().write().withProjectId(options.getProject()));
+
+    p.run();
+
+    // Count number of entities written to datastore.
+    long numEntitiesWritten = countEntities(options, ancestor);
+
+    assertEquals(numEntities, numEntitiesWritten);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    deleteAllEntities(options, ancestor);
+  }
+}
