chamikaramj commented on code in PR #22052:
URL: https://github.com/apache/beam/pull/22052#discussion_r926156052
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java:
##########
@@ -477,10 +508,16 @@ private static List<Query> splitQuery(
@Nullable String namespace,
Datastore datastore,
QuerySplitter querySplitter,
- int numSplits)
+ int numSplits,
+ @Nullable Instant readTime)
throws DatastoreException {
// If namespace is set, include it in the split request so splits are
calculated accordingly.
- return querySplitter.getSplits(query, forNamespace(namespace).build(),
numSplits, datastore);
+ PartitionId partitionId = forNamespace(namespace).build();
+ if (readTime != null) {
+ Timestamp readTimeProto = Timestamps.fromMillis(readTime.getMillis());
+ return querySplitter.getSplits(query, partitionId, numSplits,
datastore, readTimeProto);
+ }
+ return querySplitter.getSplits(query, partitionId, numSplits, datastore);
Review Comment:
Just to confirm: do the sub-queries maintain the same read time?
Also, I wonder if we can implement a better splitter function here by using
different read times (but that can be done in a separate change).
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java:
##########
@@ -146,782 +158,890 @@ public void setUp() {
MetricsEnvironment.setProcessWideContainer(container);
}
- @Test
- public void testBuildRead() throws Exception {
- DatastoreV1.Read read =
-
DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
- assertEquals(QUERY, read.getQuery());
- assertEquals(PROJECT_ID, read.getProjectId().get());
- assertEquals(NAMESPACE, read.getNamespace().get());
- }
-
- @Test
- public void testBuildReadWithGqlQuery() throws Exception {
- DatastoreV1.Read read =
- DatastoreIO.v1()
- .read()
- .withProjectId(PROJECT_ID)
- .withLiteralGqlQuery(GQL_QUERY)
- .withNamespace(NAMESPACE);
- assertEquals(GQL_QUERY, read.getLiteralGqlQuery().get());
- assertEquals(PROJECT_ID, read.getProjectId().get());
- assertEquals(NAMESPACE, read.getNamespace().get());
- }
-
- /** {@link #testBuildRead} but constructed in a different order. */
- @Test
- public void testBuildReadAlt() throws Exception {
- DatastoreV1.Read read =
- DatastoreIO.v1()
- .read()
- .withQuery(QUERY)
- .withNamespace(NAMESPACE)
- .withProjectId(PROJECT_ID)
- .withLocalhost(LOCALHOST);
- assertEquals(QUERY, read.getQuery());
- assertEquals(PROJECT_ID, read.getProjectId().get());
- assertEquals(NAMESPACE, read.getNamespace().get());
- assertEquals(LOCALHOST, read.getLocalhost());
- }
-
- @Test
- public void testReadValidationFailsQueryAndGqlQuery() throws Exception {
- DatastoreV1.Read read =
- DatastoreIO.v1()
- .read()
- .withProjectId(PROJECT_ID)
- .withLiteralGqlQuery(GQL_QUERY)
- .withQuery(QUERY);
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("withQuery() and withLiteralGqlQuery() are
exclusive");
- read.expand(null);
- }
-
- @Test
- public void testReadValidationFailsQueryLimitZero() throws Exception {
- Query invalidLimit =
Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build();
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Invalid query limit 0: must be positive");
-
- DatastoreIO.v1().read().withQuery(invalidLimit);
- }
-
- @Test
- public void testReadValidationFailsQueryLimitNegative() throws Exception {
- Query invalidLimit =
Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build();
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Invalid query limit -5: must be positive");
-
- DatastoreIO.v1().read().withQuery(invalidLimit);
- }
-
- @Test
- public void testReadDisplayData() {
- DatastoreV1.Read read =
-
DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
+ @RunWith(JUnit4.class)
+ public static class SingletonTests extends DatastoreV1Test {
+ @Test
+ public void testBuildRead() throws Exception {
+ DatastoreV1.Read read =
+ DatastoreIO.v1()
+ .read()
+ .withProjectId(PROJECT_ID)
+ .withQuery(QUERY)
+ .withNamespace(NAMESPACE);
+ assertEquals(QUERY, read.getQuery());
+ assertEquals(PROJECT_ID, read.getProjectId().get());
+ assertEquals(NAMESPACE, read.getNamespace().get());
+ }
- DisplayData displayData = DisplayData.from(read);
+ @Test
+ public void testBuildReadWithReadTime() throws Exception {
+ DatastoreV1.Read read =
+ DatastoreIO.v1()
+ .read()
+ .withProjectId(PROJECT_ID)
+ .withQuery(QUERY)
+ .withReadTime(TIMESTAMP);
Review Comment:
Do existing tests fail without setting the new parameter?
If so, could customers run into similar issues?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]