clement commented on code in PR #22175:
URL: https://github.com/apache/beam/pull/22175#discussion_r917167502
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java:
##########
@@ -109,44 +107,36 @@ protected ServerStreamingCallable<RunQueryRequest,
RunQueryResponse> getCallable
protected RunQueryRequest setStartFrom(
RunQueryRequest element, RunQueryResponse runQueryResponse) {
StructuredQuery query = element.getStructuredQuery();
- StructuredQuery.Builder builder;
- List<Order> orderByList = query.getOrderByList();
- // if the orderByList is empty that means the default sort of "__name__
ASC" will be used
- // Before we can set the cursor to the last document name read, we need
to explicitly add
- // the order of "__name__ ASC" because a cursor value must map to an
order by
- if (orderByList.isEmpty()) {
- builder =
- query
- .toBuilder()
- .addOrderBy(
- Order.newBuilder()
-
.setField(FieldReference.newBuilder().setFieldPath("__name__").build())
- .setDirection(Direction.ASCENDING)
- .build())
- .setStartAt(
- Cursor.newBuilder()
- .setBefore(false)
- .addValues(
- Value.newBuilder()
-
.setReferenceValue(runQueryResponse.getDocument().getName())
- .build()));
- } else {
- Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
- Map<String, Value> fieldsMap =
runQueryResponse.getDocument().getFieldsMap();
- for (Order order : orderByList) {
- String fieldPath = order.getField().getFieldPath();
- Value value = fieldsMap.get(fieldPath);
- if (value != null) {
- cursor.addValues(value);
- } else if ("__name__".equals(fieldPath)) {
- cursor.addValues(
- Value.newBuilder()
-
.setReferenceValue(runQueryResponse.getDocument().getName())
- .build());
+ StructuredQuery.Builder builder = query.toBuilder();
+ builder.addAllOrderBy(QueryUtils.getImplicitOrderBy(query));
+ Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+
+ Map<String, Value> valueMap =
runQueryResponse.getDocument().getFieldsMap();
+ for (Order order : builder.getOrderByList()) {
+ List<String> segments;
+ try {
+ segments =
QueryUtils.resolveOrderByFieldPath(order.getField().getFieldPath());
+ } catch (IllegalArgumentException e) {
+ // Rethrow as something specific to RunQuery
+ throw new IllegalArgumentException(
+ "Could not retry query due to malformed orderBy field",
e.getCause());
+ }
+ // __name__ is a special field and doesn't exist in valueMap
Review Comment:
That's not actually quite right. Some legacy types in Datastore will
translate to maps in Firestore with the `__name__` field set.
But it is true that `__name__` will never be a top-level field in a document.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/QueryUtils.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldFilter;
+import com.google.firestore.v1.StructuredQuery.FieldFilter.Operator;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Filter;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.StructuredQuery.UnaryFilter;
+import com.google.firestore.v1.Value;
+import com.google.firestore.v1.Value.ValueTypeCase;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ascii;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Contains several internal utility functions for Firestore query handling,
such as filling
+ * implicit ordering or escaping field references.
+ */
+class QueryUtils {
+
+ private static final ImmutableSet<Operator> INEQUALITY_FIELD_FILTER_OPS =
+ ImmutableSet.of(
+ FieldFilter.Operator.LESS_THAN,
+ FieldFilter.Operator.LESS_THAN_OR_EQUAL,
+ FieldFilter.Operator.GREATER_THAN,
+ FieldFilter.Operator.GREATER_THAN_OR_EQUAL,
+ FieldFilter.Operator.NOT_EQUAL,
+ FieldFilter.Operator.NOT_IN);
+ private static final ImmutableSet<UnaryFilter.Operator>
INEQUALITY_UNARY_FILTER_OPS =
+ ImmutableSet.of(UnaryFilter.Operator.IS_NOT_NAN,
UnaryFilter.Operator.IS_NOT_NULL);
+
+ private static final String UNQUOTED_NAME_REGEX_STRING =
"([a-zA-Z_][a-zA-Z_0-9]*)";
+ private static final String QUOTED_NAME_REGEX_STRING =
"(`(?:[^`\\\\]|(?:\\\\.))+`)";
+ // After each segment follows a dot and more characters, or the end of the
string.
+ private static final Pattern FIELD_PATH_SEGMENT_REGEX =
+ Pattern.compile(
+ String.format("(?:%s|%s)(\\..+|$)", UNQUOTED_NAME_REGEX_STRING,
QUOTED_NAME_REGEX_STRING),
+ Pattern.DOTALL);
+
+ static List<Order> getImplicitOrderBy(StructuredQuery query) {
+ List<String> expectedImplicitOrders = new ArrayList<>();
+ if (query.hasWhere()) {
+ fillInequalityFields(query.getWhere(), expectedImplicitOrders);
+ }
+ if (!expectedImplicitOrders.contains("__name__")) {
+ expectedImplicitOrders.add("__name__");
+ }
+ for (Order order : query.getOrderByList()) {
+ String orderField = order.getField().getFieldPath();
+ expectedImplicitOrders.remove(orderField);
+ }
+
+ List<Order> additionalOrders = new ArrayList<>();
+ if (!expectedImplicitOrders.isEmpty()) {
+ Direction lastDirection =
+ query.getOrderByCount() == 0
+ ? Direction.ASCENDING
+ : query.getOrderByList().get(query.getOrderByCount() -
1).getDirection();
+
+ for (String field : expectedImplicitOrders) {
+ additionalOrders.add(
+ Order.newBuilder()
+ .setDirection(lastDirection)
+
.setField(FieldReference.newBuilder().setFieldPath(field).build())
+ .build());
+ }
+ }
+
+ return additionalOrders;
+ }
+
+ private static void fillInequalityFields(Filter filter, List<String> result)
{
+ switch (filter.getFilterTypeCase()) {
+ case FIELD_FILTER:
+ if
(INEQUALITY_FIELD_FILTER_OPS.contains(filter.getFieldFilter().getOp())) {
+ String fieldPath = filter.getFieldFilter().getField().getFieldPath();
+ if (!result.contains(fieldPath)) {
+ result.add(fieldPath);
+ }
+ }
+ break;
+ case COMPOSITE_FILTER:
+ filter.getCompositeFilter().getFiltersList().forEach(f ->
fillInequalityFields(f, result));
+ break;
+ case UNARY_FILTER:
+ if
(INEQUALITY_UNARY_FILTER_OPS.contains(filter.getUnaryFilter().getOp())) {
+ String fieldPath = filter.getUnaryFilter().getField().getFieldPath();
+ if (!result.contains(fieldPath)) {
+ result.add(fieldPath);
+ }
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ @Nullable
+ static Value lookupDocumentValue(List<String> segments, Map<String, Value>
valueMap) {
+ if (segments.isEmpty()) {
+ return null;
+ }
+ String field = segments.remove(0);
Review Comment:
The fact that this (non-private) method mutates its argument is a really
sharp corner. A couple of possible ways out of that:
- make it and `resolveOrderByFieldPath` private, and expose only a single
method that returns the `Value` for a `Document` by field-path, with the path
given as a string.
- pass the current index of the segment as an extra parameter
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java:
##########
@@ -109,44 +107,36 @@ protected ServerStreamingCallable<RunQueryRequest,
RunQueryResponse> getCallable
protected RunQueryRequest setStartFrom(
RunQueryRequest element, RunQueryResponse runQueryResponse) {
StructuredQuery query = element.getStructuredQuery();
- StructuredQuery.Builder builder;
- List<Order> orderByList = query.getOrderByList();
- // if the orderByList is empty that means the default sort of "__name__
ASC" will be used
- // Before we can set the cursor to the last document name read, we need
to explicitly add
- // the order of "__name__ ASC" because a cursor value must map to an
order by
- if (orderByList.isEmpty()) {
- builder =
- query
- .toBuilder()
- .addOrderBy(
- Order.newBuilder()
-
.setField(FieldReference.newBuilder().setFieldPath("__name__").build())
- .setDirection(Direction.ASCENDING)
- .build())
- .setStartAt(
- Cursor.newBuilder()
- .setBefore(false)
- .addValues(
- Value.newBuilder()
-
.setReferenceValue(runQueryResponse.getDocument().getName())
- .build()));
- } else {
- Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
- Map<String, Value> fieldsMap =
runQueryResponse.getDocument().getFieldsMap();
- for (Order order : orderByList) {
- String fieldPath = order.getField().getFieldPath();
- Value value = fieldsMap.get(fieldPath);
- if (value != null) {
- cursor.addValues(value);
- } else if ("__name__".equals(fieldPath)) {
- cursor.addValues(
- Value.newBuilder()
-
.setReferenceValue(runQueryResponse.getDocument().getName())
- .build());
+ StructuredQuery.Builder builder = query.toBuilder();
+ builder.addAllOrderBy(QueryUtils.getImplicitOrderBy(query));
+ Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+
+ Map<String, Value> valueMap =
runQueryResponse.getDocument().getFieldsMap();
Review Comment:
The document may not be set on all RunQueryResponse (see
https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#runqueryresponse)
Are the callers of `setStartFrom` ensuring that this is set in the messages
they are passing?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java:
##########
@@ -109,44 +107,36 @@ protected ServerStreamingCallable<RunQueryRequest,
RunQueryResponse> getCallable
protected RunQueryRequest setStartFrom(
RunQueryRequest element, RunQueryResponse runQueryResponse) {
StructuredQuery query = element.getStructuredQuery();
- StructuredQuery.Builder builder;
- List<Order> orderByList = query.getOrderByList();
- // if the orderByList is empty that means the default sort of "__name__
ASC" will be used
- // Before we can set the cursor to the last document name read, we need
to explicitly add
- // the order of "__name__ ASC" because a cursor value must map to an
order by
- if (orderByList.isEmpty()) {
- builder =
- query
- .toBuilder()
- .addOrderBy(
- Order.newBuilder()
-
.setField(FieldReference.newBuilder().setFieldPath("__name__").build())
- .setDirection(Direction.ASCENDING)
- .build())
- .setStartAt(
- Cursor.newBuilder()
- .setBefore(false)
- .addValues(
- Value.newBuilder()
-
.setReferenceValue(runQueryResponse.getDocument().getName())
- .build()));
- } else {
- Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
- Map<String, Value> fieldsMap =
runQueryResponse.getDocument().getFieldsMap();
- for (Order order : orderByList) {
- String fieldPath = order.getField().getFieldPath();
- Value value = fieldsMap.get(fieldPath);
- if (value != null) {
- cursor.addValues(value);
- } else if ("__name__".equals(fieldPath)) {
- cursor.addValues(
- Value.newBuilder()
-
.setReferenceValue(runQueryResponse.getDocument().getName())
- .build());
+ StructuredQuery.Builder builder = query.toBuilder();
+ builder.addAllOrderBy(QueryUtils.getImplicitOrderBy(query));
+ Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+
+ Map<String, Value> valueMap =
runQueryResponse.getDocument().getFieldsMap();
+ for (Order order : builder.getOrderByList()) {
+ List<String> segments;
+ try {
+ segments =
QueryUtils.resolveOrderByFieldPath(order.getField().getFieldPath());
+ } catch (IllegalArgumentException e) {
+ // Rethrow as something specific to RunQuery
+ throw new IllegalArgumentException(
+ "Could not retry query due to malformed orderBy field",
e.getCause());
Review Comment:
This message may be a bit misleading. I assume that a call to `setStartFrom`
is predicated on a successful initial RunQuery RPC with the query given by the
user, so the order by *they* specified is valid, and either the segment parsing
code or the implicit completion of the order by is wrong.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/QueryUtils.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldFilter;
+import com.google.firestore.v1.StructuredQuery.FieldFilter.Operator;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Filter;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.StructuredQuery.UnaryFilter;
+import com.google.firestore.v1.Value;
+import com.google.firestore.v1.Value.ValueTypeCase;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ascii;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Contains several internal utility functions for Firestore query handling,
such as filling
+ * implicit ordering or escaping field references.
+ */
+class QueryUtils {
Review Comment:
This class could use unit-tests.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java:
##########
@@ -109,44 +107,36 @@ protected ServerStreamingCallable<RunQueryRequest,
RunQueryResponse> getCallable
protected RunQueryRequest setStartFrom(
RunQueryRequest element, RunQueryResponse runQueryResponse) {
StructuredQuery query = element.getStructuredQuery();
- StructuredQuery.Builder builder;
- List<Order> orderByList = query.getOrderByList();
- // if the orderByList is empty that means the default sort of "__name__
ASC" will be used
- // Before we can set the cursor to the last document name read, we need
to explicitly add
- // the order of "__name__ ASC" because a cursor value must map to an
order by
- if (orderByList.isEmpty()) {
- builder =
- query
- .toBuilder()
- .addOrderBy(
- Order.newBuilder()
-
.setField(FieldReference.newBuilder().setFieldPath("__name__").build())
- .setDirection(Direction.ASCENDING)
- .build())
- .setStartAt(
- Cursor.newBuilder()
- .setBefore(false)
- .addValues(
- Value.newBuilder()
-
.setReferenceValue(runQueryResponse.getDocument().getName())
- .build()));
- } else {
- Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
- Map<String, Value> fieldsMap =
runQueryResponse.getDocument().getFieldsMap();
- for (Order order : orderByList) {
- String fieldPath = order.getField().getFieldPath();
- Value value = fieldsMap.get(fieldPath);
- if (value != null) {
- cursor.addValues(value);
- } else if ("__name__".equals(fieldPath)) {
- cursor.addValues(
- Value.newBuilder()
-
.setReferenceValue(runQueryResponse.getDocument().getName())
- .build());
+ StructuredQuery.Builder builder = query.toBuilder();
+ builder.addAllOrderBy(QueryUtils.getImplicitOrderBy(query));
+ Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+
+ Map<String, Value> valueMap =
runQueryResponse.getDocument().getFieldsMap();
+ for (Order order : builder.getOrderByList()) {
+ List<String> segments;
+ try {
+ segments =
QueryUtils.resolveOrderByFieldPath(order.getField().getFieldPath());
+ } catch (IllegalArgumentException e) {
+ // Rethrow as something specific to RunQuery
Review Comment:
This does not add anything useful in the error message that the strack trace
does not already provide.
Either do not rethrow, or add relevant context for debugging (for example,
the original query + modified order by).
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/QueryUtils.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldFilter;
+import com.google.firestore.v1.StructuredQuery.FieldFilter.Operator;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Filter;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.StructuredQuery.UnaryFilter;
+import com.google.firestore.v1.Value;
+import com.google.firestore.v1.Value.ValueTypeCase;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ascii;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Contains several internal utility functions for Firestore query handling,
such as filling
+ * implicit ordering or escaping field references.
+ */
+class QueryUtils {
+
+ private static final ImmutableSet<Operator> INEQUALITY_FIELD_FILTER_OPS =
+ ImmutableSet.of(
+ FieldFilter.Operator.LESS_THAN,
+ FieldFilter.Operator.LESS_THAN_OR_EQUAL,
+ FieldFilter.Operator.GREATER_THAN,
+ FieldFilter.Operator.GREATER_THAN_OR_EQUAL,
+ FieldFilter.Operator.NOT_EQUAL,
+ FieldFilter.Operator.NOT_IN);
+ private static final ImmutableSet<UnaryFilter.Operator>
INEQUALITY_UNARY_FILTER_OPS =
+ ImmutableSet.of(UnaryFilter.Operator.IS_NOT_NAN,
UnaryFilter.Operator.IS_NOT_NULL);
+
+ private static final String UNQUOTED_NAME_REGEX_STRING =
"([a-zA-Z_][a-zA-Z_0-9]*)";
+ private static final String QUOTED_NAME_REGEX_STRING =
"(`(?:[^`\\\\]|(?:\\\\.))+`)";
+ // After each segment follows a dot and more characters, or the end of the
string.
+ private static final Pattern FIELD_PATH_SEGMENT_REGEX =
+ Pattern.compile(
+ String.format("(?:%s|%s)(\\..+|$)", UNQUOTED_NAME_REGEX_STRING,
QUOTED_NAME_REGEX_STRING),
+ Pattern.DOTALL);
+
+ static List<Order> getImplicitOrderBy(StructuredQuery query) {
+ List<String> expectedImplicitOrders = new ArrayList<>();
+ if (query.hasWhere()) {
+ fillInequalityFields(query.getWhere(), expectedImplicitOrders);
+ }
+ if (!expectedImplicitOrders.contains("__name__")) {
+ expectedImplicitOrders.add("__name__");
+ }
+ for (Order order : query.getOrderByList()) {
+ String orderField = order.getField().getFieldPath();
+ expectedImplicitOrders.remove(orderField);
+ }
+
+ List<Order> additionalOrders = new ArrayList<>();
+ if (!expectedImplicitOrders.isEmpty()) {
+ Direction lastDirection =
+ query.getOrderByCount() == 0
+ ? Direction.ASCENDING
+ : query.getOrderByList().get(query.getOrderByCount() -
1).getDirection();
+
+ for (String field : expectedImplicitOrders) {
+ additionalOrders.add(
+ Order.newBuilder()
+ .setDirection(lastDirection)
+
.setField(FieldReference.newBuilder().setFieldPath(field).build())
+ .build());
+ }
+ }
+
+ return additionalOrders;
+ }
+
+ private static void fillInequalityFields(Filter filter, List<String> result)
{
+ switch (filter.getFilterTypeCase()) {
+ case FIELD_FILTER:
+ if
(INEQUALITY_FIELD_FILTER_OPS.contains(filter.getFieldFilter().getOp())) {
+ String fieldPath = filter.getFieldFilter().getField().getFieldPath();
+ if (!result.contains(fieldPath)) {
+ result.add(fieldPath);
+ }
+ }
+ break;
+ case COMPOSITE_FILTER:
+ filter.getCompositeFilter().getFiltersList().forEach(f ->
fillInequalityFields(f, result));
+ break;
+ case UNARY_FILTER:
+ if
(INEQUALITY_UNARY_FILTER_OPS.contains(filter.getUnaryFilter().getOp())) {
+ String fieldPath = filter.getUnaryFilter().getField().getFieldPath();
+ if (!result.contains(fieldPath)) {
+ result.add(fieldPath);
+ }
+ }
+ break;
+ default:
+ break;
+ }
+ }
Review Comment:
The contract of the implicit order by is that inequality fields must be
added in field-path-order (which is lexicographical order of the segment list,
and segments are unicode lexicographical ordering).
If/when Firestore starts supporting multiple inequalities, this code will
become silently incorrect. Either this should properly order addition to the
order by, or this should hard-fail with inequalities on more than one field
path.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/QueryUtils.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldFilter;
+import com.google.firestore.v1.StructuredQuery.FieldFilter.Operator;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Filter;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.StructuredQuery.UnaryFilter;
+import com.google.firestore.v1.Value;
+import com.google.firestore.v1.Value.ValueTypeCase;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ascii;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Contains several internal utility functions for Firestore query handling,
such as filling
+ * implicit ordering or escaping field references.
+ */
+class QueryUtils {
+
+ private static final ImmutableSet<Operator> INEQUALITY_FIELD_FILTER_OPS =
+ ImmutableSet.of(
+ FieldFilter.Operator.LESS_THAN,
+ FieldFilter.Operator.LESS_THAN_OR_EQUAL,
+ FieldFilter.Operator.GREATER_THAN,
+ FieldFilter.Operator.GREATER_THAN_OR_EQUAL,
+ FieldFilter.Operator.NOT_EQUAL,
+ FieldFilter.Operator.NOT_IN);
+ private static final ImmutableSet<UnaryFilter.Operator>
INEQUALITY_UNARY_FILTER_OPS =
+ ImmutableSet.of(UnaryFilter.Operator.IS_NOT_NAN,
UnaryFilter.Operator.IS_NOT_NULL);
+
+ private static final String UNQUOTED_NAME_REGEX_STRING =
"([a-zA-Z_][a-zA-Z_0-9]*)";
+ private static final String QUOTED_NAME_REGEX_STRING =
"(`(?:[^`\\\\]|(?:\\\\.))+`)";
+ // After each segment follows a dot and more characters, or the end of the
string.
+ private static final Pattern FIELD_PATH_SEGMENT_REGEX =
+ Pattern.compile(
+ String.format("(?:%s|%s)(\\..+|$)", UNQUOTED_NAME_REGEX_STRING,
QUOTED_NAME_REGEX_STRING),
+ Pattern.DOTALL);
+
+ static List<Order> getImplicitOrderBy(StructuredQuery query) {
+ List<String> expectedImplicitOrders = new ArrayList<>();
+ if (query.hasWhere()) {
+ fillInequalityFields(query.getWhere(), expectedImplicitOrders);
+ }
+ if (!expectedImplicitOrders.contains("__name__")) {
+ expectedImplicitOrders.add("__name__");
+ }
+ for (Order order : query.getOrderByList()) {
+ String orderField = order.getField().getFieldPath();
+ expectedImplicitOrders.remove(orderField);
+ }
Review Comment:
Equality test should be done on unescaped paths (segment lists) because the
escaped form is not canonical.
Consider the following test cases, that would not pass with the current code:
```
... WHERE `a` > 1 ORDER BY a --> ORDER BY a, __name__
... WHERE `a` > 1 AND a < 3 --> ORDER BY a, __name__
... ORDER BY `__name__` --> ORDER BY `__name__`
... WHERE `__name__` > ".../foo/bar" --> ORDER BY __name__
```
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java:
##########
@@ -109,44 +107,36 @@ protected ServerStreamingCallable<RunQueryRequest,
RunQueryResponse> getCallable
protected RunQueryRequest setStartFrom(
RunQueryRequest element, RunQueryResponse runQueryResponse) {
StructuredQuery query = element.getStructuredQuery();
- StructuredQuery.Builder builder;
- List<Order> orderByList = query.getOrderByList();
- // if the orderByList is empty that means the default sort of "__name__
ASC" will be used
- // Before we can set the cursor to the last document name read, we need
to explicitly add
- // the order of "__name__ ASC" because a cursor value must map to an
order by
- if (orderByList.isEmpty()) {
- builder =
- query
- .toBuilder()
- .addOrderBy(
- Order.newBuilder()
-
.setField(FieldReference.newBuilder().setFieldPath("__name__").build())
- .setDirection(Direction.ASCENDING)
- .build())
- .setStartAt(
- Cursor.newBuilder()
- .setBefore(false)
- .addValues(
- Value.newBuilder()
-
.setReferenceValue(runQueryResponse.getDocument().getName())
- .build()));
- } else {
- Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
- Map<String, Value> fieldsMap =
runQueryResponse.getDocument().getFieldsMap();
- for (Order order : orderByList) {
- String fieldPath = order.getField().getFieldPath();
- Value value = fieldsMap.get(fieldPath);
- if (value != null) {
- cursor.addValues(value);
- } else if ("__name__".equals(fieldPath)) {
- cursor.addValues(
- Value.newBuilder()
-
.setReferenceValue(runQueryResponse.getDocument().getName())
- .build());
+ StructuredQuery.Builder builder = query.toBuilder();
+ builder.addAllOrderBy(QueryUtils.getImplicitOrderBy(query));
+ Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+
+ Map<String, Value> valueMap =
runQueryResponse.getDocument().getFieldsMap();
+ for (Order order : builder.getOrderByList()) {
+ List<String> segments;
+ try {
+ segments =
QueryUtils.resolveOrderByFieldPath(order.getField().getFieldPath());
+ } catch (IllegalArgumentException e) {
+ // Rethrow as something specific to RunQuery
+ throw new IllegalArgumentException(
+ "Could not retry query due to malformed orderBy field",
e.getCause());
+ }
+ // __name__ is a special field and doesn't exist in valueMap
+ if (segments.size() == 1 && "__name__".equals(segments.get(0))) {
+ cursor.addValues(
+ Value.newBuilder()
+ .setReferenceValue(runQueryResponse.getDocument().getName())
+ .build());
+ } else {
Review Comment:
Worth putting this logic in a new method
`QueryUtils.lookupDocumentValue(segments, runQueryResponse.getDocument())` ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]