danthev commented on code in PR #22175:
URL: https://github.com/apache/beam/pull/22175#discussion_r917187222
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java:
##########
@@ -109,44 +107,36 @@ protected ServerStreamingCallable<RunQueryRequest, RunQueryResponse> getCallable
protected RunQueryRequest setStartFrom(
RunQueryRequest element, RunQueryResponse runQueryResponse) {
StructuredQuery query = element.getStructuredQuery();
- StructuredQuery.Builder builder;
- List<Order> orderByList = query.getOrderByList();
- // if the orderByList is empty that means the default sort of "__name__ ASC" will be used
- // Before we can set the cursor to the last document name read, we need to explicitly add
- // the order of "__name__ ASC" because a cursor value must map to an order by
- if (orderByList.isEmpty()) {
- builder =
- query
- .toBuilder()
- .addOrderBy(
- Order.newBuilder()
- .setField(FieldReference.newBuilder().setFieldPath("__name__").build())
- .setDirection(Direction.ASCENDING)
- .build())
- .setStartAt(
- Cursor.newBuilder()
- .setBefore(false)
- .addValues(
- Value.newBuilder()
- .setReferenceValue(runQueryResponse.getDocument().getName())
- .build()));
- } else {
- Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
- Map<String, Value> fieldsMap = runQueryResponse.getDocument().getFieldsMap();
- for (Order order : orderByList) {
- String fieldPath = order.getField().getFieldPath();
- Value value = fieldsMap.get(fieldPath);
- if (value != null) {
- cursor.addValues(value);
- } else if ("__name__".equals(fieldPath)) {
- cursor.addValues(
- Value.newBuilder()
- .setReferenceValue(runQueryResponse.getDocument().getName())
- .build());
+ StructuredQuery.Builder builder = query.toBuilder();
+ builder.addAllOrderBy(QueryUtils.getImplicitOrderBy(query));
+ Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+
+ Map<String, Value> valueMap = runQueryResponse.getDocument().getFieldsMap();
Review Comment:
Hmm, that reporting of partial progress sounds like a potential failure
case. Could you have another look at the implementation there, around line 415?
I think we might have to override `processElement` completely for RunQueryFn
and only set lastReceivedValue if the response contains a document.
Beam only checks whether the response is null before calling `setStartFrom`,
though it also only calls it if the ServerStream ended in a
RuntimeException. So I guess with the sequence partial progress response ->
query times out -> no document to restart from, we'd have a problem?
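
To make the suggestion concrete, here is a minimal sketch of the guard I have
in mind. It is not the actual Beam code: `StubResponse` and `ResumeTracker`
are hypothetical stand-ins for `RunQueryResponse` and the bookkeeping inside
`processElement`, and I'm assuming the real response can be checked for a
document (e.g. via the generated `hasDocument()`).

```java
import java.util.Optional;

// Hypothetical stand-in for RunQueryResponse. A null name models a
// partial-progress response that carries no document.
final class StubResponse {
  private final String documentName;

  StubResponse(String documentName) {
    this.documentName = documentName;
  }

  boolean hasDocument() {
    return documentName != null;
  }

  String getDocumentName() {
    return documentName;
  }
}

// Hypothetical sketch of the bookkeeping: remember only responses that
// contain a document, so a restart cursor can always be built from them.
final class ResumeTracker {
  private StubResponse lastReceivedValue;

  void onResponse(StubResponse response) {
    // A null check alone is not enough: a non-null response without a
    // document must not overwrite the last usable restart point.
    if (response != null && response.hasDocument()) {
      lastReceivedValue = response;
    }
  }

  // Empty means no document has been seen yet, so the query would have to
  // be retried from its original start rather than from a cursor.
  Optional<String> resumeAfter() {
    return Optional.ofNullable(lastReceivedValue).map(StubResponse::getDocumentName);
  }
}
```

With this shape, the sequence document -> partial progress -> timeout still
resumes after the last document, and the case where no document was ever
received is visible to the caller instead of reaching `setStartFrom`.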