[
https://issues.apache.org/jira/browse/BEAM-8376?focusedWorklogId=611681&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-611681
]
ASF GitHub Bot logged work on BEAM-8376:
----------------------------------------
Author: ASF GitHub Bot
Created on: 16/Jun/21 00:38
Start Date: 16/Jun/21 00:38
Worklog Time Spent: 10m
Work Description: danthev commented on a change in pull request #15005:
URL: https://github.com/apache/beam/pull/15005#discussion_r652221313
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java
##########
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.api.gax.paging.AbstractPage;
+import com.google.api.gax.paging.AbstractPagedListResponse;
+import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPage;
+import
com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPage;
+import
com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPage;
+import
com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.BatchGetDocumentsRequest;
+import com.google.firestore.v1.BatchGetDocumentsResponse;
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.ListCollectionIdsRequest;
+import com.google.firestore.v1.ListCollectionIdsResponse;
+import com.google.firestore.v1.ListDocumentsRequest;
+import com.google.firestore.v1.ListDocumentsResponse;
+import com.google.firestore.v1.PartitionQueryRequest;
+import com.google.firestore.v1.PartitionQueryResponse;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.RunQueryResponse;
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.ProtocolStringList;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.NonWindowAwareDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A collection of {@link org.apache.beam.sdk.transforms.DoFn DoFn}s for each
of the supported read
+ * RPC methods from the Cloud Firestore V1 API.
+ */
+final class FirestoreV1ReadFn {
+
+ /**
+ * {@link DoFn} for Firestore V1 {@link RunQueryRequest}s.
+ *
+ * <p>This Fn uses a stream to obtain responses, each response from the
stream will be output to
+ * the next stage of the pipeline. Each response from the stream represents
an individual document
+ * with the associated metadata.
+ *
+ * <p>If an error is encountered while reading from the stream, the stream
will attempt to resume
+ * rather than starting over. The restarting of the stream will continue
within the scope of the
+ * completion of the request (meaning any possibility of resumption is
contingent upon an attempt
+ * being available in the Qos budget).
+ *
+ * <p>All request quality-of-service is managed via the instance of {@link
RpcQos} associated with
+ * the lifecycle of this Fn.
+ */
+ static final class RunQueryFn
+ extends StreamingFirestoreV1ReadFn<RunQueryRequest, RunQueryResponse> {
+
+ RunQueryFn(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public Context getRpcAttemptContext() {
+ return FirestoreV1Fn.V1FnRpcAttemptContext.RunQuery;
+ }
+
+ @Override
+ protected ServerStreamingCallable<RunQueryRequest, RunQueryResponse>
getCallable(
+ FirestoreStub firestoreStub) {
+ return firestoreStub.runQueryCallable();
+ }
+
+ @Override
+ protected RunQueryRequest setStartFrom(
+ RunQueryRequest element, RunQueryResponse runQueryResponse) {
+ StructuredQuery query = element.getStructuredQuery();
+ StructuredQuery.Builder builder;
+ List<Order> orderByList = query.getOrderByList();
+ // if the orderByList is empty that means the default sort of "__name__
ASC" will be used
+ // Before we can set the cursor to the last document name read, we need
to explicitly add
+ // the order of "__name__ ASC" because a cursor value must map to an
order by
+ if (orderByList.isEmpty()) {
+ builder =
+ query
+ .toBuilder()
+ .addOrderBy(
+ Order.newBuilder()
+
.setField(FieldReference.newBuilder().setFieldPath("__name__").build())
+ .setDirection(Direction.ASCENDING)
+ .build())
+ .setStartAt(
+ Cursor.newBuilder()
+ .setBefore(false)
+ .addValues(
+ Value.newBuilder()
+
.setReferenceValue(runQueryResponse.getDocument().getName())
+ .build()));
+ } else {
+ Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+ Map<String, Value> fieldsMap =
runQueryResponse.getDocument().getFieldsMap();
+ for (Order order : orderByList) {
+ String fieldPath = order.getField().getFieldPath();
+ Value value = fieldsMap.get(fieldPath);
+ if (value != null) {
+ cursor.addValues(value);
+ } else if ("__name__".equals(fieldPath)) {
+ cursor.addValues(
+ Value.newBuilder()
+
.setReferenceValue(runQueryResponse.getDocument().getName())
+ .build());
+ }
+ }
+ builder = query.toBuilder().setStartAt(cursor.build());
+ }
+ return element.toBuilder().setStructuredQuery(builder.build()).build();
+ }
+ }
+
+ /**
+ * {@link DoFn} for Firestore V1 {@link PartitionQueryRequest}s.
+ *
+ * <p>This Fn uses pagination to obtain responses, all pages will be
aggregated before being
+ * emitted to the next stage of the pipeline. Aggregation of pages is
necessary as the next step
+ * of pairing of cursors to create N queries must first sort all cursors.
See <a target="_blank"
+ * rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rest/v1/projects.databases.documents/partitionQuery#request-body">{@code
+ * pageToken}s</a> documentation for details.
+ *
+ * <p>All request quality-of-service is managed via the instance of {@link
RpcQos} associated with
+ * the lifecycle of this Fn.
+ */
+ static final class PartitionQueryFn
+ extends BaseFirestoreV1ReadFn<PartitionQueryRequest, PartitionQueryPair>
{
+
+ public PartitionQueryFn(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public Context getRpcAttemptContext() {
+ return FirestoreV1Fn.V1FnRpcAttemptContext.PartitionQuery;
+ }
+
+ @Override
+ public void processElement(ProcessContext context) throws Exception {
+ @SuppressWarnings("nullness")
+ final PartitionQueryRequest element =
+ requireNonNull(context.element(), "c.element() must be non null");
+
+ RpcQos.RpcReadAttempt attempt =
rpcQos.newReadAttempt(getRpcAttemptContext());
+ PartitionQueryResponse.Builder aggregate = null;
+ while (true) {
+ if (!attempt.awaitSafeToProceed(clock.instant())) {
+ continue;
+ }
+
+ try {
+ PartitionQueryRequest request = setPageToken(element, aggregate);
+ attempt.recordRequestStart(clock.instant());
+ PartitionQueryPagedResponse pagedResponse =
+ firestoreStub.partitionQueryPagedCallable().call(request);
+ for (PartitionQueryPage page : pagedResponse.iteratePages()) {
+ attempt.recordRequestSuccessful(clock.instant());
+ PartitionQueryResponse response = page.getResponse();
+ if (aggregate == null) {
+ aggregate = response.toBuilder();
+ } else {
+ aggregate.addAllPartitions(response.getPartitionsList());
+ if (page.hasNextPage()) {
+ aggregate.setNextPageToken(response.getNextPageToken());
+ } else {
+ aggregate.clearNextPageToken();
+ }
+ }
+ if (page.hasNextPage()) {
+ attempt.recordRequestStart(clock.instant());
+ }
+ }
+ attempt.completeSuccess();
+ break;
+ } catch (RuntimeException exception) {
+ Instant end = clock.instant();
+ attempt.recordRequestFailed(end);
+ attempt.checkCanRetry(end, exception);
+ }
+ }
+ if (aggregate != null) {
+ context.output(new PartitionQueryPair(element, aggregate.build()));
+ }
+ }
+
+ private PartitionQueryRequest setPageToken(
+ PartitionQueryRequest request,
+ @edu.umd.cs.findbugs.annotations.Nullable
PartitionQueryResponse.Builder aggregate) {
+ if (aggregate != null && aggregate.getNextPageToken() != null) {
+ return
request.toBuilder().setPageToken(aggregate.getNextPageToken()).build();
+ }
+ return request;
+ }
+ }
+
+ /**
+ * {@link DoFn} for Firestore V1 {@link ListDocumentsRequest}s.
+ *
+ * <p>This Fn uses pagination to obtain responses, the response from each
page will be output to
+ * the next stage of the pipeline.
+ *
+ * <p>All request quality-of-service is managed via the instance of {@link
RpcQos} associated with
+ * the lifecycle of this Fn.
+ */
+ static final class ListDocumentsFn
+ extends PaginatedFirestoreV1ReadFn<
+ ListDocumentsRequest,
+ ListDocumentsPagedResponse,
+ ListDocumentsPage,
+ ListDocumentsResponse> {
+
+ ListDocumentsFn(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public Context getRpcAttemptContext() {
+ return FirestoreV1Fn.V1FnRpcAttemptContext.ListDocuments;
+ }
+
+ @Override
+ protected UnaryCallable<ListDocumentsRequest, ListDocumentsPagedResponse>
getCallable(
+ FirestoreStub firestoreStub) {
+ return firestoreStub.listDocumentsPagedCallable();
+ }
+
+ @Override
+ protected ListDocumentsRequest setPageToken(
+ ListDocumentsRequest request, String nextPageToken) {
+ return request.toBuilder().setPageToken(nextPageToken).build();
+ }
+ }
+
+ /**
+ * {@link DoFn} for Firestore V1 {@link ListCollectionIdsRequest}s.
+ *
+ * <p>This Fn uses pagination to obtain responses, the response from each
page will be output to
+ * the next stage of the pipeline.
+ *
+ * <p>All request quality-of-service is managed via the instance of {@link
RpcQos} associated with
+ * the lifecycle of this Fn.
+ */
+ static final class ListCollectionIdsFn
+ extends PaginatedFirestoreV1ReadFn<
+ ListCollectionIdsRequest,
+ ListCollectionIdsPagedResponse,
+ ListCollectionIdsPage,
+ ListCollectionIdsResponse> {
+
+ ListCollectionIdsFn(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public Context getRpcAttemptContext() {
+ return FirestoreV1Fn.V1FnRpcAttemptContext.ListCollectionIds;
+ }
+
+ @Override
+ protected UnaryCallable<ListCollectionIdsRequest,
ListCollectionIdsPagedResponse> getCallable(
+ FirestoreStub firestoreStub) {
+ return firestoreStub.listCollectionIdsPagedCallable();
+ }
+
+ @Override
+ protected ListCollectionIdsRequest setPageToken(
+ ListCollectionIdsRequest request, String nextPageToken) {
+ return request.toBuilder().setPageToken(nextPageToken).build();
+ }
+ }
+
+ /**
+ * {@link DoFn} for Firestore V1 {@link BatchGetDocumentsRequest}s.
+ *
+ * <p>This Fn uses a stream to obtain responses, each response from the
stream will be output to
+ * the next stage of the pipeline. Each response from the stream represents
an individual document
+ * with the associated metadata.
+ *
+ * <p>All request quality-of-service is managed via the instance of {@link
RpcQos} associated with
+ * the lifecycle of this Fn.
+ */
+ static final class BatchGetDocumentsFn
+ extends StreamingFirestoreV1ReadFn<BatchGetDocumentsRequest,
BatchGetDocumentsResponse> {
+
+ BatchGetDocumentsFn(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public Context getRpcAttemptContext() {
+ return FirestoreV1Fn.V1FnRpcAttemptContext.BatchGetDocuments;
+ }
+
+ @Override
+ protected ServerStreamingCallable<BatchGetDocumentsRequest,
BatchGetDocumentsResponse>
+ getCallable(FirestoreStub firestoreStub) {
+ return firestoreStub.batchGetDocumentsCallable();
+ }
+
+ @Override
+ protected BatchGetDocumentsRequest setStartFrom(
+ BatchGetDocumentsRequest originalRequest, BatchGetDocumentsResponse
mostRecentResponse) {
+ int startIndex = -1;
+ ProtocolStringList documentsList = originalRequest.getDocumentsList();
+ String missing = mostRecentResponse.getMissing();
+ String foundName =
+ mostRecentResponse.hasFound() ?
mostRecentResponse.getFound().getName() : null;
+ // we only scan until the second to last document in originalRequest. If the final
element were to be
+ // reached
+ // the full request would be complete and we wouldn't be in this scenario
+ int maxIndex = documentsList.size() - 2;
+ for (int i = 0; i <= maxIndex; i++) {
Review comment:
I don't know much about what the response will look like. But given that
you skip everything up to `startIndex`, it seems safe to assume this is always
sequential? If so, can you advance an object-level counter instead of iterating
over all document names every time?
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
##########
@@ -700,6 +723,142 @@ public long nextBackOffMillis() {
}
}
+ /**
+ * This class implements a backoff algorithm similar to that of {@link
+ * org.apache.beam.sdk.util.FluentBackoff} with a could key differences:
Review comment:
Typo: "a could key differences" should read "a couple of key differences".
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
##########
@@ -700,6 +723,142 @@ public long nextBackOffMillis() {
}
}
+ /**
+ * This class implements a backoff algorithm similar to that of {@link
+ * org.apache.beam.sdk.util.FluentBackoff} with a could key differences:
+ *
+ * <ol>
+ * <li>A set of status code numbers may be specified to have a graceful
evaluation
+ * <li>Gracefully evaluated status code numbers will increment a decaying
counter to ensure if
+ * the graceful status code numbers occur more than once in the
previous 60 seconds the
+ * regular backoff behavior will kick in.
+ * <li>The random number generator used to induce jitter is provided via
constructor parameter
+ * rather than using {@link Math#random()}
+ * </ol>
+ *
+ * The primary motivation for creating this implementation is to support
streamed responses from
+ * Firestore. In the case of RunQuery and BatchGet the results are returned
via stream. The result
+ * stream has a maximum lifetime of 60 seconds before it will be broken and
an UNAVAILABLE status
+ * code will be raised. Given that this UNAVAILABLE is expected for streams, this
class allows for
+ * defining a set of status code numbers which are given a grace count of 1
before backoff kicks
+ * in. When backoff does kick in, it is implemented using the same
calculations as {@link
+ * org.apache.beam.sdk.util.FluentBackoff}.
+ */
+ static final class StatusCodeAwareBackoff {
+ private static final double RANDOMIZATION_FACTOR = 0.5;
Review comment:
A randomization factor of 0.5 seems like a lot. Does `FluentBackoff` have that
much fuzziness as well?
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java
##########
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.api.gax.paging.AbstractPage;
+import com.google.api.gax.paging.AbstractPagedListResponse;
+import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPage;
+import
com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPage;
+import
com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPage;
+import
com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.BatchGetDocumentsRequest;
+import com.google.firestore.v1.BatchGetDocumentsResponse;
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.ListCollectionIdsRequest;
+import com.google.firestore.v1.ListCollectionIdsResponse;
+import com.google.firestore.v1.ListDocumentsRequest;
+import com.google.firestore.v1.ListDocumentsResponse;
+import com.google.firestore.v1.PartitionQueryRequest;
+import com.google.firestore.v1.PartitionQueryResponse;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.RunQueryResponse;
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.ProtocolStringList;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.NonWindowAwareDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A collection of {@link org.apache.beam.sdk.transforms.DoFn DoFn}s for each
of the supported read
+ * RPC methods from the Cloud Firestore V1 API.
+ */
+final class FirestoreV1ReadFn {
+
+ /**
+ * {@link DoFn} for Firestore V1 {@link RunQueryRequest}s.
+ *
+ * <p>This Fn uses a stream to obtain responses, each response from the
stream will be output to
+ * the next stage of the pipeline. Each response from the stream represents
an individual document
+ * with the associated metadata.
+ *
+ * <p>If an error is encountered while reading from the stream, the stream
will attempt to resume
+ * rather than starting over. The restarting of the stream will continue
within the scope of the
+ * completion of the request (meaning any possibility of resumption is
contingent upon an attempt
+ * being available in the Qos budget).
+ *
+ * <p>All request quality-of-service is managed via the instance of {@link
RpcQos} associated with
+ * the lifecycle of this Fn.
+ */
+ static final class RunQueryFn
+ extends StreamingFirestoreV1ReadFn<RunQueryRequest, RunQueryResponse> {
+
+ RunQueryFn(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public Context getRpcAttemptContext() {
+ return FirestoreV1Fn.V1FnRpcAttemptContext.RunQuery;
+ }
+
+ @Override
+ protected ServerStreamingCallable<RunQueryRequest, RunQueryResponse>
getCallable(
+ FirestoreStub firestoreStub) {
+ return firestoreStub.runQueryCallable();
+ }
+
+ @Override
+ protected RunQueryRequest setStartFrom(
+ RunQueryRequest element, RunQueryResponse runQueryResponse) {
+ StructuredQuery query = element.getStructuredQuery();
+ StructuredQuery.Builder builder;
+ List<Order> orderByList = query.getOrderByList();
+ // if the orderByList is empty that means the default sort of "__name__
ASC" will be used
+ // Before we can set the cursor to the last document name read, we need
to explicitly add
+ // the order of "__name__ ASC" because a cursor value must map to an
order by
+ if (orderByList.isEmpty()) {
+ builder =
+ query
+ .toBuilder()
+ .addOrderBy(
+ Order.newBuilder()
+
.setField(FieldReference.newBuilder().setFieldPath("__name__").build())
+ .setDirection(Direction.ASCENDING)
+ .build())
+ .setStartAt(
+ Cursor.newBuilder()
+ .setBefore(false)
+ .addValues(
+ Value.newBuilder()
+
.setReferenceValue(runQueryResponse.getDocument().getName())
+ .build()));
+ } else {
+ Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+ Map<String, Value> fieldsMap =
runQueryResponse.getDocument().getFieldsMap();
+ for (Order order : orderByList) {
+ String fieldPath = order.getField().getFieldPath();
+ Value value = fieldsMap.get(fieldPath);
+ if (value != null) {
+ cursor.addValues(value);
+ } else if ("__name__".equals(fieldPath)) {
+ cursor.addValues(
+ Value.newBuilder()
+
.setReferenceValue(runQueryResponse.getDocument().getName())
+ .build());
+ }
+ }
+ builder = query.toBuilder().setStartAt(cursor.build());
+ }
+ return element.toBuilder().setStructuredQuery(builder.build()).build();
+ }
+ }
+
+ /**
+ * {@link DoFn} for Firestore V1 {@link PartitionQueryRequest}s.
+ *
+ * <p>This Fn uses pagination to obtain responses, all pages will be
aggregated before being
+ * emitted to the next stage of the pipeline. Aggregation of pages is
necessary as the next step
+ * of pairing of cursors to create N queries must first sort all cursors.
See <a target="_blank"
+ * rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rest/v1/projects.databases.documents/partitionQuery#request-body">{@code
+ * pageToken}s</a> documentation for details.
+ *
+ * <p>All request quality-of-service is managed via the instance of {@link
RpcQos} associated with
+ * the lifecycle of this Fn.
+ */
+ static final class PartitionQueryFn
+ extends BaseFirestoreV1ReadFn<PartitionQueryRequest, PartitionQueryPair>
{
+
+ public PartitionQueryFn(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public Context getRpcAttemptContext() {
+ return FirestoreV1Fn.V1FnRpcAttemptContext.PartitionQuery;
+ }
+
+ @Override
+ public void processElement(ProcessContext context) throws Exception {
+ @SuppressWarnings("nullness")
+ final PartitionQueryRequest element =
+ requireNonNull(context.element(), "c.element() must be non null");
+
+ RpcQos.RpcReadAttempt attempt =
rpcQos.newReadAttempt(getRpcAttemptContext());
+ PartitionQueryResponse.Builder aggregate = null;
+ while (true) {
+ if (!attempt.awaitSafeToProceed(clock.instant())) {
+ continue;
+ }
+
+ try {
+ PartitionQueryRequest request = setPageToken(element, aggregate);
+ attempt.recordRequestStart(clock.instant());
+ PartitionQueryPagedResponse pagedResponse =
+ firestoreStub.partitionQueryPagedCallable().call(request);
+ for (PartitionQueryPage page : pagedResponse.iteratePages()) {
+ attempt.recordRequestSuccessful(clock.instant());
+ PartitionQueryResponse response = page.getResponse();
+ if (aggregate == null) {
+ aggregate = response.toBuilder();
+ } else {
+ aggregate.addAllPartitions(response.getPartitionsList());
+ if (page.hasNextPage()) {
+ aggregate.setNextPageToken(response.getNextPageToken());
+ } else {
+ aggregate.clearNextPageToken();
+ }
+ }
+ if (page.hasNextPage()) {
+ attempt.recordRequestStart(clock.instant());
+ }
+ }
+ attempt.completeSuccess();
+ break;
+ } catch (RuntimeException exception) {
+ Instant end = clock.instant();
+ attempt.recordRequestFailed(end);
+ attempt.checkCanRetry(end, exception);
+ }
+ }
+ if (aggregate != null) {
+ context.output(new PartitionQueryPair(element, aggregate.build()));
+ }
+ }
+
+ private PartitionQueryRequest setPageToken(
+ PartitionQueryRequest request,
+ @edu.umd.cs.findbugs.annotations.Nullable
PartitionQueryResponse.Builder aggregate) {
+ if (aggregate != null && aggregate.getNextPageToken() != null) {
+ return
request.toBuilder().setPageToken(aggregate.getNextPageToken()).build();
+ }
+ return request;
+ }
+ }
+
+ /**
+ * {@link DoFn} for Firestore V1 {@link ListDocumentsRequest}s.
+ *
+ * <p>This Fn uses pagination to obtain responses, the response from each
page will be output to
+ * the next stage of the pipeline.
+ *
+ * <p>All request quality-of-service is managed via the instance of {@link
RpcQos} associated with
+ * the lifecycle of this Fn.
+ */
+ static final class ListDocumentsFn
+ extends PaginatedFirestoreV1ReadFn<
+ ListDocumentsRequest,
+ ListDocumentsPagedResponse,
+ ListDocumentsPage,
+ ListDocumentsResponse> {
+
+ ListDocumentsFn(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public Context getRpcAttemptContext() {
+ return FirestoreV1Fn.V1FnRpcAttemptContext.ListDocuments;
+ }
+
+ @Override
+ protected UnaryCallable<ListDocumentsRequest, ListDocumentsPagedResponse>
getCallable(
+ FirestoreStub firestoreStub) {
+ return firestoreStub.listDocumentsPagedCallable();
+ }
+
+ @Override
+ protected ListDocumentsRequest setPageToken(
+ ListDocumentsRequest request, String nextPageToken) {
+ return request.toBuilder().setPageToken(nextPageToken).build();
+ }
+ }
+
+ /**
+ * {@link DoFn} for Firestore V1 {@link ListCollectionIdsRequest}s.
+ *
+ * <p>This Fn uses pagination to obtain responses, the response from each
page will be output to
+ * the next stage of the pipeline.
+ *
+ * <p>All request quality-of-service is managed via the instance of {@link
RpcQos} associated with
+ * the lifecycle of this Fn.
+ */
+ static final class ListCollectionIdsFn
+ extends PaginatedFirestoreV1ReadFn<
+ ListCollectionIdsRequest,
+ ListCollectionIdsPagedResponse,
+ ListCollectionIdsPage,
+ ListCollectionIdsResponse> {
+
+ ListCollectionIdsFn(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public Context getRpcAttemptContext() {
+ return FirestoreV1Fn.V1FnRpcAttemptContext.ListCollectionIds;
+ }
+
+ @Override
+ protected UnaryCallable<ListCollectionIdsRequest,
ListCollectionIdsPagedResponse> getCallable(
+ FirestoreStub firestoreStub) {
+ return firestoreStub.listCollectionIdsPagedCallable();
+ }
+
+ @Override
+ protected ListCollectionIdsRequest setPageToken(
+ ListCollectionIdsRequest request, String nextPageToken) {
+ return request.toBuilder().setPageToken(nextPageToken).build();
+ }
+ }
+
+ /**
+ * {@link DoFn} for Firestore V1 {@link BatchGetDocumentsRequest}s.
+ *
+ * <p>This Fn uses a stream to obtain responses, each response from the
stream will be output to
+ * the next stage of the pipeline. Each response from the stream represents
an individual document
+ * with the associated metadata.
+ *
+ * <p>All request quality-of-service is managed via the instance of {@link
RpcQos} associated with
+ * the lifecycle of this Fn.
+ */
+ static final class BatchGetDocumentsFn
+ extends StreamingFirestoreV1ReadFn<BatchGetDocumentsRequest,
BatchGetDocumentsResponse> {
+
+ BatchGetDocumentsFn(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public Context getRpcAttemptContext() {
+ return FirestoreV1Fn.V1FnRpcAttemptContext.BatchGetDocuments;
+ }
+
+ @Override
+ protected ServerStreamingCallable<BatchGetDocumentsRequest,
BatchGetDocumentsResponse>
+ getCallable(FirestoreStub firestoreStub) {
+ return firestoreStub.batchGetDocumentsCallable();
+ }
+
+ @Override
+ protected BatchGetDocumentsRequest setStartFrom(
+ BatchGetDocumentsRequest originalRequest, BatchGetDocumentsResponse
mostRecentResponse) {
+ int startIndex = -1;
+ ProtocolStringList documentsList = originalRequest.getDocumentsList();
+ String missing = mostRecentResponse.getMissing();
+ String foundName =
+ mostRecentResponse.hasFound() ?
mostRecentResponse.getFound().getName() : null;
+ // we only scan until the second to last document in originalRequest. If the final
element were to be
+ // reached
+ // the full request would be complete and we wouldn't be in this scenario
+ int maxIndex = documentsList.size() - 2;
+ for (int i = 0; i <= maxIndex; i++) {
+ String docName = documentsList.get(i);
+ if (docName.equals(missing) || docName.equals(foundName)) {
+ startIndex = i;
+ break;
+ }
+ }
+ if (0 <= startIndex) {
+ BatchGetDocumentsRequest.Builder builder =
originalRequest.toBuilder().clearDocuments();
+ documentsList.stream()
+ .skip(startIndex + 1) // start from the next entry from the one we
found
+ .forEach(builder::addDocuments);
+ return builder.build();
+ }
+ // unable to find a match, return the original request
+ return originalRequest;
Review comment:
Will this only ever happen on the first response with the transaction
field set? Maybe you can add a comment here to make this clearer. If there's
any other way this could fail, I'd prefer an exception.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java
##########
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.api.gax.paging.AbstractPage;
+import com.google.api.gax.paging.AbstractPagedListResponse;
+import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPage;
+import
com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPage;
+import
com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPage;
+import
com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.BatchGetDocumentsRequest;
+import com.google.firestore.v1.BatchGetDocumentsResponse;
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.ListCollectionIdsRequest;
+import com.google.firestore.v1.ListCollectionIdsResponse;
+import com.google.firestore.v1.ListDocumentsRequest;
+import com.google.firestore.v1.ListDocumentsResponse;
+import com.google.firestore.v1.PartitionQueryRequest;
+import com.google.firestore.v1.PartitionQueryResponse;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.RunQueryResponse;
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.ProtocolStringList;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.NonWindowAwareDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A collection of {@link org.apache.beam.sdk.transforms.DoFn DoFn}s for each
of the supported read
+ * RPC methods from the Cloud Firestore V1 API.
+ */
+final class FirestoreV1ReadFn {
+
+ /**
+ * {@link DoFn} for Firestore V1 {@link RunQueryRequest}s.
+ *
+ * <p>This Fn uses a stream to obtain responses, each response from the
stream will be output to
+ * the next stage of the pipeline. Each response from the stream represents
an individual document
+ * with the associated metadata.
+ *
+ * <p>If an error is encountered while reading from the stream, the stream
will attempt to resume
+ * rather than starting over. The restarting of the stream will continue
within the scope of the
+ * completion of the request (meaning any possibility of resumption is
contingent upon an attempt
+ * being available in the Qos budget).
+ *
+ * <p>All request quality-of-service is managed via the instance of {@link
RpcQos} associated with
+ * the lifecycle of this Fn.
+ */
+ static final class RunQueryFn
+ extends StreamingFirestoreV1ReadFn<RunQueryRequest, RunQueryResponse> {
+
+ RunQueryFn(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public Context getRpcAttemptContext() {
+ return FirestoreV1Fn.V1FnRpcAttemptContext.RunQuery;
+ }
+
+ @Override
+ protected ServerStreamingCallable<RunQueryRequest, RunQueryResponse>
getCallable(
+ FirestoreStub firestoreStub) {
+ return firestoreStub.runQueryCallable();
+ }
+
+ @Override
+ protected RunQueryRequest setStartFrom(
+ RunQueryRequest element, RunQueryResponse runQueryResponse) {
+ StructuredQuery query = element.getStructuredQuery();
+ StructuredQuery.Builder builder;
+ List<Order> orderByList = query.getOrderByList();
+ // if the orderByList is empty that means the default sort of "__name__
ASC" will be used
+ // Before we can set the cursor to the last document name read, we need
to explicitly add
+ // the order of "__name__ ASC" because a cursor value must map to an
order by
+ if (orderByList.isEmpty()) {
+ builder =
+ query
+ .toBuilder()
+ .addOrderBy(
+ Order.newBuilder()
+
.setField(FieldReference.newBuilder().setFieldPath("__name__").build())
+ .setDirection(Direction.ASCENDING)
+ .build())
+ .setStartAt(
+ Cursor.newBuilder()
+ .setBefore(false)
+ .addValues(
+ Value.newBuilder()
+
.setReferenceValue(runQueryResponse.getDocument().getName())
+ .build()));
+ } else {
+ Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+ Map<String, Value> fieldsMap =
runQueryResponse.getDocument().getFieldsMap();
+ for (Order order : orderByList) {
+ String fieldPath = order.getField().getFieldPath();
+ Value value = fieldsMap.get(fieldPath);
+ if (value != null) {
+ cursor.addValues(value);
+ } else if ("__name__".equals(fieldPath)) {
+ cursor.addValues(
+ Value.newBuilder()
+
.setReferenceValue(runQueryResponse.getDocument().getName())
+ .build());
+ }
+ }
+ builder = query.toBuilder().setStartAt(cursor.build());
+ }
+ return element.toBuilder().setStructuredQuery(builder.build()).build();
+ }
+ }
+
+ /**
+ * {@link DoFn} for Firestore V1 {@link PartitionQueryRequest}s.
+ *
+ * <p>This Fn uses pagination to obtain responses, all pages will be
aggregated before being
+ * emitted to the next stage of the pipeline. Aggregation of pages is
necessary as the next step
+ * of pairing of cursors to create N queries must first sort all cursors.
See <a target="_blank"
+ * rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rest/v1/projects.databases.documents/partitionQuery#request-body">{@code
+ * pageToken}s</a> documentation for details.
+ *
+ * <p>All request quality-of-service is managed via the instance of {@link
RpcQos} associated with
+ * the lifecycle of this Fn.
+ */
+ static final class PartitionQueryFn
+ extends BaseFirestoreV1ReadFn<PartitionQueryRequest, PartitionQueryPair>
{
+
+ public PartitionQueryFn(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public Context getRpcAttemptContext() {
+ return FirestoreV1Fn.V1FnRpcAttemptContext.PartitionQuery;
+ }
+
+ @Override
+ public void processElement(ProcessContext context) throws Exception {
+ @SuppressWarnings("nullness")
+ final PartitionQueryRequest element =
+ requireNonNull(context.element(), "c.element() must be non null");
+
+ RpcQos.RpcReadAttempt attempt =
rpcQos.newReadAttempt(getRpcAttemptContext());
+ PartitionQueryResponse.Builder aggregate = null;
+ while (true) {
+ if (!attempt.awaitSafeToProceed(clock.instant())) {
+ continue;
+ }
+
+ try {
+ PartitionQueryRequest request = setPageToken(element, aggregate);
+ attempt.recordRequestStart(clock.instant());
+ PartitionQueryPagedResponse pagedResponse =
+ firestoreStub.partitionQueryPagedCallable().call(request);
+ for (PartitionQueryPage page : pagedResponse.iteratePages()) {
+ attempt.recordRequestSuccessful(clock.instant());
+ PartitionQueryResponse response = page.getResponse();
+ if (aggregate == null) {
+ aggregate = response.toBuilder();
+ } else {
+ aggregate.addAllPartitions(response.getPartitionsList());
+ if (page.hasNextPage()) {
+ aggregate.setNextPageToken(response.getNextPageToken());
+ } else {
+ aggregate.clearNextPageToken();
+ }
+ }
+ if (page.hasNextPage()) {
+ attempt.recordRequestStart(clock.instant());
+ }
+ }
+ attempt.completeSuccess();
+ break;
+ } catch (RuntimeException exception) {
+ Instant end = clock.instant();
+ attempt.recordRequestFailed(end);
+ attempt.checkCanRetry(end, exception);
+ }
+ }
+ if (aggregate != null) {
+ context.output(new PartitionQueryPair(element, aggregate.build()));
+ }
+ }
+
+ private PartitionQueryRequest setPageToken(
+ PartitionQueryRequest request,
+ @edu.umd.cs.findbugs.annotations.Nullable
PartitionQueryResponse.Builder aggregate) {
Review comment:
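Wrong import: this parameter is annotated with the fully qualified
`@edu.umd.cs.findbugs.annotations.Nullable`; use the
`org.checkerframework.checker.nullness.qual.Nullable` that this file already
imports instead.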
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java
##########
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.api.gax.paging.AbstractPage;
+import com.google.api.gax.paging.AbstractPagedListResponse;
+import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPage;
+import
com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPage;
+import
com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPage;
+import
com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.BatchGetDocumentsRequest;
+import com.google.firestore.v1.BatchGetDocumentsResponse;
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.ListCollectionIdsRequest;
+import com.google.firestore.v1.ListCollectionIdsResponse;
+import com.google.firestore.v1.ListDocumentsRequest;
+import com.google.firestore.v1.ListDocumentsResponse;
+import com.google.firestore.v1.PartitionQueryRequest;
+import com.google.firestore.v1.PartitionQueryResponse;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.RunQueryResponse;
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.ProtocolStringList;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.NonWindowAwareDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A collection of {@link org.apache.beam.sdk.transforms.DoFn DoFn}s for each
of the supported read
+ * RPC methods from the Cloud Firestore V1 API.
+ */
+final class FirestoreV1ReadFn {
+
+ /**
+ * {@link DoFn} for Firestore V1 {@link RunQueryRequest}s.
+ *
+ * <p>This Fn uses a stream to obtain responses, each response from the
stream will be output to
+ * the next stage of the pipeline. Each response from the stream represents
an individual document
+ * with the associated metadata.
+ *
+ * <p>If an error is encountered while reading from the stream, the stream
will attempt to resume
+ * rather than starting over. The restarting of the stream will continue
within the scope of the
+ * completion of the request (meaning any possibility of resumption is
contingent upon an attempt
+ * being available in the Qos budget).
+ *
+ * <p>All request quality-of-service is managed via the instance of {@link
RpcQos} associated with
+ * the lifecycle of this Fn.
+ */
+ static final class RunQueryFn
+ extends StreamingFirestoreV1ReadFn<RunQueryRequest, RunQueryResponse> {
+
+ RunQueryFn(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public Context getRpcAttemptContext() {
+ return FirestoreV1Fn.V1FnRpcAttemptContext.RunQuery;
+ }
+
+ @Override
+ protected ServerStreamingCallable<RunQueryRequest, RunQueryResponse>
getCallable(
+ FirestoreStub firestoreStub) {
+ return firestoreStub.runQueryCallable();
+ }
+
+ @Override
+ protected RunQueryRequest setStartFrom(
+ RunQueryRequest element, RunQueryResponse runQueryResponse) {
+ StructuredQuery query = element.getStructuredQuery();
+ StructuredQuery.Builder builder;
+ List<Order> orderByList = query.getOrderByList();
+ // if the orderByList is empty that means the default sort of "__name__
ASC" will be used
+ // Before we can set the cursor to the last document name read, we need
to explicitly add
+ // the order of "__name__ ASC" because a cursor value must map to an
order by
+ if (orderByList.isEmpty()) {
+ builder =
+ query
+ .toBuilder()
+ .addOrderBy(
+ Order.newBuilder()
+
.setField(FieldReference.newBuilder().setFieldPath("__name__").build())
+ .setDirection(Direction.ASCENDING)
+ .build())
+ .setStartAt(
+ Cursor.newBuilder()
+ .setBefore(false)
+ .addValues(
+ Value.newBuilder()
+
.setReferenceValue(runQueryResponse.getDocument().getName())
+ .build()));
+ } else {
+ Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+ Map<String, Value> fieldsMap =
runQueryResponse.getDocument().getFieldsMap();
+ for (Order order : orderByList) {
+ String fieldPath = order.getField().getFieldPath();
+ Value value = fieldsMap.get(fieldPath);
+ if (value != null) {
+ cursor.addValues(value);
+ } else if ("__name__".equals(fieldPath)) {
+ cursor.addValues(
+ Value.newBuilder()
+
.setReferenceValue(runQueryResponse.getDocument().getName())
+ .build());
+ }
+ }
+ builder = query.toBuilder().setStartAt(cursor.build());
+ }
+ return element.toBuilder().setStructuredQuery(builder.build()).build();
+ }
+ }
+
+ /**
+ * {@link DoFn} for Firestore V1 {@link PartitionQueryRequest}s.
+ *
+ * <p>This Fn uses pagination to obtain responses, all pages will be
aggregated before being
+ * emitted to the next stage of the pipeline. Aggregation of pages is
necessary as the next step
+ * of pairing of cursors to create N queries must first sort all cursors.
See <a target="_blank"
+ * rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rest/v1/projects.databases.documents/partitionQuery#request-body">{@code
+ * pageToken}s</a> documentation for details.
+ *
+ * <p>All request quality-of-service is managed via the instance of {@link
RpcQos} associated with
+ * the lifecycle of this Fn.
+ */
+ static final class PartitionQueryFn
+ extends BaseFirestoreV1ReadFn<PartitionQueryRequest, PartitionQueryPair>
{
+
+ public PartitionQueryFn(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public Context getRpcAttemptContext() {
+ return FirestoreV1Fn.V1FnRpcAttemptContext.PartitionQuery;
+ }
+
+ @Override
+ public void processElement(ProcessContext context) throws Exception {
+ @SuppressWarnings("nullness")
+ final PartitionQueryRequest element =
+ requireNonNull(context.element(), "c.element() must be non null");
+
+ RpcQos.RpcReadAttempt attempt =
rpcQos.newReadAttempt(getRpcAttemptContext());
+ PartitionQueryResponse.Builder aggregate = null;
+ while (true) {
+ if (!attempt.awaitSafeToProceed(clock.instant())) {
+ continue;
+ }
+
+ try {
+ PartitionQueryRequest request = setPageToken(element, aggregate);
+ attempt.recordRequestStart(clock.instant());
+ PartitionQueryPagedResponse pagedResponse =
+ firestoreStub.partitionQueryPagedCallable().call(request);
+ for (PartitionQueryPage page : pagedResponse.iteratePages()) {
+ attempt.recordRequestSuccessful(clock.instant());
+ PartitionQueryResponse response = page.getResponse();
+ if (aggregate == null) {
+ aggregate = response.toBuilder();
+ } else {
+ aggregate.addAllPartitions(response.getPartitionsList());
+ if (page.hasNextPage()) {
+ aggregate.setNextPageToken(response.getNextPageToken());
+ } else {
+ aggregate.clearNextPageToken();
+ }
+ }
+ if (page.hasNextPage()) {
+ attempt.recordRequestStart(clock.instant());
+ }
+ }
+ attempt.completeSuccess();
+ break;
+ } catch (RuntimeException exception) {
+ Instant end = clock.instant();
+ attempt.recordRequestFailed(end);
+ attempt.checkCanRetry(end, exception);
+ }
+ }
+ if (aggregate != null) {
+ context.output(new PartitionQueryPair(element, aggregate.build()));
+ }
+ }
+
+ private PartitionQueryRequest setPageToken(
+ PartitionQueryRequest request,
+ @edu.umd.cs.findbugs.annotations.Nullable
PartitionQueryResponse.Builder aggregate) {
+ if (aggregate != null && aggregate.getNextPageToken() != null) {
+ return
request.toBuilder().setPageToken(aggregate.getNextPageToken()).build();
+ }
+ return request;
+ }
+ }
+
+ /**
+ * {@link DoFn} for Firestore V1 {@link ListDocumentsRequest}s.
+ *
+ * <p>This Fn uses pagination to obtain responses, the response from each
page will be output to
+ * the next stage of the pipeline.
+ *
+ * <p>All request quality-of-service is managed via the instance of {@link
RpcQos} associated with
+ * the lifecycle of this Fn.
+ */
+ static final class ListDocumentsFn
+ extends PaginatedFirestoreV1ReadFn<
+ ListDocumentsRequest,
+ ListDocumentsPagedResponse,
+ ListDocumentsPage,
+ ListDocumentsResponse> {
+
+ ListDocumentsFn(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public Context getRpcAttemptContext() {
+ return FirestoreV1Fn.V1FnRpcAttemptContext.ListDocuments;
+ }
+
+ @Override
+ protected UnaryCallable<ListDocumentsRequest, ListDocumentsPagedResponse>
getCallable(
+ FirestoreStub firestoreStub) {
+ return firestoreStub.listDocumentsPagedCallable();
+ }
+
+ @Override
+ protected ListDocumentsRequest setPageToken(
+ ListDocumentsRequest request, String nextPageToken) {
+ return request.toBuilder().setPageToken(nextPageToken).build();
+ }
+ }
+
+ /**
+ * {@link DoFn} for Firestore V1 {@link ListCollectionIdsRequest}s.
+ *
+ * <p>This Fn uses pagination to obtain responses, the response from each
page will be output to
+ * the next stage of the pipeline.
+ *
+ * <p>All request quality-of-service is managed via the instance of {@link
RpcQos} associated with
+ * the lifecycle of this Fn.
+ */
+ static final class ListCollectionIdsFn
+ extends PaginatedFirestoreV1ReadFn<
+ ListCollectionIdsRequest,
+ ListCollectionIdsPagedResponse,
+ ListCollectionIdsPage,
+ ListCollectionIdsResponse> {
+
+ ListCollectionIdsFn(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public Context getRpcAttemptContext() {
+ return FirestoreV1Fn.V1FnRpcAttemptContext.ListCollectionIds;
+ }
+
+ @Override
+ protected UnaryCallable<ListCollectionIdsRequest,
ListCollectionIdsPagedResponse> getCallable(
+ FirestoreStub firestoreStub) {
+ return firestoreStub.listCollectionIdsPagedCallable();
+ }
+
+ @Override
+ protected ListCollectionIdsRequest setPageToken(
+ ListCollectionIdsRequest request, String nextPageToken) {
+ return request.toBuilder().setPageToken(nextPageToken).build();
+ }
+ }
+
+ /**
+ * {@link DoFn} for Firestore V1 {@link BatchGetDocumentsRequest}s.
+ *
+ * <p>This Fn uses a stream to obtain responses, each response from the
stream will be output to
+ * the next stage of the pipeline. Each response from the stream represents
an individual document
+ * with the associated metadata.
+ *
+ * <p>All request quality-of-service is managed via the instance of {@link
RpcQos} associated with
+ * the lifecycle of this Fn.
+ */
+ static final class BatchGetDocumentsFn
+ extends StreamingFirestoreV1ReadFn<BatchGetDocumentsRequest,
BatchGetDocumentsResponse> {
+
+ BatchGetDocumentsFn(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public Context getRpcAttemptContext() {
+ return FirestoreV1Fn.V1FnRpcAttemptContext.BatchGetDocuments;
+ }
+
+ @Override
+ protected ServerStreamingCallable<BatchGetDocumentsRequest,
BatchGetDocumentsResponse>
+ getCallable(FirestoreStub firestoreStub) {
+ return firestoreStub.batchGetDocumentsCallable();
+ }
+
+ @Override
+ protected BatchGetDocumentsRequest setStartFrom(
+ BatchGetDocumentsRequest originalRequest, BatchGetDocumentsResponse
mostRecentResponse) {
+ int startIndex = -1;
+ ProtocolStringList documentsList = originalRequest.getDocumentsList();
+ String missing = mostRecentResponse.getMissing();
+ String foundName =
+ mostRecentResponse.hasFound() ?
mostRecentResponse.getFound().getName() : null;
+ // we only scan until the second to last originalRequest. If the final
element were to be
+ // reached
+ // the full request would be complete and we wouldn't be in this scenario
+ int maxIndex = documentsList.size() - 2;
+ for (int i = 0; i <= maxIndex; i++) {
+ String docName = documentsList.get(i);
+ if (docName.equals(missing) || docName.equals(foundName)) {
+ startIndex = i;
+ break;
+ }
+ }
+ if (0 <= startIndex) {
+ BatchGetDocumentsRequest.Builder builder =
originalRequest.toBuilder().clearDocuments();
+ documentsList.stream()
+ .skip(startIndex + 1) // start from the next entry from the one we
found
+ .forEach(builder::addDocuments);
+ return builder.build();
+ }
+ // unable to find a match, return the original request
+ return originalRequest;
+ }
+ }
+
+ /**
+ * {@link DoFn} Providing support for a Read type RPC operation which uses a
Stream rather than
+ * pagination. Each response from the stream will be output to the next
stage of the pipeline.
+ *
+ * <p>All request quality-of-service is managed via the instance of {@link
RpcQos} associated with
+ * the lifecycle of this Fn.
+ *
+ * @param <InT> Request type
+ * @param <OutT> Response type
+ */
+ private abstract static class StreamingFirestoreV1ReadFn<
+ InT extends Message, OutT extends Message>
+ extends BaseFirestoreV1ReadFn<InT, OutT> {
+
+ protected StreamingFirestoreV1ReadFn(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ protected abstract ServerStreamingCallable<InT, OutT>
getCallable(FirestoreStub firestoreStub);
+
+ protected abstract InT setStartFrom(InT element, OutT out);
+
+ @Override
+ public final void processElement(ProcessContext c) throws Exception {
+ @SuppressWarnings(
+ "nullness") // for some reason requireNonNull thinks its parameter
but be non-null...
+ final InT element = requireNonNull(c.element(), "c.element() must be non
null");
+
+ RpcQos.RpcReadAttempt attempt =
rpcQos.newReadAttempt(getRpcAttemptContext());
+ OutT lastReceivedValue = null;
+ while (true) {
+ if (!attempt.awaitSafeToProceed(clock.instant())) {
+ continue;
+ }
+
+ Instant start = clock.instant();
+ try {
+ InT request =
+ lastReceivedValue == null ? element : setStartFrom(element,
lastReceivedValue);
+ attempt.recordRequestStart(start);
+ ServerStream<OutT> serverStream =
getCallable(firestoreStub).call(request);
+ attempt.recordRequestSuccessful(clock.instant());
+ for (OutT out : serverStream) {
+ lastReceivedValue = out;
+ attempt.recordStreamValue(clock.instant());
+ c.output(out);
+ }
+ attempt.completeSuccess();
+ break;
+ } catch (RuntimeException exception) {
+ Instant end = clock.instant();
+ attempt.recordRequestFailed(end);
+ attempt.checkCanRetry(end, exception);
+ }
+ }
+ }
+ }
+
+ /**
+ * {@link DoFn} Providing support for a Read type RPC operation which uses
pagination rather than
+ * a Stream.
+ *
+ * @param <RequestT> Request type
+ * @param <ResponseT> Response type
+ */
+ @SuppressWarnings({
+ // errorchecker doesn't like the second ? on PagedResponse, seemingly
because of different
+ // recursion depth limits; 3 on the found vs 4 on the required.
+ // The second ? is the type of collection the paged response uses to hold
all responses if
+ // trying to expand all pages to a single collection. We are emitting a
single page at at time
+ // while tracking read progress so we can resume if an error has occurred
and we still have
+ // attempt budget available.
+ "type.argument.type.incompatible"
+ })
+ private abstract static class PaginatedFirestoreV1ReadFn<
+ RequestT extends Message,
+ PagedResponseT extends AbstractPagedListResponse<RequestT,
ResponseT, ?, PageT, ?>,
+ PageT extends AbstractPage<RequestT, ResponseT, ?, PageT>,
+ ResponseT extends Message>
+ extends BaseFirestoreV1ReadFn<RequestT, ResponseT> {
+
+ protected PaginatedFirestoreV1ReadFn(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ protected abstract UnaryCallable<RequestT, PagedResponseT> getCallable(
+ FirestoreStub firestoreStub);
+
+ protected abstract RequestT setPageToken(RequestT request, String
nextPageToken);
+
+ @Override
+ public final void processElement(ProcessContext c) throws Exception {
+ @SuppressWarnings(
+ "nullness") // for some reason requireNonNull thinks its parameter
but be non-null...
+ final RequestT element = requireNonNull(c.element(), "c.element() must
be non null");
+
+ RpcQos.RpcReadAttempt attempt =
rpcQos.newReadAttempt(getRpcAttemptContext());
+ String nextPageToken = null;
+ while (true) {
+ if (!attempt.awaitSafeToProceed(clock.instant())) {
+ continue;
+ }
+
+ try {
+ RequestT request = nextPageToken == null ? element :
setPageToken(element, nextPageToken);
+ attempt.recordRequestStart(clock.instant());
+ PagedResponseT pagedResponse =
getCallable(firestoreStub).call(request);
+ for (PageT page : pagedResponse.iteratePages()) {
+ ResponseT response = page.getResponse();
+ attempt.recordRequestSuccessful(clock.instant());
+ c.output(response);
+ if (page.hasNextPage()) {
+ nextPageToken = page.getNextPageToken();
+ attempt.recordRequestStart(clock.instant());
+ }
+ }
+ attempt.completeSuccess();
+ break;
+ } catch (RuntimeException exception) {
+ Instant end = clock.instant();
+ attempt.recordRequestFailed(end);
+ attempt.checkCanRetry(end, exception);
+ }
+ }
+ }
+ }
+
+ /**
+ * Base class for all {@link org.apache.beam.sdk.transforms.DoFn DoFn}s
which provide access to
+ * RPCs from the Cloud Firestore V1 API.
+ *
+ * <p>This class takes care of common lifecycle elements and transient state
management for
+ * subclasses allowing subclasses to provide the minimal implementation for
{@link
+ * NonWindowAwareDoFn#processElement(DoFn.ProcessContext)}}
+ *
+ * @param <InT> The type of element coming into this {@link DoFn}
+ * @param <OutT> The type of element output from this {@link DoFn}
+ */
+ abstract static class BaseFirestoreV1ReadFn<InT, OutT> extends
NonWindowAwareDoFn<InT, OutT>
+ implements HasRpcAttemptContext {
+
+ protected final JodaClock clock;
+ protected final FirestoreStatefulComponentFactory
firestoreStatefulComponentFactory;
+ protected final RpcQosOptions rpcQosOptions;
+
+ // transient running state information, not important to any possible
checkpointing
+ protected transient FirestoreStub firestoreStub;
+ protected transient RpcQos rpcQos;
+ protected transient String projectId;
+
+ @SuppressWarnings(
+ "initialization.fields.uninitialized") // allow transient fields to be
managed by component
+ // lifecycle
+ protected BaseFirestoreV1ReadFn(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ this.clock = requireNonNull(clock, "clock must be non null");
+ this.firestoreStatefulComponentFactory =
+ requireNonNull(firestoreStatefulComponentFactory, "firestoreFactory
must be non null");
+ this.rpcQosOptions = requireNonNull(rpcQosOptions, "rpcQosOptions must
be non null");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void setup() {
+ rpcQos = firestoreStatefulComponentFactory.getRpcQos(rpcQosOptions);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public final void startBundle(StartBundleContext c) {
+ String project =
c.getPipelineOptions().as(GcpOptions.class).getProject();
+ projectId =
+ requireNonNull(project, "project must be defined on GcpOptions of
PipelineOptions");
+ firestoreStub =
firestoreStatefulComponentFactory.getFirestoreStub(c.getPipelineOptions());
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("nullness") // allow clearing transient fields
+ @Override
+ public void finishBundle() throws Exception {
+ projectId = null;
+ firestoreStub.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public final void populateDisplayData(
+ @edu.umd.cs.findbugs.annotations.NonNull DisplayData.Builder builder) {
Review comment:
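Wrong import: `@edu.umd.cs.findbugs.annotations.NonNull` is the findbugs
annotation, while the rest of this file uses the checkerframework nullness
annotations (it imports `org.checkerframework.checker.nullness.qual.Nullable`);
use the checkerframework equivalent here as well.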
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 611681)
Time Spent: 34h 50m (was: 34h 40m)
> Add FirestoreIO connector to Java SDK
> -------------------------------------
>
> Key: BEAM-8376
> URL: https://issues.apache.org/jira/browse/BEAM-8376
> Project: Beam
> Issue Type: New Feature
> Components: io-java-gcp
> Reporter: Stefan Djelekar
> Priority: P3
> Time Spent: 34h 50m
> Remaining Estimate: 0h
>
> Motivation:
> There is no Firestore connector for Java SDK at the moment.
> Having it will enhance the integrations with database options on the Google
> Cloud Platform.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)