danthev commented on a change in pull request #15005:
URL: https://github.com/apache/beam/pull/15005#discussion_r655638240



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java
##########
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.api.gax.paging.AbstractPage;
+import com.google.api.gax.paging.AbstractPagedListResponse;
+import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPage;
+import 
com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPage;
+import 
com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPage;
+import 
com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.BatchGetDocumentsRequest;
+import com.google.firestore.v1.BatchGetDocumentsResponse;
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.ListCollectionIdsRequest;
+import com.google.firestore.v1.ListCollectionIdsResponse;
+import com.google.firestore.v1.ListDocumentsRequest;
+import com.google.firestore.v1.ListDocumentsResponse;
+import com.google.firestore.v1.PartitionQueryRequest;
+import com.google.firestore.v1.PartitionQueryResponse;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.RunQueryResponse;
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.ProtocolStringList;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.NonWindowAwareDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A collection of {@link org.apache.beam.sdk.transforms.DoFn DoFn}s for each 
of the supported read
+ * RPC methods from the Cloud Firestore V1 API.
+ */
+final class FirestoreV1ReadFn {
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link RunQueryRequest}s.
+   *
+   * <p>This Fn uses a stream to obtain responses, each response from the 
stream will be output to
+   * the next stage of the pipeline. Each response from the stream represents 
an individual document
+   * with the associated metadata.
+   *
+   * <p>If an error is encountered while reading from the stream, the stream 
will attempt to resume
+   * rather than starting over. The restarting of the stream will continue 
within the scope of the
+   * completion of the request (meaning any possibility of resumption is 
contingent upon an attempt
+   * being available in the Qos budget).
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link 
RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class RunQueryFn
+      extends StreamingFirestoreV1ReadFn<RunQueryRequest, RunQueryResponse> {
+
+    RunQueryFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.RunQuery;
+    }
+
+    @Override
+    protected ServerStreamingCallable<RunQueryRequest, RunQueryResponse> 
getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.runQueryCallable();
+    }
+
+    @Override
+    protected RunQueryRequest setStartFrom(
+        RunQueryRequest element, RunQueryResponse runQueryResponse) {
+      StructuredQuery query = element.getStructuredQuery();
+      StructuredQuery.Builder builder;
+      List<Order> orderByList = query.getOrderByList();
+      // if the orderByList is empty that means the default sort of "__name__ 
ASC" will be used
+      // Before we can set the cursor to the last document name read, we need 
to explicitly add
+      // the order of "__name__ ASC" because a cursor value must map to an 
order by
+      if (orderByList.isEmpty()) {
+        builder =
+            query
+                .toBuilder()
+                .addOrderBy(
+                    Order.newBuilder()
+                        
.setField(FieldReference.newBuilder().setFieldPath("__name__").build())
+                        .setDirection(Direction.ASCENDING)
+                        .build())
+                .setStartAt(
+                    Cursor.newBuilder()
+                        .setBefore(false)
+                        .addValues(
+                            Value.newBuilder()
+                                
.setReferenceValue(runQueryResponse.getDocument().getName())
+                                .build()));
+      } else {
+        Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+        Map<String, Value> fieldsMap = 
runQueryResponse.getDocument().getFieldsMap();
+        for (Order order : orderByList) {
+          String fieldPath = order.getField().getFieldPath();
+          Value value = fieldsMap.get(fieldPath);
+          if (value != null) {
+            cursor.addValues(value);
+          } else if ("__name__".equals(fieldPath)) {
+            cursor.addValues(
+                Value.newBuilder()
+                    
.setReferenceValue(runQueryResponse.getDocument().getName())
+                    .build());
+          }
+        }
+        builder = query.toBuilder().setStartAt(cursor.build());
+      }
+      return element.toBuilder().setStructuredQuery(builder.build()).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link PartitionQueryRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, all pages will be 
aggregated before being
+   * emitted to the next stage of the pipeline. Aggregation of pages is 
necessary as the next step
+   * of pairing of cursors to create N queries must first sort all cursors. 
See <a target="_blank"
+   * rel="noopener noreferrer"
+   * 
href="https://cloud.google.com/firestore/docs/reference/rest/v1/projects.databases.documents/partitionQuery#request-body";>{@code
+   * pageToken}s</a> documentation for details.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link 
RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class PartitionQueryFn
+      extends BaseFirestoreV1ReadFn<PartitionQueryRequest, PartitionQueryPair> 
{
+
+    public PartitionQueryFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.PartitionQuery;
+    }
+
+    @Override
+    public void processElement(ProcessContext context) throws Exception {
+      @SuppressWarnings("nullness")
+      final PartitionQueryRequest element =
+          requireNonNull(context.element(), "c.element() must be non null");
+
+      RpcQos.RpcReadAttempt attempt = 
rpcQos.newReadAttempt(getRpcAttemptContext());
+      PartitionQueryResponse.Builder aggregate = null;
+      while (true) {
+        if (!attempt.awaitSafeToProceed(clock.instant())) {
+          continue;
+        }
+
+        try {
+          PartitionQueryRequest request = setPageToken(element, aggregate);
+          attempt.recordRequestStart(clock.instant());
+          PartitionQueryPagedResponse pagedResponse =
+              firestoreStub.partitionQueryPagedCallable().call(request);
+          for (PartitionQueryPage page : pagedResponse.iteratePages()) {
+            attempt.recordRequestSuccessful(clock.instant());
+            PartitionQueryResponse response = page.getResponse();
+            if (aggregate == null) {
+              aggregate = response.toBuilder();
+            } else {
+              aggregate.addAllPartitions(response.getPartitionsList());
+              if (page.hasNextPage()) {
+                aggregate.setNextPageToken(response.getNextPageToken());
+              } else {
+                aggregate.clearNextPageToken();
+              }
+            }
+            if (page.hasNextPage()) {
+              attempt.recordRequestStart(clock.instant());
+            }
+          }
+          attempt.completeSuccess();
+          break;
+        } catch (RuntimeException exception) {
+          Instant end = clock.instant();
+          attempt.recordRequestFailed(end);
+          attempt.checkCanRetry(end, exception);
+        }
+      }
+      if (aggregate != null) {
+        context.output(new PartitionQueryPair(element, aggregate.build()));
+      }
+    }
+
+    private PartitionQueryRequest setPageToken(
+        PartitionQueryRequest request,
+        @edu.umd.cs.findbugs.annotations.Nullable 
PartitionQueryResponse.Builder aggregate) {
+      if (aggregate != null && aggregate.getNextPageToken() != null) {
+        return 
request.toBuilder().setPageToken(aggregate.getNextPageToken()).build();
+      }
+      return request;
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link ListDocumentsRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, the response from each 
page will be output to
+   * the next stage of the pipeline.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link 
RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class ListDocumentsFn
+      extends PaginatedFirestoreV1ReadFn<
+          ListDocumentsRequest,
+          ListDocumentsPagedResponse,
+          ListDocumentsPage,
+          ListDocumentsResponse> {
+
+    ListDocumentsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.ListDocuments;
+    }
+
+    @Override
+    protected UnaryCallable<ListDocumentsRequest, ListDocumentsPagedResponse> 
getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.listDocumentsPagedCallable();
+    }
+
+    @Override
+    protected ListDocumentsRequest setPageToken(
+        ListDocumentsRequest request, String nextPageToken) {
+      return request.toBuilder().setPageToken(nextPageToken).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link ListCollectionIdsRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, the response from each 
page will be output to
+   * the next stage of the pipeline.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link 
RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class ListCollectionIdsFn
+      extends PaginatedFirestoreV1ReadFn<
+          ListCollectionIdsRequest,
+          ListCollectionIdsPagedResponse,
+          ListCollectionIdsPage,
+          ListCollectionIdsResponse> {
+
+    ListCollectionIdsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.ListCollectionIds;
+    }
+
+    @Override
+    protected UnaryCallable<ListCollectionIdsRequest, 
ListCollectionIdsPagedResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.listCollectionIdsPagedCallable();
+    }
+
+    @Override
+    protected ListCollectionIdsRequest setPageToken(
+        ListCollectionIdsRequest request, String nextPageToken) {
+      return request.toBuilder().setPageToken(nextPageToken).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link BatchGetDocumentsRequest}s.
+   *
+   * <p>This Fn uses a stream to obtain responses, each response from the 
stream will be output to
+   * the next stage of the pipeline. Each response from the stream represents 
an individual document
+   * with the associated metadata.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link 
RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class BatchGetDocumentsFn
+      extends StreamingFirestoreV1ReadFn<BatchGetDocumentsRequest, 
BatchGetDocumentsResponse> {
+
+    BatchGetDocumentsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.BatchGetDocuments;
+    }
+
+    @Override
+    protected ServerStreamingCallable<BatchGetDocumentsRequest, 
BatchGetDocumentsResponse>
+        getCallable(FirestoreStub firestoreStub) {
+      return firestoreStub.batchGetDocumentsCallable();
+    }
+
+    @Override
+    protected BatchGetDocumentsRequest setStartFrom(
+        BatchGetDocumentsRequest originalRequest, BatchGetDocumentsResponse 
mostRecentResponse) {
+      int startIndex = -1;
+      ProtocolStringList documentsList = originalRequest.getDocumentsList();
+      String missing = mostRecentResponse.getMissing();
+      String foundName =
+          mostRecentResponse.hasFound() ? 
mostRecentResponse.getFound().getName() : null;
+      // we only scan until the second to last originalRequest. If the final 
element were to be
+      // reached
+      // the full request would be complete and we wouldn't be in this scenario
+      int maxIndex = documentsList.size() - 2;
+      for (int i = 0; i <= maxIndex; i++) {
+        String docName = documentsList.get(i);
+        if (docName.equals(missing) || docName.equals(foundName)) {
+          startIndex = i;
+          break;
+        }
+      }
+      if (0 <= startIndex) {
+        BatchGetDocumentsRequest.Builder builder = 
originalRequest.toBuilder().clearDocuments();
+        documentsList.stream()
+            .skip(startIndex + 1) // start from the next entry from the one we 
found
+            .forEach(builder::addDocuments);
+        return builder.build();
+      }
+      // unable to find a match, return the original request
+      return originalRequest;

Review comment:
       Yes, that sounds good.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to