BenWhitehead commented on a change in pull request #15005:
URL: https://github.com/apache/beam/pull/15005#discussion_r666366199



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java
##########
@@ -0,0 +1,633 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.api.gax.paging.AbstractPage;
+import com.google.api.gax.paging.AbstractPagedListResponse;
+import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPage;
+import 
com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPage;
+import 
com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPage;
+import 
com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.BatchGetDocumentsRequest;
+import com.google.firestore.v1.BatchGetDocumentsResponse;
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.ListCollectionIdsRequest;
+import com.google.firestore.v1.ListCollectionIdsResponse;
+import com.google.firestore.v1.ListDocumentsRequest;
+import com.google.firestore.v1.ListDocumentsResponse;
+import com.google.firestore.v1.PartitionQueryRequest;
+import com.google.firestore.v1.PartitionQueryResponse;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.RunQueryResponse;
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.ProtocolStringList;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.NonWindowAwareDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.compatqual.NullableDecl;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A collection of {@link org.apache.beam.sdk.transforms.DoFn DoFn}s for each 
of the supported read
+ * RPC methods from the Cloud Firestore V1 API.
+ */
+final class FirestoreV1ReadFn {
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link RunQueryRequest}s.
+   *
+   * <p>This Fn uses a stream to obtain responses, each response from the 
stream will be output to
+   * the next stage of the pipeline. Each response from the stream represents 
an individual document
+   * with the associated metadata.
+   *
+   * <p>If an error is encountered while reading from the stream, the stream 
will attempt to resume
+   * rather than starting over. The restarting of the stream will continue 
within the scope of the
+   * completion of the request (meaning any possibility of resumption is 
contingent upon an attempt
+   * being available in the Qos budget).
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link 
RpcQos} associated with
+   * the lifecycle of this Fn.
+   *
+   * <p>To resume, {@code setStartFrom} rebuilds the query with a
+   * {@code startAt} cursor that has {@code before} set to {@code false} and
+   * is derived from the document of the supplied response. When the query
+   * has no order-by, an explicit {@code __name__ ASC} order is added and the
+   * cursor holds the document name as a reference value. Otherwise one
+   * cursor value is added per order-by field that is found in the
+   * document's fields map, with {@code __name__} falling back to the
+   * document name.
+   *
+   * <p>NOTE(review): an order-by field that is neither present in the fields
+   * map nor {@code __name__} contributes no cursor value, so the cursor can
+   * end up with fewer values than there are order-by clauses. Presumably
+   * this also affects nested field paths, since the lookup is a plain map
+   * get on the full path -- TODO confirm this is intended.
+   */
+  static final class RunQueryFn
+      extends StreamingFirestoreV1ReadFn<RunQueryRequest, RunQueryResponse> {
+
+    RunQueryFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.RunQuery;
+    }
+
+    @Override
+    protected ServerStreamingCallable<RunQueryRequest, RunQueryResponse> 
getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.runQueryCallable();
+    }
+
+    @Override
+    protected RunQueryRequest setStartFrom(
+        RunQueryRequest element, RunQueryResponse runQueryResponse) {
+      StructuredQuery query = element.getStructuredQuery();
+      StructuredQuery.Builder builder;
+      List<Order> orderByList = query.getOrderByList();
+      // if the orderByList is empty that means the default sort of "__name__ 
ASC" will be used
+      // Before we can set the cursor to the last document name read, we need 
to explicitly add
+      // the order of "__name__ ASC" because a cursor value must map to an 
order by
+      if (orderByList.isEmpty()) {
+        builder =
+            query
+                .toBuilder()
+                .addOrderBy(
+                    Order.newBuilder()
+                        
.setField(FieldReference.newBuilder().setFieldPath("__name__").build())
+                        .setDirection(Direction.ASCENDING)
+                        .build())
+                .setStartAt(
+                    Cursor.newBuilder()
+                        .setBefore(false)
+                        .addValues(
+                            Value.newBuilder()
+                                
.setReferenceValue(runQueryResponse.getDocument().getName())
+                                .build()));
+      } else {
+        Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+        Map<String, Value> fieldsMap = 
runQueryResponse.getDocument().getFieldsMap();
+        for (Order order : orderByList) {
+          String fieldPath = order.getField().getFieldPath();
+          Value value = fieldsMap.get(fieldPath);
+          if (value != null) {
+            cursor.addValues(value);
+          } else if ("__name__".equals(fieldPath)) {
+            cursor.addValues(
+                Value.newBuilder()
+                    
.setReferenceValue(runQueryResponse.getDocument().getName())
+                    .build());
+          }
+        }
+        builder = query.toBuilder().setStartAt(cursor.build());
+      }
+      return element.toBuilder().setStructuredQuery(builder.build()).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link PartitionQueryRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, all pages will be 
aggregated before being
+   * emitted to the next stage of the pipeline. Aggregation of pages is 
necessary as the next step
+   * of pairing of cursors to create N queries must first sort all cursors. 
See <a target="_blank"
+   * rel="noopener noreferrer"
+   * 
href="https://cloud.google.com/firestore/docs/reference/rest/v1/projects.databases.documents/partitionQuery#request-body">{@code
+   * pageToken}s</a> documentation for details.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link 
RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class PartitionQueryFn
+      extends BaseFirestoreV1ReadFn<PartitionQueryRequest, PartitionQueryPair> 
{
+
+    public PartitionQueryFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.PartitionQuery;
+    }
+
+    @Override
+    public void processElement(ProcessContext context) throws Exception {
+      @SuppressWarnings("nullness")
+      final PartitionQueryRequest element =
+          requireNonNull(context.element(), "c.element() must be non null");
+
+      RpcQos.RpcReadAttempt attempt = 
rpcQos.newReadAttempt(getRpcAttemptContext());

Review comment:
       No, this is not a global lock; it is instantiated per request attempt 
on each worker. Each attempt encapsulates the logic for the number of RPC 
attempts, backoff, and retryability, as well as success/failure and RPC 
duration metrics. `RpcQos` itself, however, is managed from `@Setup` instead 
of `@StartBundle` so that the RPC metrics can be tracked across bundles.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to