clement commented on code in PR #22175:
URL: https://github.com/apache/beam/pull/22175#discussion_r917203520
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/QueryUtils.java:
##########
@@ -151,7 +169,7 @@ static List<String> resolveOrderByFieldPath(String
fieldPath) {
// Unquoted group is null, use quoted group.
if (fieldName == null) {
fieldName = segmentMatcher.group(2);
- String escaped = escapeFieldName(fieldName.substring(1,
fieldName.length() - 1));
+ String escaped = unescapeFieldName(fieldName.substring(1,
fieldName.length() - 1));
Review Comment:
s/escaped/unescaped/ ? The variable now holds the result of
`unescapeFieldName`, so the name `escaped` is misleading.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java:
##########
@@ -109,44 +107,36 @@ protected ServerStreamingCallable<RunQueryRequest,
RunQueryResponse> getCallable
protected RunQueryRequest setStartFrom(
RunQueryRequest element, RunQueryResponse runQueryResponse) {
StructuredQuery query = element.getStructuredQuery();
- StructuredQuery.Builder builder;
- List<Order> orderByList = query.getOrderByList();
- // if the orderByList is empty that means the default sort of "__name__
ASC" will be used
- // Before we can set the cursor to the last document name read, we need
to explicitly add
- // the order of "__name__ ASC" because a cursor value must map to an
order by
- if (orderByList.isEmpty()) {
- builder =
- query
- .toBuilder()
- .addOrderBy(
- Order.newBuilder()
-
.setField(FieldReference.newBuilder().setFieldPath("__name__").build())
- .setDirection(Direction.ASCENDING)
- .build())
- .setStartAt(
- Cursor.newBuilder()
- .setBefore(false)
- .addValues(
- Value.newBuilder()
-
.setReferenceValue(runQueryResponse.getDocument().getName())
- .build()));
- } else {
- Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
- Map<String, Value> fieldsMap =
runQueryResponse.getDocument().getFieldsMap();
- for (Order order : orderByList) {
- String fieldPath = order.getField().getFieldPath();
- Value value = fieldsMap.get(fieldPath);
- if (value != null) {
- cursor.addValues(value);
- } else if ("__name__".equals(fieldPath)) {
- cursor.addValues(
- Value.newBuilder()
-
.setReferenceValue(runQueryResponse.getDocument().getName())
- .build());
+ StructuredQuery.Builder builder = query.toBuilder();
+ builder.addAllOrderBy(QueryUtils.getImplicitOrderBy(query));
+ Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+
+ Map<String, Value> valueMap =
runQueryResponse.getDocument().getFieldsMap();
Review Comment:
> So I guess with partial progress response -> query times out -> no
> document to restart from, we'd have a problem?
Yes, looks like it. Perhaps add an abstract reducer-like method to
`StreamingFirestoreV1ReadFn`? Something with a signature like `OutT
resumptionValue(OutT previousValue, OutT newValue)`?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/QueryUtils.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldFilter;
+import com.google.firestore.v1.StructuredQuery.FieldFilter.Operator;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Filter;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.StructuredQuery.UnaryFilter;
+import com.google.firestore.v1.Value;
+import com.google.firestore.v1.Value.ValueTypeCase;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ascii;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Contains several internal utility functions for Firestore query handling,
such as filling
+ * implicit ordering or escaping field references.
+ */
+class QueryUtils {
+
+ private static final ImmutableSet<Operator> INEQUALITY_FIELD_FILTER_OPS =
+ ImmutableSet.of(
+ FieldFilter.Operator.LESS_THAN,
+ FieldFilter.Operator.LESS_THAN_OR_EQUAL,
+ FieldFilter.Operator.GREATER_THAN,
+ FieldFilter.Operator.GREATER_THAN_OR_EQUAL,
+ FieldFilter.Operator.NOT_EQUAL,
+ FieldFilter.Operator.NOT_IN);
+ private static final ImmutableSet<UnaryFilter.Operator>
INEQUALITY_UNARY_FILTER_OPS =
+ ImmutableSet.of(UnaryFilter.Operator.IS_NOT_NAN,
UnaryFilter.Operator.IS_NOT_NULL);
+
+ private static final String UNQUOTED_NAME_REGEX_STRING =
"([a-zA-Z_][a-zA-Z_0-9]*)";
+ private static final String QUOTED_NAME_REGEX_STRING =
"(`(?:[^`\\\\]|(?:\\\\.))+`)";
+ // After each segment follows a dot and more characters, or the end of the
string.
+ private static final Pattern FIELD_PATH_SEGMENT_REGEX =
+ Pattern.compile(
+ String.format("(?:%s|%s)(\\..+|$)", UNQUOTED_NAME_REGEX_STRING,
QUOTED_NAME_REGEX_STRING),
+ Pattern.DOTALL);
+
+ static List<Order> getImplicitOrderBy(StructuredQuery query) {
+ List<String> expectedImplicitOrders = new ArrayList<>();
+ if (query.hasWhere()) {
+ fillInequalityFields(query.getWhere(), expectedImplicitOrders);
+ }
+ if (!expectedImplicitOrders.contains("__name__")) {
+ expectedImplicitOrders.add("__name__");
+ }
+ for (Order order : query.getOrderByList()) {
+ String orderField = order.getField().getFieldPath();
+ expectedImplicitOrders.remove(orderField);
+ }
+
+ List<Order> additionalOrders = new ArrayList<>();
+ if (!expectedImplicitOrders.isEmpty()) {
+ Direction lastDirection =
+ query.getOrderByCount() == 0
+ ? Direction.ASCENDING
+ : query.getOrderByList().get(query.getOrderByCount() -
1).getDirection();
+
+ for (String field : expectedImplicitOrders) {
+ additionalOrders.add(
+ Order.newBuilder()
+ .setDirection(lastDirection)
+
.setField(FieldReference.newBuilder().setFieldPath(field).build())
+ .build());
+ }
+ }
+
+ return additionalOrders;
+ }
+
+ private static void fillInequalityFields(Filter filter, List<String> result)
{
+ switch (filter.getFilterTypeCase()) {
+ case FIELD_FILTER:
+ if
(INEQUALITY_FIELD_FILTER_OPS.contains(filter.getFieldFilter().getOp())) {
+ String fieldPath = filter.getFieldFilter().getField().getFieldPath();
+ if (!result.contains(fieldPath)) {
+ result.add(fieldPath);
+ }
+ }
+ break;
+ case COMPOSITE_FILTER:
+ filter.getCompositeFilter().getFiltersList().forEach(f ->
fillInequalityFields(f, result));
+ break;
+ case UNARY_FILTER:
+ if
(INEQUALITY_UNARY_FILTER_OPS.contains(filter.getUnaryFilter().getOp())) {
+ String fieldPath = filter.getUnaryFilter().getField().getFieldPath();
+ if (!result.contains(fieldPath)) {
+ result.add(fieldPath);
+ }
+ }
+ break;
+ default:
+ break;
+ }
+ }
Review Comment:
> that the filters in a composite filter would already be sorted.
The filter order in a composite filter is arbitrary, and picked by the user.
> If not I'll add sorting to the composite filter case
Composite filters can be nested, so the sorting needs to be done for all field
paths that appear at any point in the query filter "tree". I think it's also
reasonable to not sort and only ensure that the result contains only one
*canonical* field path, and otherwise fail with a clear message that **this**
version of the Beam connector does not support queries with multiple inequalities
(this is punting the issue for later, and hoping we will have a better
mechanism for resumption by then).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]