Query latest timestamp
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/79491ebe Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/79491ebe Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/79491ebe Branch: refs/heads/gearpump-runner Commit: 79491ebe04e54043ebdf5afa6be78718eae0fe15 Parents: 0fbd9c8 Author: Vikas Kedigehalli <vika...@google.com> Authored: Tue Aug 23 16:44:08 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Mon Sep 12 17:40:10 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/datastore/DatastoreV1.java | 41 +++++++++++-- .../sdk/io/gcp/datastore/DatastoreV1Test.java | 60 +++++++++++++++++--- 2 files changed, 88 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/79491ebe/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index c7433d3..8456e02 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -239,6 +239,7 @@ public class DatastoreV1 { int numSplits; try { long estimatedSizeBytes = getEstimatedSizeBytes(datastore, query, namespace); + LOG.info("Estimated size bytes for the query is: {}", estimatedSizeBytes); numSplits = (int) Math.min(NUM_QUERY_SPLITS_MAX, Math.round(((double) estimatedSizeBytes) / DEFAULT_BUNDLE_SIZE_BYTES)); } catch (Exception e) { @@ -250,6 +251,33 @@ public class DatastoreV1 { } /** + * Datastore system tables with statistics are periodically updated. This method fetches + * the latest timestamp (in microseconds) of statistics update using the {@code __Stat_Total__} + * table. + */ + private static long queryLatestStatisticsTimestamp(Datastore datastore, + @Nullable String namespace) throws DatastoreException { + Query.Builder query = Query.newBuilder(); + if (namespace == null) { + query.addKindBuilder().setName("__Stat_Total__"); + } else { + query.addKindBuilder().setName("__Stat_Ns_Total__"); + } + query.addOrder(makeOrder("timestamp", DESCENDING)); + query.setLimit(Int32Value.newBuilder().setValue(1)); + RunQueryRequest request = makeRequest(query.build(), namespace); + + RunQueryResponse response = datastore.runQuery(request); + QueryResultBatch batch = response.getBatch(); + if (batch.getEntityResultsCount() == 0) { + throw new NoSuchElementException( + "Datastore total statistics unavailable"); + } + Entity entity = batch.getEntityResults(0).getEntity(); + return entity.getProperties().get("timestamp").getTimestampValue().getSeconds() * 1000000; + } + + /** * Get the estimated size of the data returned by the given query. * * <p>Datastore provides no way to get a good estimate of how large the result of a query @@ -261,17 +289,17 @@ public class DatastoreV1 { static long getEstimatedSizeBytes(Datastore datastore, Query query, @Nullable String namespace) throws DatastoreException { String ourKind = query.getKind(0).getName(); + long latestTimestamp = queryLatestStatisticsTimestamp(datastore, namespace); + LOG.info("Latest stats timestamp : {}", latestTimestamp); + Query.Builder queryBuilder = Query.newBuilder(); if (namespace == null) { queryBuilder.addKindBuilder().setName("__Stat_Kind__"); } else { - queryBuilder.addKindBuilder().setName("__Ns_Stat_Kind__"); + queryBuilder.addKindBuilder().setName("__Stat_Ns_Kind__"); } queryBuilder.setFilter(makeFilter("kind_name", EQUAL, makeValue(ourKind).build())); - - // Get the latest statistics - queryBuilder.addOrder(makeOrder("timestamp", DESCENDING)); - queryBuilder.setLimit(Int32Value.newBuilder().setValue(1)); + queryBuilder.setFilter(makeFilter("timestamp", EQUAL, makeValue(latestTimestamp).build())); RunQueryRequest request = makeRequest(queryBuilder.build(), namespace); @@ -547,6 +575,7 @@ public class DatastoreV1 { estimatedNumSplits = numSplits; } + LOG.info("Splitting the query into {} splits", estimatedNumSplits); List<Query> querySplits; try { querySplits = splitQuery(query, options.getNamespace(), datastore, querySplitter, @@ -866,7 +895,7 @@ public class DatastoreV1 { @FinishBundle public void finishBundle(Context c) throws Exception { - if (mutations.size() > 0) { + if (!mutations.isEmpty()) { flushBatch(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/79491ebe/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java index ab1df2f..138671d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java @@ -61,6 +61,7 @@ import com.google.datastore.v1.client.Datastore; import com.google.datastore.v1.client.QuerySplitter; import com.google.protobuf.Int32Value; import java.util.ArrayList; +import java.util.Date; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -561,14 +562,23 @@ public class DatastoreV1Test { @Test public void testEstimatedSizeBytes() throws Exception { long entityBytes = 100L; + // In seconds + long timestamp = 1234L; + + RunQueryRequest latestTimestampRequest = makeRequest(makeLatestTimestampQuery(NAMESPACE), + NAMESPACE); + RunQueryResponse latestTimestampResponse = makeLatestTimestampResponse(timestamp); // Per Kind statistics request and response - RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE); + RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE, timestamp), NAMESPACE); RunQueryResponse statResponse = makeStatKindResponse(entityBytes); + when(mockDatastore.runQuery(latestTimestampRequest)) + .thenReturn(latestTimestampResponse); when(mockDatastore.runQuery(statRequest)) .thenReturn(statResponse); assertEquals(entityBytes, getEstimatedSizeBytes(mockDatastore, QUERY, NAMESPACE)); + verify(mockDatastore, times(1)).runQuery(latestTimestampRequest); verify(mockDatastore, times(1)).runQuery(statRequest); } @@ -609,11 +619,19 @@ public class DatastoreV1Test { int numSplits = 0; int expectedNumSplits = 20; long entityBytes = expectedNumSplits * DEFAULT_BUNDLE_SIZE_BYTES; + // In seconds + long timestamp = 1234L; + + RunQueryRequest latestTimestampRequest = makeRequest(makeLatestTimestampQuery(NAMESPACE), + NAMESPACE); + RunQueryResponse latestTimestampResponse = makeLatestTimestampResponse(timestamp); // Per Kind statistics request and response - RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE); + RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE, timestamp), NAMESPACE); RunQueryResponse statResponse = makeStatKindResponse(entityBytes); + when(mockDatastore.runQuery(latestTimestampRequest)) + .thenReturn(latestTimestampResponse); when(mockDatastore.runQuery(statRequest)) .thenReturn(statResponse); when(mockQuerySplitter.getSplits( @@ -629,6 +647,7 @@ public class DatastoreV1Test { verifyUniqueKeys(queries); verify(mockQuerySplitter, times(1)).getSplits( eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class)); + verify(mockDatastore, times(1)).runQuery(latestTimestampRequest); verify(mockDatastore, times(1)).runQuery(statRequest); } @@ -752,7 +771,7 @@ public class DatastoreV1Test { /** Builds a per-kind statistics response with the given entity size. */ private static RunQueryResponse makeStatKindResponse(long entitySizeInBytes) { - RunQueryResponse.Builder timestampResponse = RunQueryResponse.newBuilder(); + RunQueryResponse.Builder statKindResponse = RunQueryResponse.newBuilder(); Entity.Builder entity = Entity.newBuilder(); entity.setKey(makeKey("dummyKind", "dummyId")); entity.getMutableProperties().put("entity_bytes", makeValue(entitySizeInBytes).build()); @@ -760,24 +779,51 @@ public class DatastoreV1Test { entityResult.setEntity(entity); QueryResultBatch.Builder batch = QueryResultBatch.newBuilder(); batch.addEntityResults(entityResult); + statKindResponse.setBatch(batch); + return statKindResponse.build(); + } + + /** Builds a response of the given timestamp. */ + private static RunQueryResponse makeLatestTimestampResponse(long timestamp) { + RunQueryResponse.Builder timestampResponse = RunQueryResponse.newBuilder(); + Entity.Builder entity = Entity.newBuilder(); + entity.setKey(makeKey("dummyKind", "dummyId")); + entity.getMutableProperties().put("timestamp", makeValue(new Date(timestamp * 1000)).build()); + EntityResult.Builder entityResult = EntityResult.newBuilder(); + entityResult.setEntity(entity); + QueryResultBatch.Builder batch = QueryResultBatch.newBuilder(); + batch.addEntityResults(entityResult); timestampResponse.setBatch(batch); return timestampResponse.build(); } /** Builds a per-kind statistics query for the given timestamp and namespace. */ - private static Query makeStatKindQuery(String namespace) { + private static Query makeStatKindQuery(String namespace, long timestamp) { Query.Builder statQuery = Query.newBuilder(); if (namespace == null) { statQuery.addKindBuilder().setName("__Stat_Kind__"); } else { - statQuery.addKindBuilder().setName("__Ns_Stat_Kind__"); + statQuery.addKindBuilder().setName("__Stat_Ns_Kind__"); } statQuery.setFilter(makeFilter("kind_name", EQUAL, makeValue(KIND)).build()); - statQuery.addOrder(makeOrder("timestamp", DESCENDING)); - statQuery.setLimit(Int32Value.newBuilder().setValue(1)); + statQuery.setFilter(makeFilter("timestamp", EQUAL, makeValue(timestamp * 1000000L)).build()); return statQuery.build(); } + /** Builds a latest timestamp statistics query. */ + private static Query makeLatestTimestampQuery(String namespace) { + Query.Builder timestampQuery = Query.newBuilder(); + if (namespace == null) { + timestampQuery.addKindBuilder().setName("__Stat_Total__"); + } else { + timestampQuery.addKindBuilder().setName("__Stat_Ns_Total__"); + } + timestampQuery.addOrder(makeOrder("timestamp", DESCENDING)); + timestampQuery.setLimit(Int32Value.newBuilder().setValue(1)); + return timestampQuery.build(); + } + + /** Generate dummy query splits. */ private List<Query> splitQuery(Query query, int numSplits) { List<Query> queries = new LinkedList<>();