DatastoreIO SplitQueryFn integration test
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0312f15e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0312f15e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0312f15e Branch: refs/heads/gearpump-runner Commit: 0312f15e95898f2fbd4dd4a4accfa9529f5efeee Parents: 6ee7b62 Author: Vikas Kedigehalli <vika...@google.com> Authored: Mon Aug 29 13:55:32 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Mon Sep 12 17:40:12 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/datastore/DatastoreV1.java | 9 +- .../sdk/io/gcp/datastore/DatastoreV1Test.java | 6 +- .../sdk/io/gcp/datastore/SplitQueryFnIT.java | 97 ++++++++++++++++++++ 3 files changed, 107 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0312f15e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index 8456e02..e24bc80 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -24,6 +24,7 @@ import static com.google.common.base.Verify.verify; import static com.google.datastore.v1.PropertyFilter.Operator.EQUAL; import static com.google.datastore.v1.PropertyOrder.Direction.DESCENDING; import static com.google.datastore.v1.QueryResultBatch.MoreResultsType.NOT_FINISHED; +import static 
com.google.datastore.v1.client.DatastoreHelper.makeAndFilter; import static com.google.datastore.v1.client.DatastoreHelper.makeDelete; import static com.google.datastore.v1.client.DatastoreHelper.makeFilter; import static com.google.datastore.v1.client.DatastoreHelper.makeOrder; @@ -290,7 +291,7 @@ public class DatastoreV1 { throws DatastoreException { String ourKind = query.getKind(0).getName(); long latestTimestamp = queryLatestStatisticsTimestamp(datastore, namespace); - LOG.info("Latest stats timestamp : {}", latestTimestamp); + LOG.info("Latest stats timestamp for kind {} is {}", ourKind, latestTimestamp); Query.Builder queryBuilder = Query.newBuilder(); if (namespace == null) { @@ -298,8 +299,10 @@ public class DatastoreV1 { } else { queryBuilder.addKindBuilder().setName("__Stat_Ns_Kind__"); } - queryBuilder.setFilter(makeFilter("kind_name", EQUAL, makeValue(ourKind).build())); - queryBuilder.setFilter(makeFilter("timestamp", EQUAL, makeValue(latestTimestamp).build())); + + queryBuilder.setFilter(makeAndFilter( + makeFilter("kind_name", EQUAL, makeValue(ourKind).build()).build(), + makeFilter("timestamp", EQUAL, makeValue(latestTimestamp).build()).build())); RunQueryRequest request = makeRequest(queryBuilder.build(), namespace); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0312f15e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java index 138671d..d96c320 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java @@ -19,6 
+19,7 @@ package org.apache.beam.sdk.io.gcp.datastore; import static com.google.datastore.v1.PropertyFilter.Operator.EQUAL; import static com.google.datastore.v1.PropertyOrder.Direction.DESCENDING; +import static com.google.datastore.v1.client.DatastoreHelper.makeAndFilter; import static com.google.datastore.v1.client.DatastoreHelper.makeDelete; import static com.google.datastore.v1.client.DatastoreHelper.makeFilter; import static com.google.datastore.v1.client.DatastoreHelper.makeKey; @@ -805,8 +806,9 @@ public class DatastoreV1Test { } else { statQuery.addKindBuilder().setName("__Stat_Ns_Kind__"); } - statQuery.setFilter(makeFilter("kind_name", EQUAL, makeValue(KIND)).build()); - statQuery.setFilter(makeFilter("timestamp", EQUAL, makeValue(timestamp * 1000000L)).build()); + statQuery.setFilter(makeAndFilter( + makeFilter("kind_name", EQUAL, makeValue(KIND).build()).build(), + makeFilter("timestamp", EQUAL, makeValue(timestamp * 1000000L).build()).build())); return statQuery.build(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0312f15e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java new file mode 100644 index 0000000..72ab7c2 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.datastore; + +import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.NUM_QUERY_SPLITS_MIN; +import static org.junit.Assert.assertEquals; + +import com.google.datastore.v1.Query; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.SplitQueryFn; +import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.V1Options; +import org.apache.beam.sdk.transforms.DoFnTester; +import org.apache.beam.sdk.values.KV; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Integration tests for {@link DatastoreV1.Read.SplitQueryFn}. + * + * <p> It is hard to mock the exact behavior of Cloud Datastore, especially for the statistics + * queries. Also, because DatastoreIO falls back gracefully when querying statistics fails, such + * issues are hard to catch in production. This test therefore interacts with Cloud Datastore + * directly, queries the actual stats and verifies that the SplitQueryFn generates the expected + * number of query splits. + * + * <p> These tests are brittle as they rely on statistics data in Cloud Datastore. If the data + * is lost or changes, they will begin failing and should be disabled.
+ * At the time of writing, Cloud Datastore reports the following statistics:
+ * <ul>
+ * <li>kind = sort_1G, entity_bytes = 2130000000, count = 10000000
+ * <li>kind = shakespeare, entity_bytes = 26383451, count = 172948
+ * </ul>
+ */
+@RunWith(JUnit4.class)
+public class SplitQueryFnIT {
+  /**
+   * Tests that {@link SplitQueryFn} generates the expected number of splits for a large dataset.
+   */
+  @Test
+  public void testSplitQueryFnWithLargeDataset() throws Exception {
+    String projectId = "apache-beam-testing";
+    String kind = "sort_1G";
+    String namespace = null;
+    // Num splits is computed based on the entity_bytes size of the sort_1G kind reported by
+    // Datastore stats.
+    int expectedNumSplits = 32;
+    testSplitQueryFn(projectId, kind, namespace, expectedNumSplits);
+  }
+
+  /**
+   * Tests that {@link SplitQueryFn} falls back to NUM_QUERY_SPLITS_MIN for a small dataset.
+   */
+  @Test
+  public void testSplitQueryFnWithSmallDataset() throws Exception {
+    String projectId = "apache-beam-testing";
+    String kind = "shakespeare";
+    String namespace = null;
+    int expectedNumSplits = NUM_QUERY_SPLITS_MIN;
+    testSplitQueryFn(projectId, kind, namespace, expectedNumSplits);
+  }
+
+  /**
+   * Runs {@link SplitQueryFn} on a query for {@code kind} and checks the number of splits.
+   */
+  private void testSplitQueryFn(String projectId, String kind, @Nullable String namespace,
+      int expectedNumSplits) throws Exception {
+    Query.Builder query = Query.newBuilder();
+    query.addKindBuilder().setName(kind);
+
+    SplitQueryFn splitQueryFn = new SplitQueryFn(
+        V1Options.from(projectId, query.build(), namespace), 0);
+    DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
+
+    List<KV<Integer, Query>> queries = doFnTester.processBundle(query.build());
+    assertEquals(expectedNumSplits, queries.size());
+  }
+
+  // TODO(vikasrk): Create datasets under a different namespace and add tests.
+}