danthev commented on a change in pull request #15005:
URL: https://github.com/apache/beam/pull/15005#discussion_r655626971
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
##########
@@ -189,6 +533,725 @@ private Write() {}
}
}
+ /**
+ * Concrete class representing a {@link PTransform}{@code <}{@link
PCollection}{@code <}{@link
+ * ListCollectionIdsRequest}{@code >, }{@link PTransform}{@code <}{@link
+ * ListCollectionIdsResponse}{@code >>} which will read from Firestore.
+ *
+ * <p>This class is part of the Firestore Connector DSL, it has a type safe
builder accessible via
+ * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code
.}{@link
+ * FirestoreV1.Read#listCollectionIds() listCollectionIds()}.
+ *
+ * <p>All request quality-of-service for an instance of this PTransform is
scoped to the worker
+ * and configured via {@link
ListCollectionIds.Builder#withRpcQosOptions(RpcQosOptions)}.
+ *
+ * @see FirestoreIO#v1()
+ * @see FirestoreV1#read()
+ * @see FirestoreV1.Read#listCollectionIds()
+ * @see FirestoreV1.ListCollectionIds.Builder
+ * @see ListCollectionIdsRequest
+ * @see ListCollectionIdsResponse
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.ListCollectionIds">google.firestore.v1.Firestore.ListCollectionIds</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListCollectionIdsRequest">google.firestore.v1.ListCollectionIdsRequest</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListCollectionIdsResponse">google.firestore.v1.ListCollectionIdsResponse</a>
+ */
+ public static final class ListCollectionIds
+ extends Transform<
+ PCollection<ListCollectionIdsRequest>,
+ PCollection<String>,
+ ListCollectionIds,
+ ListCollectionIds.Builder> {
+
+ private ListCollectionIds(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public PCollection<String> expand(PCollection<ListCollectionIdsRequest>
input) {
+ return input
+ .apply(
+ "listCollectionIds",
+ ParDo.of(
+ new ListCollectionIdsFn(clock,
firestoreStatefulComponentFactory, rpcQosOptions)))
+ .apply(ParDo.of(new FlattenListCollectionIdsResponse()))
+ .apply(Reshuffle.viaRandomKey());
+ }
+
+ @Override
+ public Builder toBuilder() {
+ return new Builder(clock, firestoreStatefulComponentFactory,
rpcQosOptions);
+ }
+
+ /**
+ * A type safe builder for {@link ListCollectionIds} allowing
configuration and instantiation.
+ *
+ * <p>This class is part of the Firestore Connector DSL, it has a type
safe builder accessible
+ * via {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read()
read()}{@code .}{@link
+ * FirestoreV1.Read#listCollectionIds() listCollectionIds()}.
+ *
+ * <p>
+ *
+ * @see FirestoreIO#v1()
+ * @see FirestoreV1#read()
+ * @see FirestoreV1.Read#listCollectionIds()
+ * @see FirestoreV1.ListCollectionIds
+ * @see ListCollectionIdsRequest
+ * @see ListCollectionIdsResponse
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.ListCollectionIds">google.firestore.v1.Firestore.ListCollectionIds</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListCollectionIdsRequest">google.firestore.v1.ListCollectionIdsRequest</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListCollectionIdsResponse">google.firestore.v1.ListCollectionIdsResponse</a>
+ */
+ public static final class Builder
+ extends Transform.Builder<
+ PCollection<ListCollectionIdsRequest>,
+ PCollection<String>,
+ ListCollectionIds,
+ ListCollectionIds.Builder> {
+
+ private Builder() {
+ super();
+ }
+
+ private Builder(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public ListCollectionIds build() {
+ return genericBuild();
+ }
+
+ @Override
+ ListCollectionIds buildSafe(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ return new ListCollectionIds(clock, firestoreStatefulComponentFactory,
rpcQosOptions);
+ }
+ }
+ }
+
+ /**
+ * Concrete class representing a {@link PTransform}{@code <}{@link
PCollection}{@code <}{@link
+ * ListDocumentsRequest}{@code >, }{@link PTransform}{@code <}{@link
ListDocumentsResponse}{@code
+ * >>} which will read from Firestore.
+ *
+ * <p>This class is part of the Firestore Connector DSL, it has a type safe
builder accessible via
+ * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code
.}{@link
+ * FirestoreV1.Read#listDocuments() listDocuments()}.
+ *
+ * <p>All request quality-of-service for an instance of this PTransform is
scoped to the worker
+ * and configured via {@link
ListDocuments.Builder#withRpcQosOptions(RpcQosOptions)}.
+ *
+ * @see FirestoreIO#v1()
+ * @see FirestoreV1#read()
+ * @see FirestoreV1.Read#listDocuments()
+ * @see FirestoreV1.ListDocuments.Builder
+ * @see ListDocumentsRequest
+ * @see ListDocumentsResponse
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.ListDocuments">google.firestore.v1.Firestore.ListDocuments</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsRequest">google.firestore.v1.ListDocumentsRequest</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsResponse">google.firestore.v1.ListDocumentsResponse</a>
+ */
+ public static final class ListDocuments
+ extends Transform<
+ PCollection<ListDocumentsRequest>,
+ PCollection<Document>,
+ ListDocuments,
+ ListDocuments.Builder> {
+
+ private ListDocuments(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public PCollection<Document> expand(PCollection<ListDocumentsRequest>
input) {
+ return input
+ .apply(
+ "listDocuments",
+ ParDo.of(
+ new ListDocumentsFn(clock,
firestoreStatefulComponentFactory, rpcQosOptions)))
+ .apply(ParDo.of(new ListDocumentsResponseToDocument()))
+ .apply(Reshuffle.viaRandomKey());
+ }
+
+ @Override
+ public Builder toBuilder() {
+ return new Builder(clock, firestoreStatefulComponentFactory,
rpcQosOptions);
+ }
+
+ /**
+ * A type safe builder for {@link ListDocuments} allowing configuration
and instantiation.
+ *
+ * <p>This class is part of the Firestore Connector DSL, it has a type
safe builder accessible
+ * via {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read()
read()}{@code .}{@link
+ * FirestoreV1.Read#listDocuments() listDocuments()}.
+ *
+ * <p>
+ *
+ * @see FirestoreIO#v1()
+ * @see FirestoreV1#read()
+ * @see FirestoreV1.Read#listDocuments()
+ * @see FirestoreV1.ListDocuments
+ * @see ListDocumentsRequest
+ * @see ListDocumentsResponse
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.ListDocuments">google.firestore.v1.Firestore.ListDocuments</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsRequest">google.firestore.v1.ListDocumentsRequest</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsResponse">google.firestore.v1.ListDocumentsResponse</a>
+ */
+ public static final class Builder
+ extends Transform.Builder<
+ PCollection<ListDocumentsRequest>,
+ PCollection<Document>,
+ ListDocuments,
+ ListDocuments.Builder> {
+
+ private Builder() {
+ super();
+ }
+
+ private Builder(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public ListDocuments build() {
+ return genericBuild();
+ }
+
+ @Override
+ ListDocuments buildSafe(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ return new ListDocuments(clock, firestoreStatefulComponentFactory,
rpcQosOptions);
+ }
+ }
+ }
+
+ /**
+ * Concrete class representing a {@link PTransform}{@code <}{@link
PCollection}{@code <}{@link
+ * RunQueryRequest}{@code >, }{@link PTransform}{@code <}{@link
RunQueryResponse}{@code >>} which
+ * will read from Firestore.
+ *
+ * <p>This class is part of the Firestore Connector DSL, it has a type safe
builder accessible via
+ * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code
.}{@link
+ * FirestoreV1.Read#runQuery() runQuery()}.
+ *
+ * <p>All request quality-of-service for an instance of this PTransform is
scoped to the worker
+ * and configured via {@link
RunQuery.Builder#withRpcQosOptions(RpcQosOptions)}.
+ *
+ * @see FirestoreIO#v1()
+ * @see FirestoreV1#read()
+ * @see FirestoreV1.Read#runQuery()
+ * @see FirestoreV1.RunQuery.Builder
+ * @see RunQueryRequest
+ * @see RunQueryResponse
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.RunQuery">google.firestore.v1.Firestore.RunQuery</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.RunQueryRequest">google.firestore.v1.RunQueryRequest</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.RunQueryResponse">google.firestore.v1.RunQueryResponse</a>
+ */
+ public static final class RunQuery
+ extends Transform<
+ PCollection<RunQueryRequest>, PCollection<RunQueryResponse>,
RunQuery, RunQuery.Builder> {
+
+ private RunQuery(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public PCollection<RunQueryResponse> expand(PCollection<RunQueryRequest>
input) {
+ return input
+ .apply(
+ "runQuery",
+ ParDo.of(new RunQueryFn(clock,
firestoreStatefulComponentFactory, rpcQosOptions)))
+ .apply(Reshuffle.viaRandomKey());
+ }
+
+ @Override
+ public Builder toBuilder() {
+ return new Builder(clock, firestoreStatefulComponentFactory,
rpcQosOptions);
+ }
+
+ /**
+ * A type safe builder for {@link RunQuery} allowing configuration and
instantiation.
+ *
+ * <p>This class is part of the Firestore Connector DSL, it has a type
safe builder accessible
+ * via {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read()
read()}{@code .}{@link
+ * FirestoreV1.Read#runQuery() runQuery()}.
+ *
+ * <p>
+ *
+ * @see FirestoreIO#v1()
+ * @see FirestoreV1#read()
+ * @see FirestoreV1.Read#runQuery()
+ * @see FirestoreV1.RunQuery
+ * @see RunQueryRequest
+ * @see RunQueryResponse
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.RunQuery">google.firestore.v1.Firestore.RunQuery</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.RunQueryRequest">google.firestore.v1.RunQueryRequest</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.RunQueryResponse">google.firestore.v1.RunQueryResponse</a>
+ */
+ public static final class Builder
+ extends Transform.Builder<
+ PCollection<RunQueryRequest>,
+ PCollection<RunQueryResponse>,
+ RunQuery,
+ RunQuery.Builder> {
+
+ private Builder() {
+ super();
+ }
+
+ private Builder(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public RunQuery build() {
+ return genericBuild();
+ }
+
+ @Override
+ RunQuery buildSafe(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ return new RunQuery(clock, firestoreStatefulComponentFactory,
rpcQosOptions);
+ }
+ }
+ }
+
+ /**
+ * Concrete class representing a {@link PTransform}{@code <}{@link
PCollection}{@code <}{@link
+ * BatchGetDocumentsRequest}{@code >, }{@link PTransform}{@code <}{@link
+ * BatchGetDocumentsResponse}{@code >>} which will read from Firestore.
+ *
+ * <p>This class is part of the Firestore Connector DSL, it has a type safe
builder accessible via
+ * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code
.}{@link
+ * FirestoreV1.Read#batchGetDocuments() batchGetDocuments()}.
+ *
+ * <p>All request quality-of-service for an instance of this PTransform is
scoped to the worker
+ * and configured via {@link
BatchGetDocuments.Builder#withRpcQosOptions(RpcQosOptions)}.
+ *
+ * @see FirestoreIO#v1()
+ * @see FirestoreV1#read()
+ * @see FirestoreV1.Read#batchGetDocuments()
+ * @see FirestoreV1.BatchGetDocuments.Builder
+ * @see BatchGetDocumentsRequest
+ * @see BatchGetDocumentsResponse
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchGetDocuments">google.firestore.v1.Firestore.BatchGetDocuments</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchGetDocumentsRequest">google.firestore.v1.BatchGetDocumentsRequest</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchGetDocumentsResponse">google.firestore.v1.BatchGetDocumentsResponse</a>
+ */
+ public static final class BatchGetDocuments
+ extends Transform<
+ PCollection<BatchGetDocumentsRequest>,
+ PCollection<BatchGetDocumentsResponse>,
+ BatchGetDocuments,
+ BatchGetDocuments.Builder> {
+
+ private BatchGetDocuments(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public PCollection<BatchGetDocumentsResponse> expand(
+ PCollection<BatchGetDocumentsRequest> input) {
+ return input
+ .apply(
+ "batchGetDocuments",
+ ParDo.of(
+ new BatchGetDocumentsFn(clock,
firestoreStatefulComponentFactory, rpcQosOptions)))
+ .apply(Reshuffle.viaRandomKey());
+ }
+
+ @Override
+ public Builder toBuilder() {
+ return new Builder(clock, firestoreStatefulComponentFactory,
rpcQosOptions);
+ }
+
+ /**
+ * A type safe builder for {@link BatchGetDocuments} allowing
configuration and instantiation.
+ *
+ * <p>This class is part of the Firestore Connector DSL, it has a type
safe builder accessible
+ * via {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read()
read()}{@code .}{@link
+ * FirestoreV1.Read#batchGetDocuments() batchGetDocuments()}.
+ *
+ * <p>
+ *
+ * @see FirestoreIO#v1()
+ * @see FirestoreV1#read()
+ * @see FirestoreV1.Read#batchGetDocuments()
+ * @see FirestoreV1.BatchGetDocuments
+ * @see BatchGetDocumentsRequest
+ * @see BatchGetDocumentsResponse
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchGetDocuments">google.firestore.v1.Firestore.BatchGetDocuments</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchGetDocumentsRequest">google.firestore.v1.BatchGetDocumentsRequest</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchGetDocumentsResponse">google.firestore.v1.BatchGetDocumentsResponse</a>
+ */
+ public static final class Builder
+ extends Transform.Builder<
+ PCollection<BatchGetDocumentsRequest>,
+ PCollection<BatchGetDocumentsResponse>,
+ BatchGetDocuments,
+ BatchGetDocuments.Builder> {
+
+ private Builder() {
+ super();
+ }
+
+ public Builder(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public BatchGetDocuments build() {
+ return genericBuild();
+ }
+
+ @Override
+ BatchGetDocuments buildSafe(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ return new BatchGetDocuments(clock, firestoreStatefulComponentFactory,
rpcQosOptions);
+ }
+ }
+ }
+
+ /**
+ * Concrete class representing a {@link PTransform}{@code <}{@link
PCollection}{@code <}{@link
+ * PartitionQueryRequest}{@code >, }{@link PTransform}{@code <}{@link
RunQueryResponse}{@code >>}
+ * which will read from Firestore.
+ *
+ * <p>This class is part of the Firestore Connector DSL, it has a type safe
builder accessible via
+ * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code
.}{@link
+ * FirestoreV1.Read#partitionQuery() partitionQuery()}.
+ *
+ * <p>All request quality-of-service for an instance of this PTransform is
scoped to the worker
+ * and configured via {@link
PartitionQuery.Builder#withRpcQosOptions(RpcQosOptions)}.
+ *
+ * @see FirestoreIO#v1()
+ * @see FirestoreV1#read()
+ * @see FirestoreV1.Read#partitionQuery()
+ * @see FirestoreV1.PartitionQuery.Builder
+ * @see PartitionQueryRequest
+ * @see RunQueryResponse
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.PartitionQuery">google.firestore.v1.Firestore.PartitionQuery</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryRequest">google.firestore.v1.PartitionQueryRequest</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryResponse">google.firestore.v1.PartitionQueryResponse</a>
+ */
+ public static final class PartitionQuery
+ extends Transform<
+ PCollection<PartitionQueryRequest>,
+ PCollection<RunQueryResponse>,
+ PartitionQuery,
+ PartitionQuery.Builder> {
+
+ private final boolean nameOnlyQuery;
+
+ private PartitionQuery(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions,
+ boolean nameOnlyQuery) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ this.nameOnlyQuery = nameOnlyQuery;
+ }
+
+ @Override
+ public PCollection<RunQueryResponse>
expand(PCollection<PartitionQueryRequest> input) {
+ PCollection<RunQueryRequest> queries =
+ input
+ .apply(
+ "PartitionQuery",
+ ParDo.of(
+ new PartitionQueryFn(
+ clock, firestoreStatefulComponentFactory,
rpcQosOptions)))
+ .apply("expand queries", ParDo.of(new
PartitionQueryResponseToRunQueryRequest()));
+ if (nameOnlyQuery) {
+ queries =
+ queries.apply(
+ "set name only query",
+ MapElements.via(
+ new SimpleFunction<RunQueryRequest, RunQueryRequest>() {
+ @Override
+ public RunQueryRequest apply(RunQueryRequest input) {
+ RunQueryRequest.Builder builder = input.toBuilder();
+ builder
+ .getStructuredQueryBuilder()
+ .setSelect(
+ Projection.newBuilder()
+ .addFields(
+ FieldReference.newBuilder()
+ .setFieldPath("__name__")
+ .build())
+ .build());
+ return builder.build();
+ }
+ }));
+ }
+ return queries
+ .apply(Reshuffle.viaRandomKey())
+ .apply(new RunQuery(clock, firestoreStatefulComponentFactory,
rpcQosOptions));
+ }
+
+ @Override
+ public Builder toBuilder() {
+ return new Builder(clock, firestoreStatefulComponentFactory,
rpcQosOptions, nameOnlyQuery);
+ }
+
+ /**
+ * A type safe builder for {@link PartitionQuery} allowing configuration
and instantiation.
+ *
+ * <p>This class is part of the Firestore Connector DSL, it has a type
safe builder accessible
+ * via {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read()
read()}{@code .}{@link
+ * FirestoreV1.Read#partitionQuery() partitionQuery()}.
+ *
+ * <p>
+ *
+ * @see FirestoreIO#v1()
+ * @see FirestoreV1#read()
+ * @see FirestoreV1.Read#partitionQuery()
+ * @see FirestoreV1.PartitionQuery
+ * @see PartitionQueryRequest
+ * @see RunQueryResponse
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.PartitionQuery">google.firestore.v1.Firestore.PartitionQuery</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryRequest">google.firestore.v1.PartitionQueryRequest</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryResponse">google.firestore.v1.PartitionQueryResponse</a>
+ */
+ public static final class Builder
+ extends Transform.Builder<
+ PCollection<PartitionQueryRequest>,
+ PCollection<RunQueryResponse>,
+ PartitionQuery,
+ FirestoreV1.PartitionQuery.Builder> {
+
+ private boolean nameOnlyQuery = false;
+
+ private Builder() {
+ super();
+ }
+
+ public Builder(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions,
+ boolean nameOnlyQuery) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ this.nameOnlyQuery = nameOnlyQuery;
+ }
+
+ @Override
+ public PartitionQuery build() {
+ return genericBuild();
+ }
+
+ /**
+ * Update produced queries to only retrieve their {@code __name__}
thereby not retrieving any
+ * fields and reducing resource requirements.
+ *
+ * @return this builder
+ */
+ public Builder withNameOnlyQuery() {
+ this.nameOnlyQuery = true;
+ return this;
+ }
+
+ @Override
+ PartitionQuery buildSafe(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ return new PartitionQuery(
+ clock, firestoreStatefulComponentFactory, rpcQosOptions,
nameOnlyQuery);
+ }
+ }
+
+ /**
+ * DoFn which contains the logic necessary to turn a {@link
PartitionQueryRequest} and {@link
+ * PartitionQueryResponse} pair into {@code N} {@link RunQueryRequest}.
+ */
+ static final class PartitionQueryResponseToRunQueryRequest
+ extends DoFn<PartitionQueryPair, RunQueryRequest> {
+
+ /**
+ * When fetching cursors that span multiple pages it is expected (per <a
+ * href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryRequest">
+ * PartitionQueryRequest.page_token</a>) for the client to sort the cursors before processing
+ * them to define the sub-queries. So here we're defining a Comparator which will sort Cursors
+ * by the first reference value present, then comparing the reference values
+ * lexicographically.
+ */
+ static final Comparator<Cursor> CURSOR_REFERENCE_VALUE_COMPARATOR;
+
+ static {
+ Function<Cursor, Optional<Value>> firstReferenceValue =
+ (Cursor c) ->
+ c.getValuesList().stream()
+ .filter(
+ v -> {
+ String referenceValue = v.getReferenceValue();
+ return referenceValue != null &&
!referenceValue.isEmpty();
+ })
+ .findFirst();
+ Function<String, String[]> stringToPath = (String s) -> s.split("/");
+ // compare references by their path segments rather than as a whole string to ensure
+ // per path segment comparison is taken into account.
+ Comparator<String[]> pathWiseCompare =
+ (String[] path1, String[] path2) -> {
+ int minLength = Math.min(path1.length, path2.length);
+ for (int i = 0; i < minLength; i++) {
+ String pathSegment1 = path1[i];
+ String pathSegment2 = path2[i];
+ int compare = pathSegment1.compareTo(pathSegment2);
+ if (compare != 0) {
+ return compare;
+ }
+ }
+ if (path1.length == path2.length) {
+ return 0;
+ } else if (minLength == path1.length) {
+ return -1;
+ } else {
+ return 1;
+ }
+ };
+
+ // Sort those cursors which have no firstReferenceValue at the bottom of the list
+ CURSOR_REFERENCE_VALUE_COMPARATOR =
+ Comparator.comparing(
+ firstReferenceValue,
+ (o1, o2) -> {
+ if (o1.isPresent() && o2.isPresent()) {
+ return pathWiseCompare.compare(
+ stringToPath.apply(o1.get().getReferenceValue()),
+ stringToPath.apply(o2.get().getReferenceValue()));
+ } else if (o1.isPresent()) {
+ return -1;
+ } else {
+ return 1;
+ }
+ });
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ PartitionQueryPair pair = c.element();
Review comment:
It seems the logic in `processElement` isn't tested, only the comparator
itself. Can you add a test that calls `runFunction`?
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java
##########
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.api.gax.paging.AbstractPage;
+import com.google.api.gax.paging.AbstractPagedListResponse;
+import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPage;
+import
com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPage;
+import
com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPage;
+import
com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.BatchGetDocumentsRequest;
+import com.google.firestore.v1.BatchGetDocumentsResponse;
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.ListCollectionIdsRequest;
+import com.google.firestore.v1.ListCollectionIdsResponse;
+import com.google.firestore.v1.ListDocumentsRequest;
+import com.google.firestore.v1.ListDocumentsResponse;
+import com.google.firestore.v1.PartitionQueryRequest;
+import com.google.firestore.v1.PartitionQueryResponse;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.RunQueryResponse;
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.ProtocolStringList;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.NonWindowAwareDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A collection of {@link org.apache.beam.sdk.transforms.DoFn DoFn}s for each of the supported
+ * read RPC methods from the Cloud Firestore V1 API.
+ */
+final class FirestoreV1ReadFn {
+
+  /**
+   * {@link DoFn} that executes Firestore V1 {@link RunQueryRequest}s.
+   *
+   * <p>Responses are obtained from a stream and every response read from that stream is output to
+   * the next stage of the pipeline. A single response represents one document together with its
+   * associated metadata.
+   *
+   * <p>When reading from the stream fails, the stream is resumed instead of being started over.
+   * Resumption stays within the scope of completing the request, i.e. it is only possible while an
+   * attempt is still available in the Qos budget.
+   *
+   * <p>Request quality-of-service is handled by the {@link RpcQos} instance tied to the lifecycle
+   * of this Fn.
+   */
+  static final class RunQueryFn
+      extends StreamingFirestoreV1ReadFn<RunQueryRequest, RunQueryResponse> {
+
+    RunQueryFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.RunQuery;
+    }
+
+    @Override
+    protected ServerStreamingCallable<RunQueryRequest, RunQueryResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.runQueryCallable();
+    }
+
+    /**
+     * Derives a request which starts right after the document carried by {@code
+     * runQueryResponse}, by setting a start cursor with {@code before = false}.
+     */
+    @Override
+    protected RunQueryRequest setStartFrom(
+        RunQueryRequest element, RunQueryResponse runQueryResponse) {
+      StructuredQuery originalQuery = element.getStructuredQuery();
+      List<Order> orderings = originalQuery.getOrderByList();
+      StructuredQuery.Builder resumedQuery = originalQuery.toBuilder();
+      Cursor.Builder startAt = Cursor.newBuilder().setBefore(false);
+
+      if (orderings.isEmpty()) {
+        // No explicit ordering means the default sort of "__name__ ASC" is used. A cursor value
+        // must map to an order by, so that ordering is added explicitly before pointing the
+        // cursor at the name of the last document read.
+        Order byNameAscending =
+            Order.newBuilder()
+                .setField(FieldReference.newBuilder().setFieldPath("__name__").build())
+                .setDirection(Direction.ASCENDING)
+                .build();
+        resumedQuery.addOrderBy(byNameAscending);
+        startAt.addValues(documentNameValue(runQueryResponse));
+      } else {
+        Map<String, Value> documentFields = runQueryResponse.getDocument().getFieldsMap();
+        for (Order ordering : orderings) {
+          String path = ordering.getField().getFieldPath();
+          Value fieldValue = documentFields.get(path);
+          if (fieldValue != null) {
+            startAt.addValues(fieldValue);
+          } else if ("__name__".equals(path)) {
+            startAt.addValues(documentNameValue(runQueryResponse));
+          }
+        }
+      }
+
+      resumedQuery.setStartAt(startAt.build());
+      return element.toBuilder().setStructuredQuery(resumedQuery.build()).build();
+    }
+
+    /** Wraps the name of the response's document in a reference {@link Value}. */
+    private static Value documentNameValue(RunQueryResponse response) {
+      return Value.newBuilder().setReferenceValue(response.getDocument().getName()).build();
+    }
+  }
+
+ /**
+ * {@link DoFn} for Firestore V1 {@link PartitionQueryRequest}s.
+ *
+ * <p>This Fn uses pagination to obtain responses, all pages will be
aggregated before being
+ * emitted to the next stage of the pipeline. Aggregation of pages is
necessary as the next step
+ * of pairing of cursors to create N queries must first sort all cursors.
See <a target="_blank"
+ * rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rest/v1/projects.databases.documents/partitionQuery#request-body">{@code
+ * pageToken}s</a> documentation for details.
+ *
+ * <p>All request quality-of-service is managed via the instance of {@link
RpcQos} associated with
+ * the lifecycle of this Fn.
+ */
+ static final class PartitionQueryFn
+ extends BaseFirestoreV1ReadFn<PartitionQueryRequest, PartitionQueryPair>
{
+
+ public PartitionQueryFn(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public Context getRpcAttemptContext() {
+ return FirestoreV1Fn.V1FnRpcAttemptContext.PartitionQuery;
+ }
+
+ @Override
+ public void processElement(ProcessContext context) throws Exception {
+ @SuppressWarnings("nullness")
+ final PartitionQueryRequest element =
+ requireNonNull(context.element(), "c.element() must be non null");
+
+ RpcQos.RpcReadAttempt attempt =
rpcQos.newReadAttempt(getRpcAttemptContext());
+ PartitionQueryResponse.Builder aggregate = null;
+ while (true) {
+ if (!attempt.awaitSafeToProceed(clock.instant())) {
+ continue;
+ }
+
+ try {
+ PartitionQueryRequest request = setPageToken(element, aggregate);
+ attempt.recordRequestStart(clock.instant());
+ PartitionQueryPagedResponse pagedResponse =
+ firestoreStub.partitionQueryPagedCallable().call(request);
+ for (PartitionQueryPage page : pagedResponse.iteratePages()) {
+ attempt.recordRequestSuccessful(clock.instant());
+ PartitionQueryResponse response = page.getResponse();
+ if (aggregate == null) {
+ aggregate = response.toBuilder();
+ } else {
+ aggregate.addAllPartitions(response.getPartitionsList());
+ if (page.hasNextPage()) {
+ aggregate.setNextPageToken(response.getNextPageToken());
+ } else {
+ aggregate.clearNextPageToken();
+ }
+ }
+ if (page.hasNextPage()) {
+ attempt.recordRequestStart(clock.instant());
+ }
+ }
+ attempt.completeSuccess();
+ break;
+ } catch (RuntimeException exception) {
+ Instant end = clock.instant();
+ attempt.recordRequestFailed(end);
+ attempt.checkCanRetry(end, exception);
+ }
+ }
+ if (aggregate != null) {
+ context.output(new PartitionQueryPair(element, aggregate.build()));
+ }
+ }
+
+ private PartitionQueryRequest setPageToken(
+ PartitionQueryRequest request,
+ @edu.umd.cs.findbugs.annotations.Nullable
PartitionQueryResponse.Builder aggregate) {
+ if (aggregate != null && aggregate.getNextPageToken() != null) {
+ return
request.toBuilder().setPageToken(aggregate.getNextPageToken()).build();
+ }
+ return request;
+ }
+ }
+
+  /**
+   * {@link DoFn} that executes Firestore V1 {@link ListDocumentsRequest}s.
+   *
+   * <p>Responses are fetched page by page, and the response of each page is output to the next
+   * stage of the pipeline.
+   *
+   * <p>Request quality-of-service is handled by the {@link RpcQos} instance tied to the lifecycle
+   * of this Fn.
+   */
+  static final class ListDocumentsFn
+      extends PaginatedFirestoreV1ReadFn<
+          ListDocumentsRequest,
+          ListDocumentsPagedResponse,
+          ListDocumentsPage,
+          ListDocumentsResponse> {
+
+    ListDocumentsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.ListDocuments;
+    }
+
+    @Override
+    protected UnaryCallable<ListDocumentsRequest, ListDocumentsPagedResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.listDocumentsPagedCallable();
+    }
+
+    /** Returns a copy of {@code request} whose page token is {@code nextPageToken}. */
+    @Override
+    protected ListDocumentsRequest setPageToken(
+        ListDocumentsRequest request, String nextPageToken) {
+      ListDocumentsRequest.Builder nextPageRequest = request.toBuilder();
+      nextPageRequest.setPageToken(nextPageToken);
+      return nextPageRequest.build();
+    }
+  }
+
+  /**
+   * {@link DoFn} that executes Firestore V1 {@link ListCollectionIdsRequest}s.
+   *
+   * <p>Responses are fetched page by page, and the response of each page is output to the next
+   * stage of the pipeline.
+   *
+   * <p>Request quality-of-service is handled by the {@link RpcQos} instance tied to the lifecycle
+   * of this Fn.
+   */
+  static final class ListCollectionIdsFn
+      extends PaginatedFirestoreV1ReadFn<
+          ListCollectionIdsRequest,
+          ListCollectionIdsPagedResponse,
+          ListCollectionIdsPage,
+          ListCollectionIdsResponse> {
+
+    ListCollectionIdsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.ListCollectionIds;
+    }
+
+    @Override
+    protected UnaryCallable<ListCollectionIdsRequest, ListCollectionIdsPagedResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.listCollectionIdsPagedCallable();
+    }
+
+    /** Returns a copy of {@code request} whose page token is {@code nextPageToken}. */
+    @Override
+    protected ListCollectionIdsRequest setPageToken(
+        ListCollectionIdsRequest request, String nextPageToken) {
+      ListCollectionIdsRequest.Builder nextPageRequest = request.toBuilder();
+      nextPageRequest.setPageToken(nextPageToken);
+      return nextPageRequest.build();
+    }
+  }
+
+ /**
+ * {@link DoFn} for Firestore V1 {@link BatchGetDocumentsRequest}s.
+ *
+ * <p>This Fn uses a stream to obtain responses, each response from the
stream will be output to
+ * the next stage of the pipeline. Each response from the stream represents
an individual document
+ * with the associated metadata.
+ *
+ * <p>All request quality-of-service is managed via the instance of {@link
RpcQos} associated with
+ * the lifecycle of this Fn.
+ */
+ static final class BatchGetDocumentsFn
+ extends StreamingFirestoreV1ReadFn<BatchGetDocumentsRequest,
BatchGetDocumentsResponse> {
+
+ BatchGetDocumentsFn(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public Context getRpcAttemptContext() {
+ return FirestoreV1Fn.V1FnRpcAttemptContext.BatchGetDocuments;
+ }
+
+ @Override
+ protected ServerStreamingCallable<BatchGetDocumentsRequest,
BatchGetDocumentsResponse>
+ getCallable(FirestoreStub firestoreStub) {
+ return firestoreStub.batchGetDocumentsCallable();
+ }
+
+ @Override
+ protected BatchGetDocumentsRequest setStartFrom(
+ BatchGetDocumentsRequest originalRequest, BatchGetDocumentsResponse
mostRecentResponse) {
+ int startIndex = -1;
+ ProtocolStringList documentsList = originalRequest.getDocumentsList();
+ String missing = mostRecentResponse.getMissing();
+ String foundName =
+ mostRecentResponse.hasFound() ?
mostRecentResponse.getFound().getName() : null;
+ // we only scan until the second to last originalRequest. If the final
element were to be
+ // reached
+ // the full request would be complete and we wouldn't be in this scenario
+ int maxIndex = documentsList.size() - 2;
+ for (int i = 0; i <= maxIndex; i++) {
+ String docName = documentsList.get(i);
+ if (docName.equals(missing) || docName.equals(foundName)) {
+ startIndex = i;
+ break;
+ }
+ }
+ if (0 <= startIndex) {
+ BatchGetDocumentsRequest.Builder builder =
originalRequest.toBuilder().clearDocuments();
+ documentsList.stream()
+ .skip(startIndex + 1) // start from the next entry from the one we
found
+ .forEach(builder::addDocuments);
+ return builder.build();
+ }
+ // unable to find a match, return the original request
+ return originalRequest;
Review comment:
Yes, that sounds good.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]