clement commented on code in PR #22175:
URL: https://github.com/apache/beam/pull/22175#discussion_r917167502


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java:
##########
@@ -109,44 +107,36 @@ protected ServerStreamingCallable<RunQueryRequest, 
RunQueryResponse> getCallable
     protected RunQueryRequest setStartFrom(
         RunQueryRequest element, RunQueryResponse runQueryResponse) {
       StructuredQuery query = element.getStructuredQuery();
-      StructuredQuery.Builder builder;
-      List<Order> orderByList = query.getOrderByList();
-      // if the orderByList is empty that means the default sort of "__name__ 
ASC" will be used
-      // Before we can set the cursor to the last document name read, we need 
to explicitly add
-      // the order of "__name__ ASC" because a cursor value must map to an 
order by
-      if (orderByList.isEmpty()) {
-        builder =
-            query
-                .toBuilder()
-                .addOrderBy(
-                    Order.newBuilder()
-                        
.setField(FieldReference.newBuilder().setFieldPath("__name__").build())
-                        .setDirection(Direction.ASCENDING)
-                        .build())
-                .setStartAt(
-                    Cursor.newBuilder()
-                        .setBefore(false)
-                        .addValues(
-                            Value.newBuilder()
-                                
.setReferenceValue(runQueryResponse.getDocument().getName())
-                                .build()));
-      } else {
-        Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
-        Map<String, Value> fieldsMap = 
runQueryResponse.getDocument().getFieldsMap();
-        for (Order order : orderByList) {
-          String fieldPath = order.getField().getFieldPath();
-          Value value = fieldsMap.get(fieldPath);
-          if (value != null) {
-            cursor.addValues(value);
-          } else if ("__name__".equals(fieldPath)) {
-            cursor.addValues(
-                Value.newBuilder()
-                    
.setReferenceValue(runQueryResponse.getDocument().getName())
-                    .build());
+      StructuredQuery.Builder builder = query.toBuilder();
+      builder.addAllOrderBy(QueryUtils.getImplicitOrderBy(query));
+      Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+
+      Map<String, Value> valueMap = 
runQueryResponse.getDocument().getFieldsMap();
+      for (Order order : builder.getOrderByList()) {
+        List<String> segments;
+        try {
+          segments = 
QueryUtils.resolveOrderByFieldPath(order.getField().getFieldPath());
+        } catch (IllegalArgumentException e) {
+          // Rethrow as something specific to RunQuery
+          throw new IllegalArgumentException(
+              "Could not retry query due to malformed orderBy field", 
e.getCause());
+        }
+        // __name__ is a special field and doesn't exist in valueMap

Review Comment:
   That's not actually quite right. Some legacy types in Datastore will 
translate to maps in Firestore with the `__name__` field set.
   
   But it is true that `__name__` will never be a top-level field in a document.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/QueryUtils.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldFilter;
+import com.google.firestore.v1.StructuredQuery.FieldFilter.Operator;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Filter;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.StructuredQuery.UnaryFilter;
+import com.google.firestore.v1.Value;
+import com.google.firestore.v1.Value.ValueTypeCase;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ascii;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Contains several internal utility functions for Firestore query handling, 
such as filling
+ * implicit ordering or escaping field references.
+ */
+class QueryUtils {
+
+  private static final ImmutableSet<Operator> INEQUALITY_FIELD_FILTER_OPS =
+      ImmutableSet.of(
+          FieldFilter.Operator.LESS_THAN,
+          FieldFilter.Operator.LESS_THAN_OR_EQUAL,
+          FieldFilter.Operator.GREATER_THAN,
+          FieldFilter.Operator.GREATER_THAN_OR_EQUAL,
+          FieldFilter.Operator.NOT_EQUAL,
+          FieldFilter.Operator.NOT_IN);
+  private static final ImmutableSet<UnaryFilter.Operator> 
INEQUALITY_UNARY_FILTER_OPS =
+      ImmutableSet.of(UnaryFilter.Operator.IS_NOT_NAN, 
UnaryFilter.Operator.IS_NOT_NULL);
+
+  private static final String UNQUOTED_NAME_REGEX_STRING = 
"([a-zA-Z_][a-zA-Z_0-9]*)";
+  private static final String QUOTED_NAME_REGEX_STRING = 
"(`(?:[^`\\\\]|(?:\\\\.))+`)";
+  // After each segment follows a dot and more characters, or the end of the 
string.
+  private static final Pattern FIELD_PATH_SEGMENT_REGEX =
+      Pattern.compile(
+          String.format("(?:%s|%s)(\\..+|$)", UNQUOTED_NAME_REGEX_STRING, 
QUOTED_NAME_REGEX_STRING),
+          Pattern.DOTALL);
+
+  static List<Order> getImplicitOrderBy(StructuredQuery query) {
+    List<String> expectedImplicitOrders = new ArrayList<>();
+    if (query.hasWhere()) {
+      fillInequalityFields(query.getWhere(), expectedImplicitOrders);
+    }
+    if (!expectedImplicitOrders.contains("__name__")) {
+      expectedImplicitOrders.add("__name__");
+    }
+    for (Order order : query.getOrderByList()) {
+      String orderField = order.getField().getFieldPath();
+      expectedImplicitOrders.remove(orderField);
+    }
+
+    List<Order> additionalOrders = new ArrayList<>();
+    if (!expectedImplicitOrders.isEmpty()) {
+      Direction lastDirection =
+          query.getOrderByCount() == 0
+              ? Direction.ASCENDING
+              : query.getOrderByList().get(query.getOrderByCount() - 
1).getDirection();
+
+      for (String field : expectedImplicitOrders) {
+        additionalOrders.add(
+            Order.newBuilder()
+                .setDirection(lastDirection)
+                
.setField(FieldReference.newBuilder().setFieldPath(field).build())
+                .build());
+      }
+    }
+
+    return additionalOrders;
+  }
+
+  private static void fillInequalityFields(Filter filter, List<String> result) 
{
+    switch (filter.getFilterTypeCase()) {
+      case FIELD_FILTER:
+        if 
(INEQUALITY_FIELD_FILTER_OPS.contains(filter.getFieldFilter().getOp())) {
+          String fieldPath = filter.getFieldFilter().getField().getFieldPath();
+          if (!result.contains(fieldPath)) {
+            result.add(fieldPath);
+          }
+        }
+        break;
+      case COMPOSITE_FILTER:
+        filter.getCompositeFilter().getFiltersList().forEach(f -> 
fillInequalityFields(f, result));
+        break;
+      case UNARY_FILTER:
+        if 
(INEQUALITY_UNARY_FILTER_OPS.contains(filter.getUnaryFilter().getOp())) {
+          String fieldPath = filter.getUnaryFilter().getField().getFieldPath();
+          if (!result.contains(fieldPath)) {
+            result.add(fieldPath);
+          }
+        }
+        break;
+      default:
+        break;
+    }
+  }
+
+  @Nullable
+  static Value lookupDocumentValue(List<String> segments, Map<String, Value> 
valueMap) {
+    if (segments.isEmpty()) {
+      return null;
+    }
+    String field = segments.remove(0);

Review Comment:
   The fact that this (non-private) method mutates its argument is a really 
sharp corner. A couple of possible ways out of that:
   
   - make it and `resolveOrderByFieldPath` private, and expose only a single 
method that returns the `Value` for a `Document` by field-path, with the path 
given as a string.
   - pass the current index of the segment as an extra parameter



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java:
##########
@@ -109,44 +107,36 @@ protected ServerStreamingCallable<RunQueryRequest, 
RunQueryResponse> getCallable
     protected RunQueryRequest setStartFrom(
         RunQueryRequest element, RunQueryResponse runQueryResponse) {
       StructuredQuery query = element.getStructuredQuery();
-      StructuredQuery.Builder builder;
-      List<Order> orderByList = query.getOrderByList();
-      // if the orderByList is empty that means the default sort of "__name__ 
ASC" will be used
-      // Before we can set the cursor to the last document name read, we need 
to explicitly add
-      // the order of "__name__ ASC" because a cursor value must map to an 
order by
-      if (orderByList.isEmpty()) {
-        builder =
-            query
-                .toBuilder()
-                .addOrderBy(
-                    Order.newBuilder()
-                        
.setField(FieldReference.newBuilder().setFieldPath("__name__").build())
-                        .setDirection(Direction.ASCENDING)
-                        .build())
-                .setStartAt(
-                    Cursor.newBuilder()
-                        .setBefore(false)
-                        .addValues(
-                            Value.newBuilder()
-                                
.setReferenceValue(runQueryResponse.getDocument().getName())
-                                .build()));
-      } else {
-        Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
-        Map<String, Value> fieldsMap = 
runQueryResponse.getDocument().getFieldsMap();
-        for (Order order : orderByList) {
-          String fieldPath = order.getField().getFieldPath();
-          Value value = fieldsMap.get(fieldPath);
-          if (value != null) {
-            cursor.addValues(value);
-          } else if ("__name__".equals(fieldPath)) {
-            cursor.addValues(
-                Value.newBuilder()
-                    
.setReferenceValue(runQueryResponse.getDocument().getName())
-                    .build());
+      StructuredQuery.Builder builder = query.toBuilder();
+      builder.addAllOrderBy(QueryUtils.getImplicitOrderBy(query));
+      Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+
+      Map<String, Value> valueMap = 
runQueryResponse.getDocument().getFieldsMap();

Review Comment:
   The document may not be set on all RunQueryResponse (see 
https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#runqueryresponse)
   
   Are the callers of `setStartFrom` ensuring that this is set in the messages 
they are passing?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java:
##########
@@ -109,44 +107,36 @@ protected ServerStreamingCallable<RunQueryRequest, 
RunQueryResponse> getCallable
     protected RunQueryRequest setStartFrom(
         RunQueryRequest element, RunQueryResponse runQueryResponse) {
       StructuredQuery query = element.getStructuredQuery();
-      StructuredQuery.Builder builder;
-      List<Order> orderByList = query.getOrderByList();
-      // if the orderByList is empty that means the default sort of "__name__ 
ASC" will be used
-      // Before we can set the cursor to the last document name read, we need 
to explicitly add
-      // the order of "__name__ ASC" because a cursor value must map to an 
order by
-      if (orderByList.isEmpty()) {
-        builder =
-            query
-                .toBuilder()
-                .addOrderBy(
-                    Order.newBuilder()
-                        
.setField(FieldReference.newBuilder().setFieldPath("__name__").build())
-                        .setDirection(Direction.ASCENDING)
-                        .build())
-                .setStartAt(
-                    Cursor.newBuilder()
-                        .setBefore(false)
-                        .addValues(
-                            Value.newBuilder()
-                                
.setReferenceValue(runQueryResponse.getDocument().getName())
-                                .build()));
-      } else {
-        Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
-        Map<String, Value> fieldsMap = 
runQueryResponse.getDocument().getFieldsMap();
-        for (Order order : orderByList) {
-          String fieldPath = order.getField().getFieldPath();
-          Value value = fieldsMap.get(fieldPath);
-          if (value != null) {
-            cursor.addValues(value);
-          } else if ("__name__".equals(fieldPath)) {
-            cursor.addValues(
-                Value.newBuilder()
-                    
.setReferenceValue(runQueryResponse.getDocument().getName())
-                    .build());
+      StructuredQuery.Builder builder = query.toBuilder();
+      builder.addAllOrderBy(QueryUtils.getImplicitOrderBy(query));
+      Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+
+      Map<String, Value> valueMap = 
runQueryResponse.getDocument().getFieldsMap();
+      for (Order order : builder.getOrderByList()) {
+        List<String> segments;
+        try {
+          segments = 
QueryUtils.resolveOrderByFieldPath(order.getField().getFieldPath());
+        } catch (IllegalArgumentException e) {
+          // Rethrow as something specific to RunQuery
+          throw new IllegalArgumentException(
+              "Could not retry query due to malformed orderBy field", 
e.getCause());

Review Comment:
   This message may be a bit misleading. I assume that a call to `setStartFrom` 
is predicated on a successful initial RunQuery RPC with the query given by the 
user, so the order by *they* specified is valid, and either the segment parsing 
code or the implicit completion of the order by is wrong.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/QueryUtils.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldFilter;
+import com.google.firestore.v1.StructuredQuery.FieldFilter.Operator;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Filter;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.StructuredQuery.UnaryFilter;
+import com.google.firestore.v1.Value;
+import com.google.firestore.v1.Value.ValueTypeCase;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ascii;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Contains several internal utility functions for Firestore query handling, 
such as filling
+ * implicit ordering or escaping field references.
+ */
+class QueryUtils {

Review Comment:
   This class could use unit-tests.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java:
##########
@@ -109,44 +107,36 @@ protected ServerStreamingCallable<RunQueryRequest, 
RunQueryResponse> getCallable
     protected RunQueryRequest setStartFrom(
         RunQueryRequest element, RunQueryResponse runQueryResponse) {
       StructuredQuery query = element.getStructuredQuery();
-      StructuredQuery.Builder builder;
-      List<Order> orderByList = query.getOrderByList();
-      // if the orderByList is empty that means the default sort of "__name__ 
ASC" will be used
-      // Before we can set the cursor to the last document name read, we need 
to explicitly add
-      // the order of "__name__ ASC" because a cursor value must map to an 
order by
-      if (orderByList.isEmpty()) {
-        builder =
-            query
-                .toBuilder()
-                .addOrderBy(
-                    Order.newBuilder()
-                        
.setField(FieldReference.newBuilder().setFieldPath("__name__").build())
-                        .setDirection(Direction.ASCENDING)
-                        .build())
-                .setStartAt(
-                    Cursor.newBuilder()
-                        .setBefore(false)
-                        .addValues(
-                            Value.newBuilder()
-                                
.setReferenceValue(runQueryResponse.getDocument().getName())
-                                .build()));
-      } else {
-        Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
-        Map<String, Value> fieldsMap = 
runQueryResponse.getDocument().getFieldsMap();
-        for (Order order : orderByList) {
-          String fieldPath = order.getField().getFieldPath();
-          Value value = fieldsMap.get(fieldPath);
-          if (value != null) {
-            cursor.addValues(value);
-          } else if ("__name__".equals(fieldPath)) {
-            cursor.addValues(
-                Value.newBuilder()
-                    
.setReferenceValue(runQueryResponse.getDocument().getName())
-                    .build());
+      StructuredQuery.Builder builder = query.toBuilder();
+      builder.addAllOrderBy(QueryUtils.getImplicitOrderBy(query));
+      Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+
+      Map<String, Value> valueMap = 
runQueryResponse.getDocument().getFieldsMap();
+      for (Order order : builder.getOrderByList()) {
+        List<String> segments;
+        try {
+          segments = 
QueryUtils.resolveOrderByFieldPath(order.getField().getFieldPath());
+        } catch (IllegalArgumentException e) {
+          // Rethrow as something specific to RunQuery

Review Comment:
   This does not add anything useful in the error message that the stack trace 
does not already provide.
   
   Either do not rethrow, or add relevant context for debugging (for example, 
the original query + modified order by).



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/QueryUtils.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldFilter;
+import com.google.firestore.v1.StructuredQuery.FieldFilter.Operator;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Filter;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.StructuredQuery.UnaryFilter;
+import com.google.firestore.v1.Value;
+import com.google.firestore.v1.Value.ValueTypeCase;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ascii;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Contains several internal utility functions for Firestore query handling, 
such as filling
+ * implicit ordering or escaping field references.
+ */
+class QueryUtils {
+
+  private static final ImmutableSet<Operator> INEQUALITY_FIELD_FILTER_OPS =
+      ImmutableSet.of(
+          FieldFilter.Operator.LESS_THAN,
+          FieldFilter.Operator.LESS_THAN_OR_EQUAL,
+          FieldFilter.Operator.GREATER_THAN,
+          FieldFilter.Operator.GREATER_THAN_OR_EQUAL,
+          FieldFilter.Operator.NOT_EQUAL,
+          FieldFilter.Operator.NOT_IN);
+  private static final ImmutableSet<UnaryFilter.Operator> 
INEQUALITY_UNARY_FILTER_OPS =
+      ImmutableSet.of(UnaryFilter.Operator.IS_NOT_NAN, 
UnaryFilter.Operator.IS_NOT_NULL);
+
+  private static final String UNQUOTED_NAME_REGEX_STRING = 
"([a-zA-Z_][a-zA-Z_0-9]*)";
+  private static final String QUOTED_NAME_REGEX_STRING = 
"(`(?:[^`\\\\]|(?:\\\\.))+`)";
+  // After each segment follows a dot and more characters, or the end of the 
string.
+  private static final Pattern FIELD_PATH_SEGMENT_REGEX =
+      Pattern.compile(
+          String.format("(?:%s|%s)(\\..+|$)", UNQUOTED_NAME_REGEX_STRING, 
QUOTED_NAME_REGEX_STRING),
+          Pattern.DOTALL);
+
+  static List<Order> getImplicitOrderBy(StructuredQuery query) {
+    List<String> expectedImplicitOrders = new ArrayList<>();
+    if (query.hasWhere()) {
+      fillInequalityFields(query.getWhere(), expectedImplicitOrders);
+    }
+    if (!expectedImplicitOrders.contains("__name__")) {
+      expectedImplicitOrders.add("__name__");
+    }
+    for (Order order : query.getOrderByList()) {
+      String orderField = order.getField().getFieldPath();
+      expectedImplicitOrders.remove(orderField);
+    }
+
+    List<Order> additionalOrders = new ArrayList<>();
+    if (!expectedImplicitOrders.isEmpty()) {
+      Direction lastDirection =
+          query.getOrderByCount() == 0
+              ? Direction.ASCENDING
+              : query.getOrderByList().get(query.getOrderByCount() - 
1).getDirection();
+
+      for (String field : expectedImplicitOrders) {
+        additionalOrders.add(
+            Order.newBuilder()
+                .setDirection(lastDirection)
+                
.setField(FieldReference.newBuilder().setFieldPath(field).build())
+                .build());
+      }
+    }
+
+    return additionalOrders;
+  }
+
+  private static void fillInequalityFields(Filter filter, List<String> result) 
{
+    switch (filter.getFilterTypeCase()) {
+      case FIELD_FILTER:
+        if 
(INEQUALITY_FIELD_FILTER_OPS.contains(filter.getFieldFilter().getOp())) {
+          String fieldPath = filter.getFieldFilter().getField().getFieldPath();
+          if (!result.contains(fieldPath)) {
+            result.add(fieldPath);
+          }
+        }
+        break;
+      case COMPOSITE_FILTER:
+        filter.getCompositeFilter().getFiltersList().forEach(f -> 
fillInequalityFields(f, result));
+        break;
+      case UNARY_FILTER:
+        if 
(INEQUALITY_UNARY_FILTER_OPS.contains(filter.getUnaryFilter().getOp())) {
+          String fieldPath = filter.getUnaryFilter().getField().getFieldPath();
+          if (!result.contains(fieldPath)) {
+            result.add(fieldPath);
+          }
+        }
+        break;
+      default:
+        break;
+    }
+  }

Review Comment:
   The contract of the implicit order by is that inequality fields must be 
added in field-path order (which is the lexicographical order of the segment list, 
with segments compared in Unicode lexicographical order).
   
   If/when Firestore starts supporting multiple inequalities, this code will 
become silently incorrect. Either this should properly order addition to the 
order by, or this should hard-fail with inequalities on more than one field 
path.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/QueryUtils.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldFilter;
+import com.google.firestore.v1.StructuredQuery.FieldFilter.Operator;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Filter;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.StructuredQuery.UnaryFilter;
+import com.google.firestore.v1.Value;
+import com.google.firestore.v1.Value.ValueTypeCase;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ascii;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Contains several internal utility functions for Firestore query handling, 
such as filling
+ * implicit ordering or escaping field references.
+ */
+class QueryUtils {
+
+  private static final ImmutableSet<Operator> INEQUALITY_FIELD_FILTER_OPS =
+      ImmutableSet.of(
+          FieldFilter.Operator.LESS_THAN,
+          FieldFilter.Operator.LESS_THAN_OR_EQUAL,
+          FieldFilter.Operator.GREATER_THAN,
+          FieldFilter.Operator.GREATER_THAN_OR_EQUAL,
+          FieldFilter.Operator.NOT_EQUAL,
+          FieldFilter.Operator.NOT_IN);
+  private static final ImmutableSet<UnaryFilter.Operator> 
INEQUALITY_UNARY_FILTER_OPS =
+      ImmutableSet.of(UnaryFilter.Operator.IS_NOT_NAN, 
UnaryFilter.Operator.IS_NOT_NULL);
+
+  private static final String UNQUOTED_NAME_REGEX_STRING = 
"([a-zA-Z_][a-zA-Z_0-9]*)";
+  private static final String QUOTED_NAME_REGEX_STRING = 
"(`(?:[^`\\\\]|(?:\\\\.))+`)";
+  // After each segment follows a dot and more characters, or the end of the 
string.
+  private static final Pattern FIELD_PATH_SEGMENT_REGEX =
+      Pattern.compile(
+          String.format("(?:%s|%s)(\\..+|$)", UNQUOTED_NAME_REGEX_STRING, 
QUOTED_NAME_REGEX_STRING),
+          Pattern.DOTALL);
+
+  static List<Order> getImplicitOrderBy(StructuredQuery query) {
+    List<String> expectedImplicitOrders = new ArrayList<>();
+    if (query.hasWhere()) {
+      fillInequalityFields(query.getWhere(), expectedImplicitOrders);
+    }
+    if (!expectedImplicitOrders.contains("__name__")) {
+      expectedImplicitOrders.add("__name__");
+    }
+    for (Order order : query.getOrderByList()) {
+      String orderField = order.getField().getFieldPath();
+      expectedImplicitOrders.remove(orderField);
+    }

Review Comment:
   Equality test should be done on unescaped paths (segment lists) because the 
escaped form is not canonical.
   
   Consider the following test cases, that would not pass with the current code:
   
   ```
   ... WHERE `a` > 1 ORDER BY a  -->  ORDER BY a, __name__
   ... WHERE `a` > 1 AND a < 3  -->  ORDER BY a, __name__
   ... ORDER BY `__name__` --> ORDER BY `__name__`
   ... WHERE `__name__` > ".../foo/bar" --> ORDER BY __name__
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java:
##########
@@ -109,44 +107,36 @@ protected ServerStreamingCallable<RunQueryRequest, 
RunQueryResponse> getCallable
     protected RunQueryRequest setStartFrom(
         RunQueryRequest element, RunQueryResponse runQueryResponse) {
       StructuredQuery query = element.getStructuredQuery();
-      StructuredQuery.Builder builder;
-      List<Order> orderByList = query.getOrderByList();
-      // if the orderByList is empty that means the default sort of "__name__ 
ASC" will be used
-      // Before we can set the cursor to the last document name read, we need 
to explicitly add
-      // the order of "__name__ ASC" because a cursor value must map to an 
order by
-      if (orderByList.isEmpty()) {
-        builder =
-            query
-                .toBuilder()
-                .addOrderBy(
-                    Order.newBuilder()
-                        
.setField(FieldReference.newBuilder().setFieldPath("__name__").build())
-                        .setDirection(Direction.ASCENDING)
-                        .build())
-                .setStartAt(
-                    Cursor.newBuilder()
-                        .setBefore(false)
-                        .addValues(
-                            Value.newBuilder()
-                                
.setReferenceValue(runQueryResponse.getDocument().getName())
-                                .build()));
-      } else {
-        Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
-        Map<String, Value> fieldsMap = 
runQueryResponse.getDocument().getFieldsMap();
-        for (Order order : orderByList) {
-          String fieldPath = order.getField().getFieldPath();
-          Value value = fieldsMap.get(fieldPath);
-          if (value != null) {
-            cursor.addValues(value);
-          } else if ("__name__".equals(fieldPath)) {
-            cursor.addValues(
-                Value.newBuilder()
-                    
.setReferenceValue(runQueryResponse.getDocument().getName())
-                    .build());
+      StructuredQuery.Builder builder = query.toBuilder();
+      builder.addAllOrderBy(QueryUtils.getImplicitOrderBy(query));
+      Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+
+      Map<String, Value> valueMap = 
runQueryResponse.getDocument().getFieldsMap();
+      for (Order order : builder.getOrderByList()) {
+        List<String> segments;
+        try {
+          segments = 
QueryUtils.resolveOrderByFieldPath(order.getField().getFieldPath());
+        } catch (IllegalArgumentException e) {
+          // Rethrow as something specific to RunQuery
+          throw new IllegalArgumentException(
+              "Could not retry query due to malformed orderBy field", 
e.getCause());
+        }
+        // __name__ is a special field and doesn't exist in valueMap
+        if (segments.size() == 1 && "__name__".equals(segments.get(0))) {
+          cursor.addValues(
+              Value.newBuilder()
+                  .setReferenceValue(runQueryResponse.getDocument().getName())
+                  .build());
+        } else {

Review Comment:
   Worth putting this logic in a new method 
`QueryUtils.lookupDocumentValue(segments, runQueryResponse.getDocument())` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to