BenWhitehead commented on a change in pull request #15005:
URL: https://github.com/apache/beam/pull/15005#discussion_r655741368
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
##########
@@ -189,6 +533,725 @@ private Write() {}
}
}
+ /**
+ * Concrete class representing a {@link PTransform}{@code <}{@link PCollection}{@code <}{@link
+ * ListCollectionIdsRequest}{@code >, }{@link PCollection}{@code <}{@link String}{@code >>} which
+ * will read from Firestore.
+ *
+ * <p>Expansion applies a {@code ListCollectionIdsFn} to every incoming {@link
+ * ListCollectionIdsRequest}, passes the results through {@code FlattenListCollectionIdsResponse}
+ * and finally redistributes the resulting elements using {@link Reshuffle#viaRandomKey()}.
+ *
+ * <p>This class is part of the Firestore Connector DSL, it has a type safe builder accessible via
+ * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code .}{@link
+ * FirestoreV1.Read#listCollectionIds() listCollectionIds()}.
+ *
+ * <p>All request quality-of-service for an instance of this PTransform is scoped to the worker
+ * and configured via {@link ListCollectionIds.Builder#withRpcQosOptions(RpcQosOptions)}.
+ *
+ * @see FirestoreIO#v1()
+ * @see FirestoreV1#read()
+ * @see FirestoreV1.Read#listCollectionIds()
+ * @see FirestoreV1.ListCollectionIds.Builder
+ * @see ListCollectionIdsRequest
+ * @see ListCollectionIdsResponse
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.ListCollectionIds">google.firestore.v1.Firestore.ListCollectionIds</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListCollectionIdsRequest">google.firestore.v1.ListCollectionIdsRequest</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListCollectionIdsResponse">google.firestore.v1.ListCollectionIdsResponse</a>
+ */
+ public static final class ListCollectionIds
+ extends Transform<
+ PCollection<ListCollectionIdsRequest>,
+ PCollection<String>,
+ ListCollectionIds,
+ ListCollectionIds.Builder> {
+
+ private ListCollectionIds(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public PCollection<String> expand(PCollection<ListCollectionIdsRequest>
input) {
+ return input
+ .apply(
+ "listCollectionIds",
+ ParDo.of(
+ new ListCollectionIdsFn(clock,
firestoreStatefulComponentFactory, rpcQosOptions)))
+ .apply(ParDo.of(new FlattenListCollectionIdsResponse()))
+ .apply(Reshuffle.viaRandomKey());
+ }
+
+ @Override
+ public Builder toBuilder() {
+ return new Builder(clock, firestoreStatefulComponentFactory,
rpcQosOptions);
+ }
+
+ /**
+ * A type safe builder for {@link ListCollectionIds} allowing configuration and instantiation.
+ *
+ * <p>This class is part of the Firestore Connector DSL, it is accessible via {@link
+ * FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code .}{@link
+ * FirestoreV1.Read#listCollectionIds() listCollectionIds()}.
+ *
+ * <p>{@link #build()} produces a {@link ListCollectionIds}; {@link
+ * ListCollectionIds#toBuilder()} returns a builder carrying over an existing instance's clock,
+ * component factory and RPC QoS options.
+ *
+ * @see FirestoreIO#v1()
+ * @see FirestoreV1#read()
+ * @see FirestoreV1.Read#listCollectionIds()
+ * @see FirestoreV1.ListCollectionIds
+ * @see ListCollectionIdsRequest
+ * @see ListCollectionIdsResponse
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.ListCollectionIds">google.firestore.v1.Firestore.ListCollectionIds</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListCollectionIdsRequest">google.firestore.v1.ListCollectionIdsRequest</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListCollectionIdsResponse">google.firestore.v1.ListCollectionIdsResponse</a>
+ */
+ public static final class Builder
+ extends Transform.Builder<
+ PCollection<ListCollectionIdsRequest>,
+ PCollection<String>,
+ ListCollectionIds,
+ ListCollectionIds.Builder> {
+
+ private Builder() {
+ super();
+ }
+
+ private Builder(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public ListCollectionIds build() {
+ return genericBuild();
+ }
+
+ @Override
+ ListCollectionIds buildSafe(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ return new ListCollectionIds(clock, firestoreStatefulComponentFactory,
rpcQosOptions);
+ }
+ }
+ }
+
+ /**
+ * Concrete class representing a {@link PTransform}{@code <}{@link PCollection}{@code <}{@link
+ * ListDocumentsRequest}{@code >, }{@link PCollection}{@code <}{@link Document}{@code >>} which
+ * will read from Firestore.
+ *
+ * <p>Expansion applies a {@code ListDocumentsFn} to every incoming {@link ListDocumentsRequest},
+ * passes the results through {@code ListDocumentsResponseToDocument} and finally redistributes
+ * the resulting elements using {@link Reshuffle#viaRandomKey()}.
+ *
+ * <p>This class is part of the Firestore Connector DSL, it has a type safe builder accessible via
+ * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code .}{@link
+ * FirestoreV1.Read#listDocuments() listDocuments()}.
+ *
+ * <p>All request quality-of-service for an instance of this PTransform is scoped to the worker
+ * and configured via {@link ListDocuments.Builder#withRpcQosOptions(RpcQosOptions)}.
+ *
+ * @see FirestoreIO#v1()
+ * @see FirestoreV1#read()
+ * @see FirestoreV1.Read#listDocuments()
+ * @see FirestoreV1.ListDocuments.Builder
+ * @see ListDocumentsRequest
+ * @see ListDocumentsResponse
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.ListDocuments">google.firestore.v1.Firestore.ListDocuments</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsRequest">google.firestore.v1.ListDocumentsRequest</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsResponse">google.firestore.v1.ListDocumentsResponse</a>
+ */
+ public static final class ListDocuments
+ extends Transform<
+ PCollection<ListDocumentsRequest>,
+ PCollection<Document>,
+ ListDocuments,
+ ListDocuments.Builder> {
+
+ private ListDocuments(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public PCollection<Document> expand(PCollection<ListDocumentsRequest>
input) {
+ return input
+ .apply(
+ "listDocuments",
+ ParDo.of(
+ new ListDocumentsFn(clock,
firestoreStatefulComponentFactory, rpcQosOptions)))
+ .apply(ParDo.of(new ListDocumentsResponseToDocument()))
+ .apply(Reshuffle.viaRandomKey());
+ }
+
+ @Override
+ public Builder toBuilder() {
+ return new Builder(clock, firestoreStatefulComponentFactory,
rpcQosOptions);
+ }
+
+ /**
+ * A type safe builder for {@link ListDocuments} allowing configuration and instantiation.
+ *
+ * <p>This class is part of the Firestore Connector DSL, it is accessible via {@link
+ * FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code .}{@link
+ * FirestoreV1.Read#listDocuments() listDocuments()}.
+ *
+ * <p>{@link #build()} produces a {@link ListDocuments}; {@link ListDocuments#toBuilder()}
+ * returns a builder carrying over an existing instance's clock, component factory and RPC QoS
+ * options.
+ *
+ * @see FirestoreIO#v1()
+ * @see FirestoreV1#read()
+ * @see FirestoreV1.Read#listDocuments()
+ * @see FirestoreV1.ListDocuments
+ * @see ListDocumentsRequest
+ * @see ListDocumentsResponse
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.ListDocuments">google.firestore.v1.Firestore.ListDocuments</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsRequest">google.firestore.v1.ListDocumentsRequest</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsResponse">google.firestore.v1.ListDocumentsResponse</a>
+ */
+ public static final class Builder
+ extends Transform.Builder<
+ PCollection<ListDocumentsRequest>,
+ PCollection<Document>,
+ ListDocuments,
+ ListDocuments.Builder> {
+
+ private Builder() {
+ super();
+ }
+
+ private Builder(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public ListDocuments build() {
+ return genericBuild();
+ }
+
+ @Override
+ ListDocuments buildSafe(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ return new ListDocuments(clock, firestoreStatefulComponentFactory,
rpcQosOptions);
+ }
+ }
+ }
+
+ /**
+ * Concrete class representing a {@link PTransform}{@code <}{@link PCollection}{@code <}{@link
+ * RunQueryRequest}{@code >, }{@link PCollection}{@code <}{@link RunQueryResponse}{@code >>}
+ * which will read from Firestore.
+ *
+ * <p>Expansion applies a {@code RunQueryFn} to every incoming {@link RunQueryRequest} and then
+ * redistributes the resulting elements using {@link Reshuffle#viaRandomKey()}.
+ *
+ * <p>This class is part of the Firestore Connector DSL, it has a type safe builder accessible via
+ * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code .}{@link
+ * FirestoreV1.Read#runQuery() runQuery()}.
+ *
+ * <p>All request quality-of-service for an instance of this PTransform is scoped to the worker
+ * and configured via {@link RunQuery.Builder#withRpcQosOptions(RpcQosOptions)}.
+ *
+ * @see FirestoreIO#v1()
+ * @see FirestoreV1#read()
+ * @see FirestoreV1.Read#runQuery()
+ * @see FirestoreV1.RunQuery.Builder
+ * @see RunQueryRequest
+ * @see RunQueryResponse
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.RunQuery">google.firestore.v1.Firestore.RunQuery</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.RunQueryRequest">google.firestore.v1.RunQueryRequest</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.RunQueryResponse">google.firestore.v1.RunQueryResponse</a>
+ */
+ public static final class RunQuery
+ extends Transform<
+ PCollection<RunQueryRequest>, PCollection<RunQueryResponse>,
RunQuery, RunQuery.Builder> {
+
+ private RunQuery(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public PCollection<RunQueryResponse> expand(PCollection<RunQueryRequest>
input) {
+ return input
+ .apply(
+ "runQuery",
+ ParDo.of(new RunQueryFn(clock,
firestoreStatefulComponentFactory, rpcQosOptions)))
+ .apply(Reshuffle.viaRandomKey());
+ }
+
+ @Override
+ public Builder toBuilder() {
+ return new Builder(clock, firestoreStatefulComponentFactory,
rpcQosOptions);
+ }
+
+ /**
+ * A type safe builder for {@link RunQuery} allowing configuration and instantiation.
+ *
+ * <p>This class is part of the Firestore Connector DSL, it is accessible via {@link
+ * FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code .}{@link
+ * FirestoreV1.Read#runQuery() runQuery()}.
+ *
+ * <p>{@link #build()} produces a {@link RunQuery}; {@link RunQuery#toBuilder()} returns a
+ * builder carrying over an existing instance's clock, component factory and RPC QoS options.
+ *
+ * @see FirestoreIO#v1()
+ * @see FirestoreV1#read()
+ * @see FirestoreV1.Read#runQuery()
+ * @see FirestoreV1.RunQuery
+ * @see RunQueryRequest
+ * @see RunQueryResponse
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.RunQuery">google.firestore.v1.Firestore.RunQuery</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.RunQueryRequest">google.firestore.v1.RunQueryRequest</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.RunQueryResponse">google.firestore.v1.RunQueryResponse</a>
+ */
+ public static final class Builder
+ extends Transform.Builder<
+ PCollection<RunQueryRequest>,
+ PCollection<RunQueryResponse>,
+ RunQuery,
+ RunQuery.Builder> {
+
+ private Builder() {
+ super();
+ }
+
+ private Builder(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public RunQuery build() {
+ return genericBuild();
+ }
+
+ @Override
+ RunQuery buildSafe(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ return new RunQuery(clock, firestoreStatefulComponentFactory,
rpcQosOptions);
+ }
+ }
+ }
+
+ /**
+ * Concrete class representing a {@link PTransform}{@code <}{@link PCollection}{@code <}{@link
+ * BatchGetDocumentsRequest}{@code >, }{@link PCollection}{@code <}{@link
+ * BatchGetDocumentsResponse}{@code >>} which will read from Firestore.
+ *
+ * <p>Expansion applies a {@code BatchGetDocumentsFn} to every incoming {@link
+ * BatchGetDocumentsRequest} and then redistributes the resulting elements using {@link
+ * Reshuffle#viaRandomKey()}.
+ *
+ * <p>This class is part of the Firestore Connector DSL, it has a type safe builder accessible via
+ * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code .}{@link
+ * FirestoreV1.Read#batchGetDocuments() batchGetDocuments()}.
+ *
+ * <p>All request quality-of-service for an instance of this PTransform is scoped to the worker
+ * and configured via {@link BatchGetDocuments.Builder#withRpcQosOptions(RpcQosOptions)}.
+ *
+ * @see FirestoreIO#v1()
+ * @see FirestoreV1#read()
+ * @see FirestoreV1.Read#batchGetDocuments()
+ * @see FirestoreV1.BatchGetDocuments.Builder
+ * @see BatchGetDocumentsRequest
+ * @see BatchGetDocumentsResponse
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchGetDocuments">google.firestore.v1.Firestore.BatchGetDocuments</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchGetDocumentsRequest">google.firestore.v1.BatchGetDocumentsRequest</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchGetDocumentsResponse">google.firestore.v1.BatchGetDocumentsResponse</a>
+ */
+ public static final class BatchGetDocuments
+ extends Transform<
+ PCollection<BatchGetDocumentsRequest>,
+ PCollection<BatchGetDocumentsResponse>,
+ BatchGetDocuments,
+ BatchGetDocuments.Builder> {
+
+ private BatchGetDocuments(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public PCollection<BatchGetDocumentsResponse> expand(
+ PCollection<BatchGetDocumentsRequest> input) {
+ return input
+ .apply(
+ "batchGetDocuments",
+ ParDo.of(
+ new BatchGetDocumentsFn(clock,
firestoreStatefulComponentFactory, rpcQosOptions)))
+ .apply(Reshuffle.viaRandomKey());
+ }
+
+ @Override
+ public Builder toBuilder() {
+ return new Builder(clock, firestoreStatefulComponentFactory,
rpcQosOptions);
+ }
+
+ /**
+ * A type safe builder for {@link BatchGetDocuments} allowing configuration and instantiation.
+ *
+ * <p>This class is part of the Firestore Connector DSL, it is accessible via {@link
+ * FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code .}{@link
+ * FirestoreV1.Read#batchGetDocuments() batchGetDocuments()}.
+ *
+ * <p>{@link #build()} produces a {@link BatchGetDocuments}; {@link
+ * BatchGetDocuments#toBuilder()} returns a builder carrying over an existing instance's clock,
+ * component factory and RPC QoS options.
+ *
+ * @see FirestoreIO#v1()
+ * @see FirestoreV1#read()
+ * @see FirestoreV1.Read#batchGetDocuments()
+ * @see FirestoreV1.BatchGetDocuments
+ * @see BatchGetDocumentsRequest
+ * @see BatchGetDocumentsResponse
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchGetDocuments">google.firestore.v1.Firestore.BatchGetDocuments</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchGetDocumentsRequest">google.firestore.v1.BatchGetDocumentsRequest</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchGetDocumentsResponse">google.firestore.v1.BatchGetDocumentsResponse</a>
+ */
+ public static final class Builder
+ extends Transform.Builder<
+ PCollection<BatchGetDocumentsRequest>,
+ PCollection<BatchGetDocumentsResponse>,
+ BatchGetDocuments,
+ BatchGetDocuments.Builder> {
+
+ private Builder() {
+ super();
+ }
+
+ public Builder(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ }
+
+ @Override
+ public BatchGetDocuments build() {
+ return genericBuild();
+ }
+
+ @Override
+ BatchGetDocuments buildSafe(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ return new BatchGetDocuments(clock, firestoreStatefulComponentFactory,
rpcQosOptions);
+ }
+ }
+ }
+
+ /**
+ * Concrete class representing a {@link PTransform}{@code <}{@link PCollection}{@code <}{@link
+ * PartitionQueryRequest}{@code >, }{@link PCollection}{@code <}{@link RunQueryResponse}{@code >>}
+ * which will read from Firestore.
+ *
+ * <p>This class is part of the Firestore Connector DSL, it has a type safe builder accessible via
+ * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code .}{@link
+ * FirestoreV1.Read#partitionQuery() partitionQuery()}.
+ *
+ * <p>All request quality-of-service for an instance of this PTransform is scoped to the worker
+ * and configured via {@link PartitionQuery.Builder#withRpcQosOptions(RpcQosOptions)}.
+ *
+ * @see FirestoreIO#v1()
+ * @see FirestoreV1#read()
+ * @see FirestoreV1.Read#partitionQuery()
+ * @see FirestoreV1.PartitionQuery.Builder
+ * @see PartitionQueryRequest
+ * @see RunQueryResponse
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.PartitionQuery">google.firestore.v1.Firestore.PartitionQuery</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryRequest">google.firestore.v1.PartitionQueryRequest</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryResponse">google.firestore.v1.PartitionQueryResponse</a>
+ */
+ public static final class PartitionQuery
+ extends Transform<
+ PCollection<PartitionQueryRequest>,
+ PCollection<RunQueryResponse>,
+ PartitionQuery,
+ PartitionQuery.Builder> {
+
+ private final boolean nameOnlyQuery;
+
+ /**
+ * Creates a new transform; invoked from {@link Builder#buildSafe}.
+ *
+ * <p>{@code clock}, {@code firestoreStatefulComponentFactory} and {@code rpcQosOptions} are
+ * forwarded unchanged to the superclass constructor.
+ *
+ * @param nameOnlyQuery when {@code true}, the queries produced during expansion are updated to
+ *     only select {@code __name__}
+ */
+ private PartitionQuery(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions,
+ boolean nameOnlyQuery) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ this.nameOnlyQuery = nameOnlyQuery;
+ }
+
+ /**
+ * Expands this transform into the following steps: a {@code PartitionQueryFn} applied to each
+ * {@link PartitionQueryRequest}, a conversion of its output into {@link RunQueryRequest}s, an
+ * optional rewrite of every query so that it only selects {@code __name__} (when this transform
+ * was built with name only queries enabled), a {@link Reshuffle#viaRandomKey()} and finally a
+ * {@link RunQuery} which executes the queries.
+ */
+ @Override
+ public PCollection<RunQueryResponse> expand(PCollection<PartitionQueryRequest> input) {
+   PCollection<RunQueryRequest> partitionedQueries =
+       input
+           .apply(
+               "PartitionQuery",
+               ParDo.of(
+                   new PartitionQueryFn(clock, firestoreStatefulComponentFactory, rpcQosOptions)))
+           .apply("expand queries", ParDo.of(new PartitionQueryResponseToRunQueryRequest()));
+
+   PCollection<RunQueryRequest> queriesToRun = partitionedQueries;
+   if (nameOnlyQuery) {
+     queriesToRun =
+         partitionedQueries.apply(
+             "set name only query",
+             MapElements.via(
+                 new SimpleFunction<RunQueryRequest, RunQueryRequest>() {
+                   @Override
+                   public RunQueryRequest apply(RunQueryRequest request) {
+                     // Replace the projection of the structured query with a single field
+                     // reference to the document name.
+                     FieldReference nameField =
+                         FieldReference.newBuilder().setFieldPath("__name__").build();
+                     Projection nameOnly = Projection.newBuilder().addFields(nameField).build();
+                     RunQueryRequest.Builder requestBuilder = request.toBuilder();
+                     requestBuilder.getStructuredQueryBuilder().setSelect(nameOnly);
+                     return requestBuilder.build();
+                   }
+                 }));
+   }
+
+   PCollection<RunQueryRequest> reshuffled = queriesToRun.apply(Reshuffle.viaRandomKey());
+   return reshuffled.apply(new RunQuery(clock, firestoreStatefulComponentFactory, rpcQosOptions));
+ }
+
+ /**
+ * Returns a new {@link Builder} initialized with this transform's clock, component factory, RPC
+ * QoS options and name only query flag.
+ */
+ @Override
+ public Builder toBuilder() {
+ return new Builder(clock, firestoreStatefulComponentFactory,
rpcQosOptions, nameOnlyQuery);
+ }
+
+ /**
+ * A type safe builder for {@link PartitionQuery} allowing configuration and instantiation.
+ *
+ * <p>This class is part of the Firestore Connector DSL, it is accessible via {@link
+ * FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code .}{@link
+ * FirestoreV1.Read#partitionQuery() partitionQuery()}.
+ *
+ * <p>In addition to the configuration accepted by the base builder, this builder tracks whether
+ * the produced queries should be name only (disabled by default, enabled via {@link
+ * #withNameOnlyQuery()}); the flag is handed to the {@link PartitionQuery} created by {@code
+ * buildSafe}.
+ *
+ * @see FirestoreIO#v1()
+ * @see FirestoreV1#read()
+ * @see FirestoreV1.Read#partitionQuery()
+ * @see FirestoreV1.PartitionQuery
+ * @see PartitionQueryRequest
+ * @see RunQueryResponse
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.PartitionQuery">google.firestore.v1.Firestore.PartitionQuery</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryRequest">google.firestore.v1.PartitionQueryRequest</a>
+ * @see <a target="_blank" rel="noopener noreferrer"
+ *     href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryResponse">google.firestore.v1.PartitionQueryResponse</a>
+ */
+ public static final class Builder
+ extends Transform.Builder<
+ PCollection<PartitionQueryRequest>,
+ PCollection<RunQueryResponse>,
+ PartitionQuery,
+ FirestoreV1.PartitionQuery.Builder> {
+
+ private boolean nameOnlyQuery = false;
+
+ private Builder() {
+ super();
+ }
+
+ public Builder(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions,
+ boolean nameOnlyQuery) {
+ super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+ this.nameOnlyQuery = nameOnlyQuery;
+ }
+
+ @Override
+ public PartitionQuery build() {
+ return genericBuild();
+ }
+
+ /**
+ * Update produced queries to only retrieve their {@code __name__}, thereby not retrieving any
+ * fields and reducing resource requirements.
+ *
+ * <p>This method mutates and returns this builder; there is no corresponding method to turn
+ * the flag back off once set.
+ *
+ * @return this builder
+ */
+ public Builder withNameOnlyQuery() {
+ this.nameOnlyQuery = true;
+ return this;
+ }
+
+ @Override
+ PartitionQuery buildSafe(
+ JodaClock clock,
+ FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+ RpcQosOptions rpcQosOptions) {
+ return new PartitionQuery(
+ clock, firestoreStatefulComponentFactory, rpcQosOptions,
nameOnlyQuery);
+ }
+ }
+
+ /**
+ * DoFn which contains the logic necessary to turn a {@link PartitionQueryRequest} and {@link
+ * PartitionQueryResponse} pair into {@code N} {@link RunQueryRequest}.
+ */
+ static final class PartitionQueryResponseToRunQueryRequest
+ extends DoFn<PartitionQueryPair, RunQueryRequest> {
+
+ /**
+ * When fetching cursors that span multiple pages it is expected (per <a
+ * href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryRequest">
+ * PartitionQueryRequest.page_token</a>) for the client to sort the cursors before processing
+ * them to define the sub-queries. So here we're defining a Comparator which will sort Cursors
+ * by the first non-empty reference value present, comparing the reference values path segment
+ * by path segment (each segment lexicographically, a path which is a prefix of another sorts
+ * first).
+ *
+ * <p>Cursors without any reference value sort after all cursors which have one, and two such
+ * cursors compare as equal so that the ordering satisfies the {@link Comparator} contract.
+ */
+ static final Comparator<Cursor> CURSOR_REFERENCE_VALUE_COMPARATOR;
+
+ static {
+   // Locates the first value of a cursor which carries a non-empty reference value, if any.
+   Function<Cursor, Optional<Value>> firstReferenceValue =
+       (Cursor c) ->
+           c.getValuesList().stream()
+               .filter(
+                   v -> {
+                     String referenceValue = v.getReferenceValue();
+                     return referenceValue != null && !referenceValue.isEmpty();
+                   })
+               .findFirst();
+   Function<String, String[]> stringToPath = (String s) -> s.split("/");
+   // compare references by their path segments rather than as a whole string to ensure
+   // per path segment comparison is taken into account.
+   Comparator<String[]> pathWiseCompare =
+       (String[] path1, String[] path2) -> {
+         int minLength = Math.min(path1.length, path2.length);
+         for (int i = 0; i < minLength; i++) {
+           int compare = path1[i].compareTo(path2[i]);
+           if (compare != 0) {
+             return compare;
+           }
+         }
+         // every shared segment is equal: the shorter path (a prefix of the other) sorts first
+         return Integer.compare(path1.length, path2.length);
+       };
+
+   // Sort those cursors which have no firstReferenceValue at the bottom of the list
+   CURSOR_REFERENCE_VALUE_COMPARATOR =
+       Comparator.comparing(
+           firstReferenceValue,
+           (o1, o2) -> {
+             if (o1.isPresent() && o2.isPresent()) {
+               return pathWiseCompare.compare(
+                   stringToPath.apply(o1.get().getReferenceValue()),
+                   stringToPath.apply(o2.get().getReferenceValue()));
+             } else if (o1.isPresent()) {
+               return -1;
+             } else if (o2.isPresent()) {
+               return 1;
+             } else {
+               // Neither cursor has a reference value. They must compare as equal, otherwise
+               // compare(a, b) and compare(b, a) would both be positive (and compare(a, a)
+               // non-zero), violating the Comparator contract relied upon by sort routines.
+               return 0;
+             }
+           });
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ PartitionQueryPair pair = c.element();
Review comment:
Unit test added in addition to existing integration test.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]