chamikaramj commented on a change in pull request #15005:
URL: https://github.com/apache/beam/pull/15005#discussion_r663224976



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreDoFn.java
##########
@@ -46,6 +46,22 @@
   @StartBundle
   public abstract void startBundle(DoFn<InT, OutT>.StartBundleContext context) 
throws Exception;
 
+  abstract static class NonWindowAwareDoFn<InT, OutT> extends 
FirestoreDoFn<InT, OutT> {

Review comment:
       Please add a comment describing why we we need this class.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
##########
@@ -189,6 +533,725 @@ private Write() {}
     }
   }
 
+  /**
+   * Concrete class representing a {@link PTransform}{@code <}{@link 
PCollection}{@code <}{@link
+   * ListCollectionIdsRequest}{@code >, }{@link PTransform}{@code <}{@link
+   * ListCollectionIdsResponse}{@code >>} which will read from Firestore.
+   *
+   * <p>This class is part of the Firestore Connector DSL, it has a type safe 
builder accessible via
+   * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code 
.}{@link
+   * FirestoreV1.Read#listCollectionIds() listCollectionIds()}.
+   *
+   * <p>All request quality-of-service for an instance of this PTransform is 
scoped to the worker
+   * and configured via {@link 
ListCollectionIds.Builder#withRpcQosOptions(RpcQosOptions)}.
+   *
+   * @see FirestoreIO#v1()
+   * @see FirestoreV1#read()
+   * @see FirestoreV1.Read#listCollectionIds()
+   * @see FirestoreV1.ListCollectionIds.Builder
+   * @see ListCollectionIdsRequest
+   * @see ListCollectionIdsResponse
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.ListCollectionIds";>google.firestore.v1.Firestore.ListCollectionIds</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListCollectionIdsRequest";>google.firestore.v1.ListCollectionIdsRequest</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListCollectionIdsResponse";>google.firestore.v1.ListCollectionIdsResponse</a>
+   */
+  public static final class ListCollectionIds
+      extends Transform<
+          PCollection<ListCollectionIdsRequest>,
+          PCollection<String>,
+          ListCollectionIds,
+          ListCollectionIds.Builder> {
+
+    private ListCollectionIds(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public PCollection<String> expand(PCollection<ListCollectionIdsRequest> 
input) {
+      return input
+          .apply(
+              "listCollectionIds",
+              ParDo.of(
+                  new ListCollectionIdsFn(clock, 
firestoreStatefulComponentFactory, rpcQosOptions)))
+          .apply(ParDo.of(new FlattenListCollectionIdsResponse()))
+          .apply(Reshuffle.viaRandomKey());
+    }
+
+    @Override
+    public Builder toBuilder() {
+      return new Builder(clock, firestoreStatefulComponentFactory, 
rpcQosOptions);
+    }
+
+    /**
+     * A type safe builder for {@link ListCollectionIds} allowing 
configuration and instantiation.
+     *
+     * <p>This class is part of the Firestore Connector DSL, it has a type 
safe builder accessible
+     * via {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() 
read()}{@code .}{@link
+     * FirestoreV1.Read#listCollectionIds() listCollectionIds()}.
+     *
+     * <p>
+     *
+     * @see FirestoreIO#v1()
+     * @see FirestoreV1#read()
+     * @see FirestoreV1.Read#listCollectionIds()
+     * @see FirestoreV1.ListCollectionIds
+     * @see ListCollectionIdsRequest
+     * @see ListCollectionIdsResponse
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.ListCollectionIds";>google.firestore.v1.Firestore.ListCollectionIds</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListCollectionIdsRequest";>google.firestore.v1.ListCollectionIdsRequest</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListCollectionIdsResponse";>google.firestore.v1.ListCollectionIdsResponse</a>
+     */
+    public static final class Builder
+        extends Transform.Builder<
+            PCollection<ListCollectionIdsRequest>,
+            PCollection<String>,
+            ListCollectionIds,
+            ListCollectionIds.Builder> {
+
+      private Builder() {
+        super();
+      }
+
+      private Builder(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+
+      @Override
+      public ListCollectionIds build() {
+        return genericBuild();
+      }
+
+      @Override
+      ListCollectionIds buildSafe(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        return new ListCollectionIds(clock, firestoreStatefulComponentFactory, 
rpcQosOptions);
+      }
+    }
+  }
+
+  /**
+   * Concrete class representing a {@link PTransform}{@code <}{@link 
PCollection}{@code <}{@link
+   * ListDocumentsRequest}{@code >, }{@link PTransform}{@code <}{@link 
ListDocumentsResponse}{@code
+   * >>} which will read from Firestore.
+   *
+   * <p>This class is part of the Firestore Connector DSL, it has a type safe 
builder accessible via
+   * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code 
.}{@link
+   * FirestoreV1.Read#listDocuments() listDocuments()}.
+   *
+   * <p>All request quality-of-service for an instance of this PTransform is 
scoped to the worker
+   * and configured via {@link 
ListDocuments.Builder#withRpcQosOptions(RpcQosOptions)}.
+   *
+   * @see FirestoreIO#v1()
+   * @see FirestoreV1#read()
+   * @see FirestoreV1.Read#listDocuments()
+   * @see FirestoreV1.ListDocuments.Builder
+   * @see ListDocumentsRequest
+   * @see ListDocumentsResponse
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.ListDocuments";>google.firestore.v1.Firestore.ListDocuments</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsRequest";>google.firestore.v1.ListDocumentsRequest</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsResponse";>google.firestore.v1.ListDocumentsResponse</a>
+   */
+  public static final class ListDocuments
+      extends Transform<
+          PCollection<ListDocumentsRequest>,
+          PCollection<Document>,
+          ListDocuments,
+          ListDocuments.Builder> {
+
+    private ListDocuments(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public PCollection<Document> expand(PCollection<ListDocumentsRequest> 
input) {
+      return input
+          .apply(
+              "listDocuments",
+              ParDo.of(
+                  new ListDocumentsFn(clock, 
firestoreStatefulComponentFactory, rpcQosOptions)))
+          .apply(ParDo.of(new ListDocumentsResponseToDocument()))
+          .apply(Reshuffle.viaRandomKey());
+    }
+
+    @Override
+    public Builder toBuilder() {
+      return new Builder(clock, firestoreStatefulComponentFactory, 
rpcQosOptions);
+    }
+
+    /**
+     * A type safe builder for {@link ListDocuments} allowing configuration 
and instantiation.
+     *
+     * <p>This class is part of the Firestore Connector DSL, it has a type 
safe builder accessible
+     * via {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() 
read()}{@code .}{@link
+     * FirestoreV1.Read#listDocuments() listDocuments()}.
+     *
+     * <p>
+     *
+     * @see FirestoreIO#v1()
+     * @see FirestoreV1#read()
+     * @see FirestoreV1.Read#listDocuments()
+     * @see FirestoreV1.ListDocuments
+     * @see ListDocumentsRequest
+     * @see ListDocumentsResponse
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.ListDocuments";>google.firestore.v1.Firestore.ListDocuments</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsRequest";>google.firestore.v1.ListDocumentsRequest</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsResponse";>google.firestore.v1.ListDocumentsResponse</a>
+     */
+    public static final class Builder
+        extends Transform.Builder<
+            PCollection<ListDocumentsRequest>,
+            PCollection<Document>,
+            ListDocuments,
+            ListDocuments.Builder> {
+
+      private Builder() {
+        super();
+      }
+
+      private Builder(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+
+      @Override
+      public ListDocuments build() {
+        return genericBuild();
+      }
+
+      @Override
+      ListDocuments buildSafe(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        return new ListDocuments(clock, firestoreStatefulComponentFactory, 
rpcQosOptions);
+      }
+    }
+  }
+
+  /**
+   * Concrete class representing a {@link PTransform}{@code <}{@link 
PCollection}{@code <}{@link
+   * RunQueryRequest}{@code >, }{@link PTransform}{@code <}{@link 
RunQueryResponse}{@code >>} which
+   * will read from Firestore.
+   *
+   * <p>This class is part of the Firestore Connector DSL, it has a type safe 
builder accessible via
+   * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code 
.}{@link
+   * FirestoreV1.Read#runQuery() runQuery()}.
+   *
+   * <p>All request quality-of-service for an instance of this PTransform is 
scoped to the worker
+   * and configured via {@link 
RunQuery.Builder#withRpcQosOptions(RpcQosOptions)}.
+   *
+   * @see FirestoreIO#v1()
+   * @see FirestoreV1#read()
+   * @see FirestoreV1.Read#runQuery()
+   * @see FirestoreV1.RunQuery.Builder
+   * @see RunQueryRequest
+   * @see RunQueryResponse
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.RunQuery";>google.firestore.v1.Firestore.RunQuery</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.RunQueryRequest";>google.firestore.v1.RunQueryRequest</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.RunQueryResponse";>google.firestore.v1.RunQueryResponse</a>
+   */
+  public static final class RunQuery
+      extends Transform<
+          PCollection<RunQueryRequest>, PCollection<RunQueryResponse>, 
RunQuery, RunQuery.Builder> {
+
+    private RunQuery(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public PCollection<RunQueryResponse> expand(PCollection<RunQueryRequest> 
input) {
+      return input
+          .apply(
+              "runQuery",
+              ParDo.of(new RunQueryFn(clock, 
firestoreStatefulComponentFactory, rpcQosOptions)))
+          .apply(Reshuffle.viaRandomKey());
+    }
+
+    @Override
+    public Builder toBuilder() {
+      return new Builder(clock, firestoreStatefulComponentFactory, 
rpcQosOptions);
+    }
+
+    /**
+     * A type safe builder for {@link RunQuery} allowing configuration and 
instantiation.
+     *
+     * <p>This class is part of the Firestore Connector DSL, it has a type 
safe builder accessible
+     * via {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() 
read()}{@code .}{@link
+     * FirestoreV1.Read#runQuery() runQuery()}.
+     *
+     * <p>
+     *
+     * @see FirestoreIO#v1()
+     * @see FirestoreV1#read()
+     * @see FirestoreV1.Read#runQuery()
+     * @see FirestoreV1.RunQuery
+     * @see RunQueryRequest
+     * @see RunQueryResponse
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.RunQuery";>google.firestore.v1.Firestore.RunQuery</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.RunQueryRequest";>google.firestore.v1.RunQueryRequest</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.RunQueryResponse";>google.firestore.v1.RunQueryResponse</a>
+     */
+    public static final class Builder
+        extends Transform.Builder<
+            PCollection<RunQueryRequest>,
+            PCollection<RunQueryResponse>,
+            RunQuery,
+            RunQuery.Builder> {
+
+      private Builder() {
+        super();
+      }
+
+      private Builder(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+
+      @Override
+      public RunQuery build() {
+        return genericBuild();
+      }
+
+      @Override
+      RunQuery buildSafe(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        return new RunQuery(clock, firestoreStatefulComponentFactory, 
rpcQosOptions);
+      }
+    }
+  }
+
+  /**
+   * Concrete class representing a {@link PTransform}{@code <}{@link 
PCollection}{@code <}{@link
+   * BatchGetDocumentsRequest}{@code >, }{@link PTransform}{@code <}{@link
+   * BatchGetDocumentsResponse}{@code >>} which will read from Firestore.
+   *
+   * <p>This class is part of the Firestore Connector DSL, it has a type safe 
builder accessible via
+   * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code 
.}{@link
+   * FirestoreV1.Read#batchGetDocuments() batchGetDocuments()}.
+   *
+   * <p>All request quality-of-service for an instance of this PTransform is 
scoped to the worker
+   * and configured via {@link 
BatchGetDocuments.Builder#withRpcQosOptions(RpcQosOptions)}.
+   *
+   * @see FirestoreIO#v1()
+   * @see FirestoreV1#read()
+   * @see FirestoreV1.Read#batchGetDocuments()
+   * @see FirestoreV1.BatchGetDocuments.Builder
+   * @see BatchGetDocumentsRequest
+   * @see BatchGetDocumentsResponse
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchGetDocuments";>google.firestore.v1.Firestore.BatchGetDocuments</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchGetDocumentsRequest";>google.firestore.v1.BatchGetDocumentsRequest</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchGetDocumentsResponse";>google.firestore.v1.BatchGetDocumentsResponse</a>
+   */
+  public static final class BatchGetDocuments
+      extends Transform<
+          PCollection<BatchGetDocumentsRequest>,
+          PCollection<BatchGetDocumentsResponse>,
+          BatchGetDocuments,
+          BatchGetDocuments.Builder> {
+
+    private BatchGetDocuments(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public PCollection<BatchGetDocumentsResponse> expand(
+        PCollection<BatchGetDocumentsRequest> input) {
+      return input
+          .apply(
+              "batchGetDocuments",
+              ParDo.of(
+                  new BatchGetDocumentsFn(clock, 
firestoreStatefulComponentFactory, rpcQosOptions)))
+          .apply(Reshuffle.viaRandomKey());
+    }
+
+    @Override
+    public Builder toBuilder() {
+      return new Builder(clock, firestoreStatefulComponentFactory, 
rpcQosOptions);
+    }
+
+    /**
+     * A type safe builder for {@link BatchGetDocuments} allowing 
configuration and instantiation.
+     *
+     * <p>This class is part of the Firestore Connector DSL, it has a type 
safe builder accessible
+     * via {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() 
read()}{@code .}{@link
+     * FirestoreV1.Read#batchGetDocuments() batchGetDocuments()}.
+     *
+     * <p>
+     *
+     * @see FirestoreIO#v1()
+     * @see FirestoreV1#read()
+     * @see FirestoreV1.Read#batchGetDocuments()
+     * @see FirestoreV1.BatchGetDocuments
+     * @see BatchGetDocumentsRequest
+     * @see BatchGetDocumentsResponse
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchGetDocuments";>google.firestore.v1.Firestore.BatchGetDocuments</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchGetDocumentsRequest";>google.firestore.v1.BatchGetDocumentsRequest</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchGetDocumentsResponse";>google.firestore.v1.BatchGetDocumentsResponse</a>
+     */
+    public static final class Builder
+        extends Transform.Builder<
+            PCollection<BatchGetDocumentsRequest>,
+            PCollection<BatchGetDocumentsResponse>,
+            BatchGetDocuments,
+            BatchGetDocuments.Builder> {
+
+      private Builder() {
+        super();
+      }
+
+      public Builder(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+
+      @Override
+      public BatchGetDocuments build() {
+        return genericBuild();
+      }
+
+      @Override
+      BatchGetDocuments buildSafe(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        return new BatchGetDocuments(clock, firestoreStatefulComponentFactory, 
rpcQosOptions);
+      }
+    }
+  }
+
+  /**
+   * Concrete class representing a {@link PTransform}{@code <}{@link 
PCollection}{@code <}{@link
+   * PartitionQueryRequest}{@code >, }{@link PTransform}{@code <}{@link 
RunQueryResponse}{@code >>}
+   * which will read from Firestore.
+   *
+   * <p>This class is part of the Firestore Connector DSL, it has a type safe 
builder accessible via
+   * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code 
.}{@link
+   * FirestoreV1.Read#partitionQuery() partitionQuery()}.
+   *
+   * <p>All request quality-of-service for an instance of this PTransform is 
scoped to the worker
+   * and configured via {@link 
PartitionQuery.Builder#withRpcQosOptions(RpcQosOptions)}.
+   *
+   * @see FirestoreIO#v1()
+   * @see FirestoreV1#read()
+   * @see FirestoreV1.Read#partitionQuery()
+   * @see FirestoreV1.PartitionQuery.Builder
+   * @see PartitionQueryRequest
+   * @see RunQueryResponse
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.PartitionQuery";>google.firestore.v1.Firestore.PartitionQuery</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryRequest";>google.firestore.v1.PartitionQueryRequest</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryResponse";>google.firestore.v1.PartitionQueryResponse</a>
+   */
+  public static final class PartitionQuery
+      extends Transform<
+          PCollection<PartitionQueryRequest>,
+          PCollection<RunQueryResponse>,
+          PartitionQuery,
+          PartitionQuery.Builder> {
+
+    private final boolean nameOnlyQuery;
+
+    private PartitionQuery(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions,
+        boolean nameOnlyQuery) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      this.nameOnlyQuery = nameOnlyQuery;
+    }
+
+    @Override
+    public PCollection<RunQueryResponse> 
expand(PCollection<PartitionQueryRequest> input) {

Review comment:
       Agree. I would have expected PartitionQuery to be a utility that help 
Beam read transform to parallelize better instead being a part of the public 
API.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreDoFn.java
##########
@@ -46,6 +46,22 @@
   @StartBundle
   public abstract void startBundle(DoFn<InT, OutT>.StartBundleContext context) 
throws Exception;
 
+  abstract static class NonWindowAwareDoFn<InT, OutT> extends 
FirestoreDoFn<InT, OutT> {
+    /**
+     * {@link ProcessContext#element() context.element()} must be non-null, 
otherwise a
+     * NullPointerException will be thrown.

Review comment:
       I'm not sure if this means whether Firestore Read does not support 
Windowing or not but please note that Windowing is a key feature of Beam and 
all sources are expected to support that.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
##########
@@ -59,6 +89,80 @@
  *
  * <h3>Operations</h3>
  *
+ * <h4>Read</h4>
+ *
+ * <p>The currently supported read operations and their execution behavior are 
as follows:
+ *
+ * <table>
+ *   <tbody>
+ *     <tr>
+ *       <th>RPC</th>
+ *       <th>Execution Behavior</th>
+ *     </tr>
+ *     <tr>
+ *       <td>PartitionQuery</td>
+ *       <td>Parallel Streaming</td>
+ *     </tr>
+ *     <tr>
+ *       <td>RunQuery</td>
+ *       <td>Sequential Streaming</td>
+ *     </tr>
+ *     <tr>
+ *       <td>BatchGet</td>
+ *       <td>Sequential Streaming</td>
+ *     </tr>
+ *     <tr>
+ *       <td>ListCollectionIds</td>
+ *       <td>Sequential Paginated</td>
+ *     </tr>
+ *     <tr>
+ *       <td>ListDocuments</td>
+ *       <td>Sequential Paginated</td>
+ *     </tr>
+ *   </tbody>
+ * </table>
+ *
+ * <p>PartitionQuery should be preferred over other options if at all 
possible, becuase it has the
+ * ability to parallelize execution of multiple queries for specific 
sub-ranges of the full results.
+ *
+ * <p>You should only ever use ListDocuments if the use of <a target="_blank" 
rel="noopener
+ * noreferrer"
+ * 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsRequest";>{@code
+ * show_missing}</a> is needed to access a document. RunQuery and 
PartitionQuery will always be
+ * faster if the use of {@code show_missing} is not needed.
+ *
+ * <p><b>Example Usage</b>
+ *
+ * <pre>{@code
+ * PCollection<PartitionQueryRequest> partitionQueryRequests = ...;

Review comment:
       Probably add short descriptions to each of these request/response types 
(and links for further details).

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java
##########
@@ -0,0 +1,633 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.api.gax.paging.AbstractPage;
+import com.google.api.gax.paging.AbstractPagedListResponse;
+import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPage;
+import 
com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPage;
+import 
com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPage;
+import 
com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.BatchGetDocumentsRequest;
+import com.google.firestore.v1.BatchGetDocumentsResponse;
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.ListCollectionIdsRequest;
+import com.google.firestore.v1.ListCollectionIdsResponse;
+import com.google.firestore.v1.ListDocumentsRequest;
+import com.google.firestore.v1.ListDocumentsResponse;
+import com.google.firestore.v1.PartitionQueryRequest;
+import com.google.firestore.v1.PartitionQueryResponse;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.RunQueryResponse;
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.ProtocolStringList;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.NonWindowAwareDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.compatqual.NullableDecl;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A collection of {@link org.apache.beam.sdk.transforms.DoFn DoFn}s for each 
of the supported read
+ * RPC methods from the Cloud Firestore V1 API.
+ */
+final class FirestoreV1ReadFn {
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link RunQueryRequest}s.
+   *
+   * <p>This Fn uses a stream to obtain responses, each response from the 
stream will be output to
+   * the next stage of the pipeline. Each response from the stream represents 
an individual document
+   * with the associated metadata.
+   *
+   * <p>If an error is encountered while reading from the stream, the stream 
will attempt to resume
+   * rather than starting over. The restarting of the stream will continue 
within the scope of the
+   * completion of the request (meaning any possibility of resumption is 
contingent upon an attempt
+   * being available in the Qos budget).
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link 
RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class RunQueryFn

Review comment:
       I would also try to reduce the the implementation to one or few 
SDF-based sources to make implementation simpler.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
##########
@@ -189,6 +533,725 @@ private Write() {}
     }
   }
 
+  /**
+   * Concrete class representing a {@link PTransform}{@code <}{@link 
PCollection}{@code <}{@link
+   * ListCollectionIdsRequest}{@code >, }{@link PTransform}{@code <}{@link
+   * ListCollectionIdsResponse}{@code >>} which will read from Firestore.
+   *
+   * <p>This class is part of the Firestore Connector DSL, it has a type safe 
builder accessible via
+   * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code 
.}{@link
+   * FirestoreV1.Read#listCollectionIds() listCollectionIds()}.
+   *
+   * <p>All request quality-of-service for an instance of this PTransform is 
scoped to the worker
+   * and configured via {@link 
ListCollectionIds.Builder#withRpcQosOptions(RpcQosOptions)}.
+   *
+   * @see FirestoreIO#v1()
+   * @see FirestoreV1#read()
+   * @see FirestoreV1.Read#listCollectionIds()
+   * @see FirestoreV1.ListCollectionIds.Builder
+   * @see ListCollectionIdsRequest
+   * @see ListCollectionIdsResponse
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.ListCollectionIds";>google.firestore.v1.Firestore.ListCollectionIds</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListCollectionIdsRequest";>google.firestore.v1.ListCollectionIdsRequest</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListCollectionIdsResponse";>google.firestore.v1.ListCollectionIdsResponse</a>
+   */
+  public static final class ListCollectionIds
+      extends Transform<
+          PCollection<ListCollectionIdsRequest>,
+          PCollection<String>,
+          ListCollectionIds,
+          ListCollectionIds.Builder> {
+
+    private ListCollectionIds(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public PCollection<String> expand(PCollection<ListCollectionIdsRequest> 
input) {
+      return input
+          .apply(
+              "listCollectionIds",
+              ParDo.of(
+                  new ListCollectionIdsFn(clock, 
firestoreStatefulComponentFactory, rpcQosOptions)))
+          .apply(ParDo.of(new FlattenListCollectionIdsResponse()))
+          .apply(Reshuffle.viaRandomKey());
+    }
+
+    @Override
+    public Builder toBuilder() {
+      return new Builder(clock, firestoreStatefulComponentFactory, 
rpcQosOptions);
+    }
+
+    /**
+     * A type safe builder for {@link ListCollectionIds} allowing 
configuration and instantiation.
+     *
+     * <p>This class is part of the Firestore Connector DSL, it has a type 
safe builder accessible
+     * via {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() 
read()}{@code .}{@link
+     * FirestoreV1.Read#listCollectionIds() listCollectionIds()}.
+     *
+     * <p>
+     *
+     * @see FirestoreIO#v1()
+     * @see FirestoreV1#read()
+     * @see FirestoreV1.Read#listCollectionIds()
+     * @see FirestoreV1.ListCollectionIds
+     * @see ListCollectionIdsRequest
+     * @see ListCollectionIdsResponse
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.ListCollectionIds";>google.firestore.v1.Firestore.ListCollectionIds</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListCollectionIdsRequest";>google.firestore.v1.ListCollectionIdsRequest</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListCollectionIdsResponse";>google.firestore.v1.ListCollectionIdsResponse</a>
+     */
+    public static final class Builder
+        extends Transform.Builder<
+            PCollection<ListCollectionIdsRequest>,
+            PCollection<String>,
+            ListCollectionIds,
+            ListCollectionIds.Builder> {
+
+      private Builder() {
+        super();
+      }
+
+      private Builder(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+
+      @Override
+      public ListCollectionIds build() {
+        return genericBuild();
+      }
+
+      @Override
+      ListCollectionIds buildSafe(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        return new ListCollectionIds(clock, firestoreStatefulComponentFactory, 
rpcQosOptions);
+      }
+    }
+  }
+
+  /**
+   * Concrete class representing a {@link PTransform}{@code <}{@link 
PCollection}{@code <}{@link
+   * ListDocumentsRequest}{@code >, }{@link PTransform}{@code <}{@link 
ListDocumentsResponse}{@code
+   * >>} which will read from Firestore.
+   *
+   * <p>This class is part of the Firestore Connector DSL, it has a type safe 
builder accessible via
+   * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code 
.}{@link
+   * FirestoreV1.Read#listDocuments() listDocuments()}.
+   *
+   * <p>All request quality-of-service for an instance of this PTransform is 
scoped to the worker
+   * and configured via {@link 
ListDocuments.Builder#withRpcQosOptions(RpcQosOptions)}.
+   *
+   * @see FirestoreIO#v1()
+   * @see FirestoreV1#read()
+   * @see FirestoreV1.Read#listDocuments()
+   * @see FirestoreV1.ListDocuments.Builder
+   * @see ListDocumentsRequest
+   * @see ListDocumentsResponse
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.ListDocuments";>google.firestore.v1.Firestore.ListDocuments</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsRequest";>google.firestore.v1.ListDocumentsRequest</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsResponse";>google.firestore.v1.ListDocumentsResponse</a>
+   */
+  public static final class ListDocuments
+      extends Transform<
+          PCollection<ListDocumentsRequest>,
+          PCollection<Document>,
+          ListDocuments,
+          ListDocuments.Builder> {
+
+    private ListDocuments(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public PCollection<Document> expand(PCollection<ListDocumentsRequest> 
input) {
+      return input
+          .apply(
+              "listDocuments",
+              ParDo.of(
+                  new ListDocumentsFn(clock, 
firestoreStatefulComponentFactory, rpcQosOptions)))
+          .apply(ParDo.of(new ListDocumentsResponseToDocument()))
+          .apply(Reshuffle.viaRandomKey());
+    }
+
+    @Override
+    public Builder toBuilder() {
+      return new Builder(clock, firestoreStatefulComponentFactory, 
rpcQosOptions);
+    }
+
+    /**
+     * A type safe builder for {@link ListDocuments} allowing configuration 
and instantiation.
+     *
+     * <p>This class is part of the Firestore Connector DSL, it has a type 
safe builder accessible
+     * via {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() 
read()}{@code .}{@link
+     * FirestoreV1.Read#listDocuments() listDocuments()}.
+     *
+     * <p>
+     *
+     * @see FirestoreIO#v1()
+     * @see FirestoreV1#read()
+     * @see FirestoreV1.Read#listDocuments()
+     * @see FirestoreV1.ListDocuments
+     * @see ListDocumentsRequest
+     * @see ListDocumentsResponse
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.ListDocuments";>google.firestore.v1.Firestore.ListDocuments</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsRequest";>google.firestore.v1.ListDocumentsRequest</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsResponse";>google.firestore.v1.ListDocumentsResponse</a>
+     */
+    public static final class Builder
+        extends Transform.Builder<
+            PCollection<ListDocumentsRequest>,
+            PCollection<Document>,
+            ListDocuments,
+            ListDocuments.Builder> {
+
+      private Builder() {
+        super();
+      }
+
+      private Builder(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+
+      @Override
+      public ListDocuments build() {
+        return genericBuild();
+      }
+
+      @Override
+      ListDocuments buildSafe(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        return new ListDocuments(clock, firestoreStatefulComponentFactory, 
rpcQosOptions);
+      }
+    }
+  }
+
+  /**
+   * Concrete class representing a {@link PTransform}{@code <}{@link 
PCollection}{@code <}{@link
+   * RunQueryRequest}{@code >, }{@link PTransform}{@code <}{@link 
RunQueryResponse}{@code >>} which
+   * will read from Firestore.
+   *
+   * <p>This class is part of the Firestore Connector DSL, it has a type safe 
builder accessible via
+   * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code 
.}{@link
+   * FirestoreV1.Read#runQuery() runQuery()}.
+   *
+   * <p>All request quality-of-service for an instance of this PTransform is 
scoped to the worker
+   * and configured via {@link 
RunQuery.Builder#withRpcQosOptions(RpcQosOptions)}.
+   *
+   * @see FirestoreIO#v1()
+   * @see FirestoreV1#read()
+   * @see FirestoreV1.Read#runQuery()
+   * @see FirestoreV1.RunQuery.Builder
+   * @see RunQueryRequest
+   * @see RunQueryResponse
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.RunQuery";>google.firestore.v1.Firestore.RunQuery</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.RunQueryRequest";>google.firestore.v1.RunQueryRequest</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.RunQueryResponse";>google.firestore.v1.RunQueryResponse</a>
+   */
+  public static final class RunQuery
+      extends Transform<
+          PCollection<RunQueryRequest>, PCollection<RunQueryResponse>, 
RunQuery, RunQuery.Builder> {
+
+    private RunQuery(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public PCollection<RunQueryResponse> expand(PCollection<RunQueryRequest> 
input) {
+      return input
+          .apply(
+              "runQuery",
+              ParDo.of(new RunQueryFn(clock, 
firestoreStatefulComponentFactory, rpcQosOptions)))
+          .apply(Reshuffle.viaRandomKey());
+    }
+
+    @Override
+    public Builder toBuilder() {
+      return new Builder(clock, firestoreStatefulComponentFactory, 
rpcQosOptions);
+    }
+
+    /**
+     * A type safe builder for {@link RunQuery} allowing configuration and 
instantiation.
+     *
+     * <p>This class is part of the Firestore Connector DSL, it has a type 
safe builder accessible
+     * via {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() 
read()}{@code .}{@link
+     * FirestoreV1.Read#runQuery() runQuery()}.
+     *
+     * <p>
+     *
+     * @see FirestoreIO#v1()
+     * @see FirestoreV1#read()
+     * @see FirestoreV1.Read#runQuery()
+     * @see FirestoreV1.RunQuery
+     * @see RunQueryRequest
+     * @see RunQueryResponse
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.RunQuery";>google.firestore.v1.Firestore.RunQuery</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.RunQueryRequest";>google.firestore.v1.RunQueryRequest</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.RunQueryResponse";>google.firestore.v1.RunQueryResponse</a>
+     */
+    public static final class Builder
+        extends Transform.Builder<
+            PCollection<RunQueryRequest>,
+            PCollection<RunQueryResponse>,
+            RunQuery,
+            RunQuery.Builder> {
+
+      private Builder() {
+        super();
+      }
+
+      private Builder(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+
+      @Override
+      public RunQuery build() {
+        return genericBuild();
+      }
+
+      @Override
+      RunQuery buildSafe(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        return new RunQuery(clock, firestoreStatefulComponentFactory, 
rpcQosOptions);
+      }
+    }
+  }
+
+  /**
+   * Concrete class representing a {@link PTransform}{@code <}{@link 
PCollection}{@code <}{@link
+   * BatchGetDocumentsRequest}{@code >, }{@link PTransform}{@code <}{@link
+   * BatchGetDocumentsResponse}{@code >>} which will read from Firestore.
+   *
+   * <p>This class is part of the Firestore Connector DSL, it has a type safe 
builder accessible via
+   * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code 
.}{@link
+   * FirestoreV1.Read#batchGetDocuments() batchGetDocuments()}.
+   *
+   * <p>All request quality-of-service for an instance of this PTransform is 
scoped to the worker
+   * and configured via {@link 
BatchGetDocuments.Builder#withRpcQosOptions(RpcQosOptions)}.
+   *
+   * @see FirestoreIO#v1()
+   * @see FirestoreV1#read()
+   * @see FirestoreV1.Read#batchGetDocuments()
+   * @see FirestoreV1.BatchGetDocuments.Builder
+   * @see BatchGetDocumentsRequest
+   * @see BatchGetDocumentsResponse
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchGetDocuments";>google.firestore.v1.Firestore.BatchGetDocuments</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchGetDocumentsRequest";>google.firestore.v1.BatchGetDocumentsRequest</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchGetDocumentsResponse";>google.firestore.v1.BatchGetDocumentsResponse</a>
+   */
+  public static final class BatchGetDocuments
+      extends Transform<
+          PCollection<BatchGetDocumentsRequest>,
+          PCollection<BatchGetDocumentsResponse>,
+          BatchGetDocuments,
+          BatchGetDocuments.Builder> {
+
+    private BatchGetDocuments(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public PCollection<BatchGetDocumentsResponse> expand(
+        PCollection<BatchGetDocumentsRequest> input) {
+      return input
+          .apply(
+              "batchGetDocuments",
+              ParDo.of(
+                  new BatchGetDocumentsFn(clock, 
firestoreStatefulComponentFactory, rpcQosOptions)))
+          .apply(Reshuffle.viaRandomKey());
+    }
+
+    @Override
+    public Builder toBuilder() {
+      return new Builder(clock, firestoreStatefulComponentFactory, 
rpcQosOptions);
+    }
+
+    /**
+     * A type safe builder for {@link BatchGetDocuments} allowing 
configuration and instantiation.
+     *
+     * <p>This class is part of the Firestore Connector DSL, it has a type 
safe builder accessible
+     * via {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() 
read()}{@code .}{@link
+     * FirestoreV1.Read#batchGetDocuments() batchGetDocuments()}.
+     *
+     * <p>
+     *
+     * @see FirestoreIO#v1()
+     * @see FirestoreV1#read()
+     * @see FirestoreV1.Read#batchGetDocuments()
+     * @see FirestoreV1.BatchGetDocuments
+     * @see BatchGetDocumentsRequest
+     * @see BatchGetDocumentsResponse
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchGetDocuments";>google.firestore.v1.Firestore.BatchGetDocuments</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchGetDocumentsRequest";>google.firestore.v1.BatchGetDocumentsRequest</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchGetDocumentsResponse";>google.firestore.v1.BatchGetDocumentsResponse</a>
+     */
+    public static final class Builder
+        extends Transform.Builder<
+            PCollection<BatchGetDocumentsRequest>,
+            PCollection<BatchGetDocumentsResponse>,
+            BatchGetDocuments,
+            BatchGetDocuments.Builder> {
+
+      private Builder() {
+        super();
+      }
+
+      public Builder(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+
+      @Override
+      public BatchGetDocuments build() {
+        return genericBuild();
+      }
+
+      @Override
+      BatchGetDocuments buildSafe(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        return new BatchGetDocuments(clock, firestoreStatefulComponentFactory, 
rpcQosOptions);
+      }
+    }
+  }
+
+  /**
+   * Concrete class representing a {@link PTransform}{@code <}{@link 
PCollection}{@code <}{@link
+   * PartitionQueryRequest}{@code >, }{@link PTransform}{@code <}{@link 
RunQueryResponse}{@code >>}
+   * which will read from Firestore.
+   *
+   * <p>This class is part of the Firestore Connector DSL, it has a type safe 
builder accessible via
+   * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() read()}{@code 
.}{@link
+   * FirestoreV1.Read#partitionQuery() partitionQuery()}.
+   *
+   * <p>All request quality-of-service for an instance of this PTransform is 
scoped to the worker
+   * and configured via {@link 
PartitionQuery.Builder#withRpcQosOptions(RpcQosOptions)}.
+   *
+   * @see FirestoreIO#v1()
+   * @see FirestoreV1#read()
+   * @see FirestoreV1.Read#partitionQuery()
+   * @see FirestoreV1.PartitionQuery.Builder
+   * @see PartitionQueryRequest
+   * @see RunQueryResponse
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.PartitionQuery";>google.firestore.v1.Firestore.PartitionQuery</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryRequest";>google.firestore.v1.PartitionQueryRequest</a>
+   * @see <a target="_blank" rel="noopener noreferrer"
+   *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryResponse";>google.firestore.v1.PartitionQueryResponse</a>
+   */
+  public static final class PartitionQuery
+      extends Transform<
+          PCollection<PartitionQueryRequest>,
+          PCollection<RunQueryResponse>,
+          PartitionQuery,
+          PartitionQuery.Builder> {
+
+    private final boolean nameOnlyQuery;
+
+    private PartitionQuery(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions,
+        boolean nameOnlyQuery) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      this.nameOnlyQuery = nameOnlyQuery;
+    }
+
+    @Override
+    public PCollection<RunQueryResponse> 
expand(PCollection<PartitionQueryRequest> input) {
+      PCollection<RunQueryRequest> queries =
+          input
+              .apply(
+                  "PartitionQuery",
+                  ParDo.of(
+                      new PartitionQueryFn(
+                          clock, firestoreStatefulComponentFactory, 
rpcQosOptions)))
+              .apply("expand queries", ParDo.of(new 
PartitionQueryResponseToRunQueryRequest()));
+      if (nameOnlyQuery) {
+        queries =
+            queries.apply(
+                "set name only query",
+                MapElements.via(
+                    new SimpleFunction<RunQueryRequest, RunQueryRequest>() {
+                      @Override
+                      public RunQueryRequest apply(RunQueryRequest input) {
+                        RunQueryRequest.Builder builder = input.toBuilder();
+                        builder
+                            .getStructuredQueryBuilder()
+                            .setSelect(
+                                Projection.newBuilder()
+                                    .addFields(
+                                        FieldReference.newBuilder()
+                                            .setFieldPath("__name__")
+                                            .build())
+                                    .build());
+                        return builder.build();
+                      }
+                    }));
+      }
+      return queries
+          .apply(Reshuffle.viaRandomKey())
+          .apply(new RunQuery(clock, firestoreStatefulComponentFactory, 
rpcQosOptions));
+    }
+
+    @Override
+    public Builder toBuilder() {
+      return new Builder(clock, firestoreStatefulComponentFactory, 
rpcQosOptions, nameOnlyQuery);
+    }
+
+    /**
+     * A type safe builder for {@link PartitionQuery} allowing configuration 
and instantiation.
+     *
+     * <p>This class is part of the Firestore Connector DSL, it has a type 
safe builder accessible
+     * via {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#read() 
read()}{@code .}{@link
+     * FirestoreV1.Read#partitionQuery() partitionQuery()}.
+     *
+     * <p>
+     *
+     * @see FirestoreIO#v1()
+     * @see FirestoreV1#read()
+     * @see FirestoreV1.Read#partitionQuery()
+     * @see FirestoreV1.PartitionQuery
+     * @see PartitionQueryRequest
+     * @see RunQueryResponse
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.PartitionQuery";>google.firestore.v1.Firestore.PartitionQuery</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryRequest";>google.firestore.v1.PartitionQueryRequest</a>
+     * @see <a target="_blank" rel="noopener noreferrer"
+     *     
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryResponse";>google.firestore.v1.PartitionQueryResponse</a>
+     */
+    public static final class Builder
+        extends Transform.Builder<
+            PCollection<PartitionQueryRequest>,
+            PCollection<RunQueryResponse>,
+            PartitionQuery,
+            FirestoreV1.PartitionQuery.Builder> {
+
+      private boolean nameOnlyQuery = false;
+
+      private Builder() {
+        super();
+      }
+
+      public Builder(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions,
+          boolean nameOnlyQuery) {
+        super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+        this.nameOnlyQuery = nameOnlyQuery;
+      }
+
+      @Override
+      public PartitionQuery build() {
+        return genericBuild();
+      }
+
+      /**
+       * Update produced queries to only retrieve their {@code __name__} 
thereby not retrieving any
+       * fields and reducing resource requirements.
+       *
+       * @return this builder
+       */
+      public Builder withNameOnlyQuery() {
+        this.nameOnlyQuery = true;
+        return this;
+      }
+
+      @Override
+      PartitionQuery buildSafe(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions) {
+        return new PartitionQuery(
+            clock, firestoreStatefulComponentFactory, rpcQosOptions, 
nameOnlyQuery);
+      }
+    }
+
+    /**
+     * DoFn which contains the logic necessary to turn a {@link 
PartitionQueryRequest} and {@link
+     * PartitionQueryResponse} pair into {@code N} {@link RunQueryRequest}.
+     */
+    static final class PartitionQueryResponseToRunQueryRequest
+        extends DoFn<PartitionQueryPair, RunQueryRequest> {
+
+      /**
+       * When fetching cursors that span multiple pages it is expected (per <a
+       * 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.PartitionQueryRequest";>
+       * PartitionQueryRequest.page_token</a>) for the client to sort the 
cursors before processing
+       * them to define the sub-queries. So here we're defining a Comparator 
which will sort Cursors

Review comment:
       Can such a order change while a Beam pipeline is reading a given dataset 
?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java
##########
@@ -0,0 +1,633 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.api.gax.paging.AbstractPage;
+import com.google.api.gax.paging.AbstractPagedListResponse;
+import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPage;
+import 
com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPage;
+import 
com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPage;
+import 
com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.BatchGetDocumentsRequest;
+import com.google.firestore.v1.BatchGetDocumentsResponse;
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.ListCollectionIdsRequest;
+import com.google.firestore.v1.ListCollectionIdsResponse;
+import com.google.firestore.v1.ListDocumentsRequest;
+import com.google.firestore.v1.ListDocumentsResponse;
+import com.google.firestore.v1.PartitionQueryRequest;
+import com.google.firestore.v1.PartitionQueryResponse;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.RunQueryResponse;
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.ProtocolStringList;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.NonWindowAwareDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.compatqual.NullableDecl;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A collection of {@link org.apache.beam.sdk.transforms.DoFn DoFn}s for each 
of the supported read
+ * RPC methods from the Cloud Firestore V1 API.
+ */
+final class FirestoreV1ReadFn {

Review comment:
       Usually "*Fn" notation is used to name DoFn implementations.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java
##########
@@ -0,0 +1,633 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.api.gax.paging.AbstractPage;
+import com.google.api.gax.paging.AbstractPagedListResponse;
+import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPage;
+import 
com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPage;
+import 
com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPage;
+import 
com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.BatchGetDocumentsRequest;
+import com.google.firestore.v1.BatchGetDocumentsResponse;
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.ListCollectionIdsRequest;
+import com.google.firestore.v1.ListCollectionIdsResponse;
+import com.google.firestore.v1.ListDocumentsRequest;
+import com.google.firestore.v1.ListDocumentsResponse;
+import com.google.firestore.v1.PartitionQueryRequest;
+import com.google.firestore.v1.PartitionQueryResponse;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.RunQueryResponse;
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.ProtocolStringList;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.NonWindowAwareDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.compatqual.NullableDecl;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A collection of {@link org.apache.beam.sdk.transforms.DoFn DoFn}s for each 
of the supported read
+ * RPC methods from the Cloud Firestore V1 API.
+ */
+final class FirestoreV1ReadFn {
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link RunQueryRequest}s.
+   *
+   * <p>This Fn uses a stream to obtain responses, each response from the 
stream will be output to
+   * the next stage of the pipeline. Each response from the stream represents 
an individual document
+   * with the associated metadata.
+   *
+   * <p>If an error is encountered while reading from the stream, the stream 
will attempt to resume
+   * rather than starting over. The restarting of the stream will continue 
within the scope of the
+   * completion of the request (meaning any possibility of resumption is 
contingent upon an attempt
+   * being available in the Qos budget).
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link 
RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class RunQueryFn
+      extends StreamingFirestoreV1ReadFn<RunQueryRequest, RunQueryResponse> {
+
+    RunQueryFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.RunQuery;
+    }
+
+    @Override
+    protected ServerStreamingCallable<RunQueryRequest, RunQueryResponse> 
getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.runQueryCallable();
+    }
+
+    @Override
+    protected RunQueryRequest setStartFrom(
+        RunQueryRequest element, RunQueryResponse runQueryResponse) {
+      StructuredQuery query = element.getStructuredQuery();
+      StructuredQuery.Builder builder;
+      List<Order> orderByList = query.getOrderByList();
+      // if the orderByList is empty that means the default sort of "__name__ 
ASC" will be used
+      // Before we can set the cursor to the last document name read, we need 
to explicitly add
+      // the order of "__name__ ASC" because a cursor value must map to an 
order by
+      if (orderByList.isEmpty()) {
+        builder =
+            query
+                .toBuilder()
+                .addOrderBy(
+                    Order.newBuilder()
+                        
.setField(FieldReference.newBuilder().setFieldPath("__name__").build())
+                        .setDirection(Direction.ASCENDING)
+                        .build())
+                .setStartAt(
+                    Cursor.newBuilder()
+                        .setBefore(false)
+                        .addValues(
+                            Value.newBuilder()
+                                
.setReferenceValue(runQueryResponse.getDocument().getName())
+                                .build()));
+      } else {
+        Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+        Map<String, Value> fieldsMap = 
runQueryResponse.getDocument().getFieldsMap();
+        for (Order order : orderByList) {
+          String fieldPath = order.getField().getFieldPath();
+          Value value = fieldsMap.get(fieldPath);
+          if (value != null) {
+            cursor.addValues(value);
+          } else if ("__name__".equals(fieldPath)) {
+            cursor.addValues(
+                Value.newBuilder()
+                    
.setReferenceValue(runQueryResponse.getDocument().getName())
+                    .build());
+          }
+        }
+        builder = query.toBuilder().setStartAt(cursor.build());
+      }
+      return element.toBuilder().setStructuredQuery(builder.build()).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link PartitionQueryRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, all pages will be 
aggregated before being
+   * emitted to the next stage of the pipeline. Aggregation of pages is 
necessary as the next step
+   * of pairing of cursors to create N queries must first sort all cursors. 
See <a target="_blank"
+   * rel="noopener noreferrer"
+   * 
href="https://cloud.google.com/firestore/docs/reference/rest/v1/projects.databases.documents/partitionQuery#request-body";>{@code
+   * pageToken}s</a> documentation for details.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link 
RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class PartitionQueryFn
+      extends BaseFirestoreV1ReadFn<PartitionQueryRequest, PartitionQueryPair> 
{
+
+    public PartitionQueryFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.PartitionQuery;
+    }
+
+    @Override
+    public void processElement(ProcessContext context) throws Exception {
+      @SuppressWarnings("nullness")
+      final PartitionQueryRequest element =
+          requireNonNull(context.element(), "c.element() must be non null");
+
+      RpcQos.RpcReadAttempt attempt = 
rpcQos.newReadAttempt(getRpcAttemptContext());

Review comment:
       Is this a global lock ? How would this adapt when Beam parallelize 
reading across workers ?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java
##########
@@ -0,0 +1,633 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.api.gax.paging.AbstractPage;
+import com.google.api.gax.paging.AbstractPagedListResponse;
+import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPage;
+import 
com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPage;
+import 
com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPage;
+import 
com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.BatchGetDocumentsRequest;
+import com.google.firestore.v1.BatchGetDocumentsResponse;
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.ListCollectionIdsRequest;
+import com.google.firestore.v1.ListCollectionIdsResponse;
+import com.google.firestore.v1.ListDocumentsRequest;
+import com.google.firestore.v1.ListDocumentsResponse;
+import com.google.firestore.v1.PartitionQueryRequest;
+import com.google.firestore.v1.PartitionQueryResponse;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.RunQueryResponse;
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.ProtocolStringList;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.NonWindowAwareDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.compatqual.NullableDecl;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A collection of {@link org.apache.beam.sdk.transforms.DoFn DoFn}s for each 
of the supported read
+ * RPC methods from the Cloud Firestore V1 API.
+ */
+final class FirestoreV1ReadFn {
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link RunQueryRequest}s.
+   *
+   * <p>This Fn uses a stream to obtain responses, each response from the 
stream will be output to
+   * the next stage of the pipeline. Each response from the stream represents 
an individual document
+   * with the associated metadata.
+   *
+   * <p>If an error is encountered while reading from the stream, the stream 
will attempt to resume
+   * rather than starting over. The restarting of the stream will continue 
within the scope of the
+   * completion of the request (meaning any possibility of resumption is 
contingent upon an attempt
+   * being available in the Qos budget).
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link 
RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class RunQueryFn

Review comment:
       Beam sources help Beam pipeline parallelize. I think we have to update 
following source Fn classes to use SDF to parallelize better. This help support 
features such as dynamic work rebalancing. Without proper parallelization Beam 
pipelines that use Firestore source could run into stragglers (which is an 
issue many Dataflow customers run into without dynamic work rebalancing is not 
available).
   See here for more details on SDF: 
https://beam.apache.org/documentation/programming-guide/#splittable-dofns

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
##########
@@ -59,6 +89,80 @@
  *
  * <h3>Operations</h3>
  *
+ * <h4>Read</h4>
+ *
+ * <p>The currently supported read operations and their execution behavior are 
as follows:
+ *
+ * <table>
+ *   <tbody>
+ *     <tr>
+ *       <th>RPC</th>
+ *       <th>Execution Behavior</th>
+ *     </tr>
+ *     <tr>
+ *       <td>PartitionQuery</td>
+ *       <td>Parallel Streaming</td>
+ *     </tr>
+ *     <tr>
+ *       <td>RunQuery</td>
+ *       <td>Sequential Streaming</td>
+ *     </tr>
+ *     <tr>
+ *       <td>BatchGet</td>
+ *       <td>Sequential Streaming</td>
+ *     </tr>
+ *     <tr>
+ *       <td>ListCollectionIds</td>
+ *       <td>Sequential Paginated</td>
+ *     </tr>
+ *     <tr>
+ *       <td>ListDocuments</td>
+ *       <td>Sequential Paginated</td>
+ *     </tr>
+ *   </tbody>
+ * </table>
+ *
+ * <p>PartitionQuery should be preferred over other options if at all 
possible, becuase it has the
+ * ability to parallelize execution of multiple queries for specific 
sub-ranges of the full results.
+ *
+ * <p>You should only ever use ListDocuments if the use of <a target="_blank" 
rel="noopener
+ * noreferrer"
+ * 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.ListDocumentsRequest";>{@code
+ * show_missing}</a> is needed to access a document. RunQuery and 
PartitionQuery will always be
+ * faster if the use of {@code show_missing} is not needed.
+ *
+ * <p><b>Example Usage</b>
+ *
+ * <pre>{@code
+ * PCollection<PartitionQueryRequest> partitionQueryRequests = ...;
+ * PCollection<RunQueryResponse> partitionQueryResponses = 
partitionQueryRequests
+ *     .apply(FirestoreIO.v1().read().partitionQuery().build());
+ * }</pre>
+ *
+ * <pre>{@code
+ * PCollection<RunQueryRequest> runQueryRequests = ...;

Review comment:
       Do you think all these types of PCollections should be in the public API 
? Are end users expected to use all of these ?
   
   I'm wondering if we can somehow simplify the public API by allowing users to 
get a certain (root) type of PCollection and providing utility functions to 
convert from there.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to