jlara310 commented on a change in pull request #14261:
URL: https://github.com/apache/beam/pull/14261#discussion_r600907351



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
##########
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.firestore.v1.BatchWriteRequest;
+import com.google.firestore.v1.WriteResult;
+import com.google.rpc.Status;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.concurrent.Immutable;
+import 
org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BatchWriteFnWithDeadLetterQueue;
+import 
org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.DefaultBatchWriteFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * {@link FirestoreV1} provides an API which provides lifecycle managed {@link 
PTransform}s for
+ * <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1";>Cloud
+ * Firestore v1 API</a>.
+ * <p/>
+ * This class is part of the Firestore Connector DSL and should be accessed 
via {@link
+ * FirestoreIO#v1()}.
+ * <p/>
+ * All {@link PTransform}s provided by this API use {@link 
org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+ * GcpOptions} on {@link org.apache.beam.sdk.options.PipelineOptions 
PipelineOptions} for
+ * credentials access and projectId resolution. As such, the lifecycle of gRPC 
clients and project
+ * information is scoped to the bundle level, not the worker level.
+ * <p/>
+ *
+ * <h3>Permissions</h3>
+ *
+ * Permission requirements depend on the {@code PipelineRunner} that is used 
to execute the
+ * pipeline. Please refer to the documentation of corresponding {@code 
PipelineRunner}s for more
+ * details.
+ *
+ * <p>Please see <a 
href="https://cloud.google.com/firestore/docs/quickstart-servers#create_a_in_native_mode_database";>Create
+ * a Firestore in Native mode database
+ * </a>for security and permission related information specific to Cloud 
Firestore.
+ *
+ * <p>Optionally, Cloud Firestore V1 Emulator, running locally, could be used 
for testing purposes
+ * by providing the host port information vi {@link 
FirestoreIOOptions#setEmulatorHostPort(String)}.

Review comment:
       Typo: "vi" -> "via" or "through"

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
##########
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.firestore.v1.BatchWriteRequest;
+import com.google.firestore.v1.WriteResult;
+import com.google.rpc.Status;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.concurrent.Immutable;
+import 
org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BatchWriteFnWithDeadLetterQueue;
+import 
org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.DefaultBatchWriteFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * {@link FirestoreV1} provides an API which provides lifecycle managed {@link 
PTransform}s for
+ * <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1";>Cloud
+ * Firestore v1 API</a>.
+ * <p/>
+ * This class is part of the Firestore Connector DSL and should be accessed 
via {@link
+ * FirestoreIO#v1()}.
+ * <p/>
+ * All {@link PTransform}s provided by this API use {@link 
org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+ * GcpOptions} on {@link org.apache.beam.sdk.options.PipelineOptions 
PipelineOptions} for
+ * credentials access and projectId resolution. As such, the lifecycle of gRPC 
clients and project
+ * information is scoped to the bundle level, not the worker level.
+ * <p/>
+ *
+ * <h3>Permissions</h3>
+ *
+ * Permission requirements depend on the {@code PipelineRunner} that is used 
to execute the
+ * pipeline. Please refer to the documentation of corresponding {@code 
PipelineRunner}s for more
+ * details.
+ *
+ * <p>Please see <a 
href="https://cloud.google.com/firestore/docs/quickstart-servers#create_a_in_native_mode_database";>Create
+ * a Firestore in Native mode database
+ * </a>for security and permission related information specific to Cloud 
Firestore.
+ *
+ * <p>Optionally, Cloud Firestore V1 Emulator, running locally, could be used 
for testing purposes
+ * by providing the host port information vi {@link 
FirestoreIOOptions#setEmulatorHostPort(String)}.
+ * In such a case, all the Cloud Firestore API calls are directed to the 
Emulator.
+ *
+ * @see FirestoreIO#v1()
+ * @see org.apache.beam.sdk.PipelineRunner
+ * @see org.apache.beam.sdk.options.PipelineOptions
+ * @see org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+ * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1";>Cloud
+ * Firestore v1 API</a>
+ */
+@Immutable
+public final class FirestoreV1 {
+  static final FirestoreV1 INSTANCE = new FirestoreV1();
+
+  private FirestoreV1() {}
+
+  /**
+   * The class returned by this method provides the ability to create {@link 
PTransform PTransforms}
+   * for write operations available in the Firestore V1 API provided by {@link
+   * com.google.cloud.firestore.spi.v1.FirestoreRpc FirestoreRpc}.
+   * <p/>
+   * This method is part of the Firestore Connector DSL and should be accessed 
via {@link
+   * FirestoreIO#v1()}.
+   * <p/>
+   *
+   * @return Type safe builder factory for write operations.
+   * @see FirestoreIO#v1()
+   */
+  public final Write write() {
+    return Write.INSTANCE;
+  }
+
+  /**
+   * Type safe builder factory for write operations.
+   * <p/>
+   * This class is part of the Firestore Connector DSL and should be accessed 
via {@link #write()
+   * FirestoreIO.v1().write()}.
+   * <p/>
+   * <p/>
+   * This class provides access to a set of type safe builders for supported 
write operations
+   * available in the Firestore V1 API accessed through {@link 
com.google.cloud.firestore.spi.v1.FirestoreRpc
+   * FirestoreRpc}. Each builder allows configuration before creating an 
immutable instance which
+   * can be used in your pipeline.
+   *
+   * @see FirestoreIO#v1()
+   * @see #write()
+   */
+  @Immutable
+  public static final class Write {
+    private static final Write INSTANCE = new Write();
+
+    private Write() {}
+
+    /**
+     * Factory method to create a new type safe builder for {@link 
com.google.firestore.v1.Write}
+     * operations.
+     * <p/>
+     * By default, when an error is encountered while trying to write to Cloud 
Firestore a {@link
+     * FailedWritesException} will be thrown. If you would like a failed write 
to not result in a
+     * {@link FailedWritesException}, you can instead use {@link 
BatchWriteWithDeadLetterQueue} which
+     * will output any failed write. {@link BatchWriteWithDeadLetterQueue} can 
be used by
+     * including {@link BatchWrite.Builder#withDeadLetterQueue()} when 
constructing the write handler.
+     * <p/>
+     * This method is part of the Firestore Connector DSL and should be 
accessed via {@link
+     * FirestoreIO#v1()}.
+     * <p/>
+     *
+     * <b>Example Usage</b>
+     * <pre>{@code
+     * // default behavior, throw exception if a write fails
+     * PCollection<Write> writes = ...;
+     * PDone sink = writes
+     *     .apply(FirestoreIO.v1().write().batchWrite().build());
+     * }</pre>
+     * <pre>{@code
+     * // alternative behavior, output a WriteFailure if a write fails
+     * PCollection<Write> writes = ...;
+     * PCollection<WriteFailure> writeFailures = writes
+     *     
.apply(FirestoreIO.v1().write().batchWrite().withDeadLetterQueue().build());
+     * }</pre>
+     * <p/>
+     * All request quality-of-service for the built {@link BatchWrite} 
PTransform is scoped to
+     * the worker and configured based on the {@link RpcQosOptions} specified 
via this builder.
+     *
+     * @return A new type safe builder providing configuration for processing 
of {@link
+     * com.google.firestore.v1.Write}s
+     * @see FirestoreIO#v1()
+     * @see BatchWrite
+     * @see BatchWriteRequest
+     * @see com.google.firestore.v1.BatchWriteResponse
+     * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchWrite";>google.firestore.v1.Firestore.BatchWrite</a>
+     * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteRequest";>google.firestore.v1.BatchWriteRequest</a>
+     * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteResponse";>google.firestore.v1.BatchWriteResponse</a>
+     */
+    public BatchWrite.Builder batchWrite() {
+      return new BatchWrite.Builder();
+    }
+  }
+
+  /**
+   * Concrete class representing a {@link PTransform}{@code <}{@link 
PCollection}{@code <}{@link
+   * com.google.firestore.v1.Write}{@code >, }{@link PDone}{@code >} which 
will write to Firestore.
+   * <p/>
+   * If an error is encountered while trying to write to Cloud Firestore a 
{@link
+   * FailedWritesException} will be thrown. If you would like a failed write 
to not result in a
+   * {@link FailedWritesException}, you can instead use {@link 
BatchWriteWithDeadLetterQueue} which
+   * will instead output any failed write. {@link 
BatchWriteWithDeadLetterQueue } can be used by
+   * including {@link Builder#withDeadLetterQueue()} when constructing the 
write handler.
+   * <p/>
+   * This class is part of the Firestore Connector DSL, it has a type safe 
builder accessible via
+   * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#write() 
write()}{@code .}{@link
+   * FirestoreV1.Write#batchWrite() batchWrite()}.
+   * <p/>
+   * All request quality-of-service for an instance of this PTransform is 
scoped to the worker and
+   * configured via {@link Builder#withRpcQosOptions(RpcQosOptions)}.
+   * <p/>
+   * Writes performed against Firestore will be ordered and grouped to 
maximize throughput while
+   * maintaining a high request success rate. Batch sizes will be determined 
by the QOS layer.
+   *
+   * @see FirestoreIO#v1()
+   * @see FirestoreV1#write()
+   * @see FirestoreV1.Write#batchWrite()
+   * @see BatchWrite.Builder
+   * @see BatchWriteWithDeadLetterQueue
+   * @see BatchWriteRequest
+   * @see com.google.firestore.v1.BatchWriteResponse
+   * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchWrite";>google.firestore.v1.Firestore.BatchWrite</a>
+   * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteRequest";>google.firestore.v1.BatchWriteRequest</a>
+   * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteResponse";>google.firestore.v1.BatchWriteResponse</a>
+   */
+  public static final class BatchWrite
+      extends Transform<
+      PCollection<com.google.firestore.v1.Write>,
+      PDone,
+      BatchWrite,
+      BatchWrite.Builder
+      > {
+
+    private BatchWrite(JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, 
RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public PDone expand(PCollection<com.google.firestore.v1.Write> input) {
+      input.apply("batchWrite", ParDo.of(new DefaultBatchWriteFn(clock, 
firestoreStatefulComponentFactory, rpcQosOptions)));
+      return PDone.in(input.getPipeline());
+    }
+
+    @Override
+    public Builder toBuilder() {
+      return new Builder(clock, firestoreStatefulComponentFactory, 
rpcQosOptions);
+    }
+
+    /**
+     * A type safe builder for {@link BatchWrite} allowing configuration and 
instantiation.
+     * <p/>
+     * This class is part of the Firestore Connector DSL, it has a type safe 
builder accessible via
+     * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#write() 
write()}{@code .}{@link
+     * FirestoreV1.Write#batchWrite() batchWrite()}.
+     * <p/>
+     *
+     * @see FirestoreIO#v1()
+     * @see FirestoreV1#write()
+     * @see FirestoreV1.Write#batchWrite()
+     * @see BatchWrite
+     * @see BatchWriteRequest
+     * @see com.google.firestore.v1.BatchWriteResponse
+     * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchWrite";>google.firestore.v1.Firestore.BatchWrite</a>
+     * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteRequest";>google.firestore.v1.BatchWriteRequest</a>
+     * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteResponse";>google.firestore.v1.BatchWriteResponse</a>
+     */
+    public static final class Builder extends 
Transform.Builder<PCollection<com.google.firestore.v1.Write>, PDone, 
BatchWrite, BatchWrite.Builder> {
+
+      private Builder() {
+        super();
+      }
+
+      private Builder(JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, 
RpcQosOptions rpcQosOptions) {
+        super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+
+      public BatchWriteWithDeadLetterQueue.Builder withDeadLetterQueue() {
+        return new BatchWriteWithDeadLetterQueue.Builder(clock, 
firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+
+      @Override
+      public BatchWrite build() {
+        return genericBuild();
+      }
+
+      @Override
+      protected BatchWrite buildSafe(JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, 
RpcQosOptions rpcQosOptions) {
+        return new BatchWrite(clock, firestoreStatefulComponentFactory, 
rpcQosOptions);
+      }
+    }
+  }
+
+  /**
+   * Concrete class representing a {@link PTransform}{@code <}{@link 
PCollection}{@code <}{@link com.google.firestore.v1.Write}{@code >, }{@link 
PCollection}{@code <}{@link WriteFailure}{@code >} which will write to 
Firestore.
+   * {@link WriteFailure}s output by this {@code PTransform} are those writes 
which were not able to be applied to Cloud Firestore.
+   * <p/>
+   * Use this BatchWrite when you do not want a failed write to error an 
entire bundle.
+   * <p/>
+   * This class is part of the Firestore Connector DSL, it has a type safe 
builder accessible via
+   * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#write() 
write()}{@code .}{@link
+   * FirestoreV1.Write#batchWrite() batchWrite()}{@code .}{@link 
BatchWrite.Builder#withDeadLetterQueue() withDeadLetterQueue()}.
+   * <p/>
+   * All request quality-of-service for an instance of this PTransform is 
scoped to the worker and
+   * configured via {@link Builder#withRpcQosOptions(RpcQosOptions)}.
+   * <p/>
+   * Writes performed against Firestore will be ordered and grouped to 
maximize throughput while
+   * maintaining a high request success rate. Batch sizes will be determined 
by the QOS layer.
+   *
+   * @see FirestoreIO#v1()
+   * @see FirestoreV1#write()
+   * @see FirestoreV1.Write#batchWrite()
+   * @see BatchWrite.Builder
+   * @see BatchWrite.Builder#withDeadLetterQueue()
+   * @see BatchWriteRequest
+   * @see com.google.firestore.v1.BatchWriteResponse
+   * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchWrite";>google.firestore.v1.Firestore.BatchWrite</a>
+   * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteRequest";>google.firestore.v1.BatchWriteRequest</a>
+   * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteResponse";>google.firestore.v1.BatchWriteResponse</a>
+   */
+  public static final class BatchWriteWithDeadLetterQueue
+      extends Transform<
+      PCollection<com.google.firestore.v1.Write>,
+      PCollection<WriteFailure>,
+      BatchWriteWithDeadLetterQueue,
+      BatchWriteWithDeadLetterQueue.Builder
+      > {
+
+    private BatchWriteWithDeadLetterQueue(JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, 
RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public PCollection<WriteFailure> 
expand(PCollection<com.google.firestore.v1.Write> input) {
+      return input.apply("batchWrite", ParDo.of(new 
BatchWriteFnWithDeadLetterQueue(clock, firestoreStatefulComponentFactory, 
rpcQosOptions)))
+          .apply(Reshuffle.viaRandomKey());
+    }
+
+    @Override
+    public Builder toBuilder() {
+      return new Builder(clock, firestoreStatefulComponentFactory, 
rpcQosOptions);
+    }
+
+    /**
+     * A type safe builder for {@link BatchWriteWithDeadLetterQueue} allowing 
configuration and instantiation.
+     * <p/>
+     * This class is part of the Firestore Connector DSL, it has a type safe 
builder accessible via
+     * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#write() 
write()}{@code .}{@link
+     * FirestoreV1.Write#batchWrite() batchWrite()}.
+     * <p/>
+     *
+     * @see FirestoreIO#v1()
+     * @see FirestoreV1#write()
+     * @see FirestoreV1.Write#batchWrite()
+     * @see BatchWrite
+     * @see BatchWriteRequest
+     * @see com.google.firestore.v1.BatchWriteResponse
+     * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchWrite";>google.firestore.v1.Firestore.BatchWrite</a>
+     * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteRequest";>google.firestore.v1.BatchWriteRequest</a>
+     * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteResponse";>google.firestore.v1.BatchWriteResponse</a>
+     */
+    public static final class Builder extends 
Transform.Builder<PCollection<com.google.firestore.v1.Write>, 
PCollection<WriteFailure>, BatchWriteWithDeadLetterQueue, 
BatchWriteWithDeadLetterQueue.Builder> {
+
+      private Builder() {
+        super();
+      }
+
+      private Builder(JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, 
RpcQosOptions rpcQosOptions) {
+        super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+
+      @Override
+      public BatchWriteWithDeadLetterQueue build() {
+        return genericBuild();
+      }
+
+      @Override
+      protected BatchWriteWithDeadLetterQueue buildSafe(JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, 
RpcQosOptions rpcQosOptions) {
+        return new BatchWriteWithDeadLetterQueue(clock, 
firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+    }
+  }
+
+  /**
+   * Failure details for an attempted {@link com.google.firestore.v1.Write}. 
When a {@link
+   * com.google.firestore.v1.Write} is unable to be applied an instance of 
this class will be
+   * created with the details of the failure.
+   * <p/>
+   * Included data:
+   * <ul>
+   *   <li>The original {@link com.google.firestore.v1.Write}</li>
+   *   <li>The {@link WriteResult} returned by the Cloud Firestore API</li>
+   *   <li>
+   *     The {@link Status} returned by the Cloud Firestore API (often {@link 
Status#getMessage()}
+   *     will provide details of why the write was unsuccessful
+   *   </li>
+   * </ul>
+   */
+  public static final class WriteFailure implements Serializable {
+    private final com.google.firestore.v1.Write write;
+    private final WriteResult writeResult;
+    private final Status status;
+
+    public WriteFailure(com.google.firestore.v1.Write write, WriteResult 
writeResult, Status status) {
+      this.write = write;
+      this.writeResult = writeResult;
+      this.status = status;
+    }
+
+    public com.google.firestore.v1.Write getWrite() {
+      return write;
+    }
+
+    public WriteResult getWriteResult() {
+      return writeResult;
+    }
+
+    public Status getStatus() {
+      return status;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof WriteFailure)) {
+        return false;
+      }
+      WriteFailure that = (WriteFailure) o;
+      return write.equals(that.write) &&
+          writeResult.equals(that.writeResult) &&
+          status.equals(that.status);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(write, writeResult, status);
+    }
+  }
+
+  /**
+   * Exception that is thrown if one or more {@link 
com.google.firestore.v1.Write}s is unsuccessful
+   * with a non-retryable status code.
+   */
+  public static class FailedWritesException extends RuntimeException {
+    private final List<WriteFailure> writeFailures;
+
+    public FailedWritesException(List<WriteFailure> writeFailures) {
+      super(String.format("Not-retryable status code(s) for %d writes", 
writeFailures.size()));
+      this.writeFailures = writeFailures;
+    }
+
+    /**
+     * This list of {@link WriteFailure}s detailing which writes failed and 
for what reason.
+     */
+    public List<WriteFailure> getWriteFailures() {
+      return writeFailures;
+    }
+  }
+
+  /**
+   * Our base PTransform class for Firestore V1 API related functions.
+   * <p/>
+   *
+   * @param <In> The type of the previous stage of the pipeline, usually a 
{@link PCollection} of a
+   * request type from {@link com.google.firestore.v1}
+   * @param <Out> The type returned from the RPC operation (usually a response 
class from {@link
+   * com.google.firestore.v1})
+   * @param <Trfm> The type of this transform used to bind this type and the 
corresponding type safe
+   * {@link Bldr} together
+   * @param <Bldr> The type of the type safe builder which is used to build 
and instance of {@link
+   * Trfm}
+   */
+  private static abstract class Transform<
+      In extends PInput,
+      Out extends POutput,
+      Trfm extends Transform<In, Out, Trfm, Bldr>,
+      Bldr extends Transform.Builder<In, Out, Trfm, Bldr>
+      >
+      extends PTransform<In, Out>
+      implements HasDisplayData {
+    protected final JodaClock clock;
+    protected final FirestoreStatefulComponentFactory 
firestoreStatefulComponentFactory;
+    protected final RpcQosOptions rpcQosOptions;
+
+    protected Transform(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions
+    ) {
+      this.clock = clock;
+      this.firestoreStatefulComponentFactory = 
firestoreStatefulComponentFactory;
+      this.rpcQosOptions = rpcQosOptions;
+    }
+
+    @Override
+    public final void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+    }
+
+    /**
+     * Create a new {@link Bldr Builder} from the current instance.
+     *
+     * @return a new instance of a {@link Bldr Builder} initialized to the 
current state of this
+     * instance
+     */
+    public abstract Bldr toBuilder();
+
+    /**
+     * Our base type safe builder for a {@link FirestoreV1.Transform}
+     * <p/>
+     * This type safe builder provides a user (and semver) friendly way to 
expose optional
+     * parameters to those users that wish to configure them. Additionally, we 
are able to add and
+     * deprecate individual parameters as may be needed.
+     *
+     * @param <In> The type of the previous stage of the pipeline, usually a 
{@link PCollection} of
+     * a request type from {@link com.google.firestore.v1}
+     * @param <Out> The type returned from the RPC operation (usually a 
response class from {@link
+     * com.google.firestore.v1})
+     * @param <Trfm> The type of this transform used to bind this type and the 
corresponding type
+     * safe {@link Bldr} together
+     * @param <Bldr> The type of the type safe builder which is used to build 
and instance of {@link
+     * Trfm}
+     */
+    protected static abstract class Builder<
+        In extends PInput,
+        Out extends POutput,
+        Trfm extends Transform<In, Out, Trfm, Bldr>,
+        Bldr extends Transform.Builder<In, Out, Trfm, Bldr>
+        > {
+      protected JodaClock clock;
+      protected FirestoreStatefulComponentFactory 
firestoreStatefulComponentFactory;
+      protected RpcQosOptions rpcQosOptions;
+
+      protected Builder() {
+        clock = JodaClock.DEFAULT;
+        firestoreStatefulComponentFactory = 
FirestoreStatefulComponentFactory.INSTANCE;
+        rpcQosOptions = RpcQosOptions.defaultOptions();
+      }
+
+      private Builder(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions
+      ) {
+        this.clock = clock;
+        this.firestoreStatefulComponentFactory = 
firestoreStatefulComponentFactory;
+        this.rpcQosOptions = rpcQosOptions;
+      }
+
+      /**
+       * Convenience method to take care of hiding the unchecked cast warning 
from the compiler.
+       * This cast is safe because we are always an instance of {@link Bldr} 
as the only way to
+       * get an instance of {@link FirestoreV1.Transform.Builder} is for it to 
conform to {@code Bldr}'s constraints.
+       * @return Down cast this
+       */
+      @SuppressWarnings({"unchecked", "RedundantSuppression"})
+      private Bldr self() {
+        return (Bldr) this;
+      }
+
+      /**
+       * Create a new instance of {@link Trfm Transform} from the current 
builder state.
+       * @return a new instance of {@link Trfm Transform} from the current 
builder state.
+       */
+      public abstract Trfm build();
+
+      /**
+       * Provide a central location for the validation before ultimately 
constructing a
+       * transformer.
+       *
+       * While this method looks to purely be duplication (given that each 
implementation of {@link
+       * #build()} simply delegates to this method, the build method carries 
with it the concrete

Review comment:
       Missing the closing parenthesis for "(given"

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
##########
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.firestore.v1.BatchWriteRequest;
+import com.google.firestore.v1.WriteResult;
+import com.google.rpc.Status;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.concurrent.Immutable;
+import 
org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BatchWriteFnWithDeadLetterQueue;
+import 
org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.DefaultBatchWriteFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * {@link FirestoreV1} provides an API which provides lifecycle managed {@link 
PTransform}s for
+ * <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1";>Cloud
+ * Firestore v1 API</a>.
+ * <p/>
+ * This class is part of the Firestore Connector DSL and should be accessed 
via {@link
+ * FirestoreIO#v1()}.
+ * <p/>
+ * All {@link PTransform}s provided by this API use {@link 
org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+ * GcpOptions} on {@link org.apache.beam.sdk.options.PipelineOptions 
PipelineOptions} for
+ * credentials access and projectId resolution. As such, the lifecycle of gRPC 
clients and project
+ * information is scoped to the bundle level, not the worker level.
+ * <p/>
+ *
+ * <h3>Permissions</h3>
+ *
+ * Permission requirements depend on the {@code PipelineRunner} that is used 
to execute the
+ * pipeline. Please refer to the documentation of corresponding {@code 
PipelineRunner}s for more
+ * details.
+ *
+ * <p>Please see <a 
href="https://cloud.google.com/firestore/docs/quickstart-servers#create_a_in_native_mode_database";>Create

Review comment:
       https://cloud.google.com/firestore/docs/security/iam#roles might be more 
helpful here.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQos.java
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import com.google.rpc.Code;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.joda.time.Instant;
+
+/**
+ * Quality of Service manager for Firestore RPCs.
+ * <p/>
+ * Cloud Firestore has a number of considerations for interacting with the 
database in a reliable
+ * manner.
+ * <p/>
+ * Every RPC which is sent to Cloud Firestore is subject to QoS 
considerations. Successful, failed,
+ * attempted requests are all tracked and directly drive the determination of 
when to attempt an RPC.
+ * In the case of a write rpc, the QoS will also determine the size of the 
request in order to try
+ * and maximize throughput with success rate.
+ * <p/>
+ * The lifecycle of an instance of {@link RpcQos} is expected to be bound to 
the lifetime of the
+ * worker the RPC Functions run on. Explicitly, this instance should live 
longer than an individual
+ * bundle.
+ * <p/>
+ * Each request processed via one of the {@link 
org.apache.beam.sdk.transforms.PTransform}s available
+ * in {@link FirestoreV1} will work its way through a state machine provided 
by this {@link RpcQos}.
+ * The high level state machine events are as follows:
+ * <ol>
+ *   <li>Create new {@link RpcAttempt}</li>
+ *   <li>Check if it is safe to proceed with sending the request ({@link 
RpcAttempt#awaitSafeToProceed(Instant)})</li>
+ *   <li>Record start of trying to send request ({@link 
RpcAttempt#recordStartRequest(Instant)})</li>
+ *   <li>Record success or failure state of send request attempt</li>
+ *   <li>If success output all returned responses</li>
+ *   <li>
+ *     If failure check retry ability ({@link 
RpcAttempt#checkCanRetry(RuntimeException)})
+ *     <ol style="margin-top: 0">
+ *       <li>Ensure the request has budget to retry ({@link 
RpcQosOptions#getMaxAttempts()})</li>
+ *       <li>Ensure the error is not a non-retryable error</li>
+ *     </ol>
+ *   </li>
+ * </ol>
+ * Configuration of options can be accomplished by passing an instance of 
{@link RpcQosOptions}
+ * to the {@code withRpcQosOptions} method of each {@code Builder} available 
in {@link FirestoreV1}.
+ * <p/>
+ * A new instance of {@link RpcQosOptions.Builder} can be created via {@link 
RpcQosOptions#newBuilder()}.
+ * A default instance of {@link RpcQosOptions} can be created via {@link 
RpcQosOptions#defaultOptions()}.
+ * <p/>
+ * @see FirestoreV1
+ * @see FirestoreV1.BatchGetDocuments.Builder#withRpcQosOptions(RpcQosOptions)
+ * @see FirestoreV1.BatchWrite.Builder#withRpcQosOptions(RpcQosOptions)
+ * @see FirestoreV1.ListCollectionIds.Builder#withRpcQosOptions(RpcQosOptions)
+ * @see FirestoreV1.ListDocuments.Builder#withRpcQosOptions(RpcQosOptions)
+ * @see FirestoreV1.PartitionQuery.Builder#withRpcQosOptions(RpcQosOptions)
+ * @see FirestoreV1.RunQuery.Builder#withRpcQosOptions(RpcQosOptions)
+ * @see <a href="https://cloud.google.com/firestore/quotas#limits";>Standard 
limits</a>
+ * @see <a 
href="https://cloud.google.com/firestore/docs/best-practices#designing_for_scale";>Designing
 for scale</a>
+ */
+interface RpcQos {
+
+  /**
+   * Create a new stateful attempt for a read operation. The returned {@link 
RpcReadAttempt} will
+   * be used for the full lifetime of trying to successfully process a request.
+   * @param context The {@link Context} which this new attempt should be 
associated with.
+   * @return A new {@link RpcReadAttempt} which will be used while trying to 
successfully process a request
+   */
+  RpcReadAttempt newReadAttempt(Context context);
+
+  /**
+   * Create a new stateful attempt for a write operation. The returned {@link 
RpcWriteAttempt} will
+   * be used for the full lifetime of trying to successfully process a request.
+   * @param context The {@link Context} which this new attempt should be 
associated with.
+   * @return A new {@link RpcWriteAttempt} which will be used while trying to 
successfully process a request
+   */
+  RpcWriteAttempt newWriteAttempt(Context context);
+
+  /**
+   * Check if a request is over the max allowed number of bytes.
+   *
+   * @param bytes number of bytes to check against the allowed limit
+   * @return true if {@code bytes} is over the allowed limit, false otherwise
+   * @see RpcQosOptions#getBatchMaxBytes()
+   */
+  boolean bytesOverLimit(long bytes);
+
+  /**
+   * Base interface representing the lifespan of attempting to successfully 
process a single request.
+   */
+  interface RpcAttempt {
+
+    /**
+     * Await until it is safe to proceed sending the rpc, evaluated relative 
to {@code start}.
+     * If it is not yet safe to proceed, this method will block until it is 
safe to proceed.
+     * @param start The intended start time of the next rpc
+     * @return true if it is safe to proceed with sending the next rpc, false 
otherwise.
+     * @throws InterruptedException if this thread is interrupted while waiting
+     * @see Thread#sleep(long)
+     * @see org.apache.beam.sdk.util.Sleeper#sleep(long)
+     */
+    boolean awaitSafeToProceed(Instant start) throws InterruptedException;
+
+    /**
+     * Determine if an rpc can be retried given {@code exception}.
+     * <p/>
+     * If a backoff is necessary before retrying this method can block for 
backoff before returning.
+     * <p/>
+     * If no retry is available this {@link RpcAttempt} will move to a 
terminal failed state and
+     * will error if further interaction is attempted.
+     * @param exception Exception to evaluate for retry ability
+     * @throws InterruptedException if this thread is interrupted while waiting
+     */
+    void checkCanRetry(RuntimeException exception) throws InterruptedException;
+
+    /**
+     * Record the time start time of sending the rpc

Review comment:
       "time start time" -> "start time"

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
##########
@@ -0,0 +1,695 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.StatusCode;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.rpc.Code;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.function.Function;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.Element;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.FlushBuffer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.MovingFunction;
+import org.apache.beam.sdk.util.Sleeper;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class RpcQosImpl implements RpcQos {
+
+  /**
+   * Non-retryable errors. See 
https://cloud.google.com/apis/design/errors#handling_errors.
+   */
+  private static final Set<Integer> NON_RETRYABLE_ERROR_NUMBERS =
+      ImmutableSet.of(
+          Code.ALREADY_EXISTS,
+          Code.DATA_LOSS,
+          Code.FAILED_PRECONDITION,
+          Code.INVALID_ARGUMENT,
+          Code.OUT_OF_RANGE,
+          Code.NOT_FOUND,
+          Code.PERMISSION_DENIED,
+          Code.UNIMPLEMENTED
+      ).stream()
+          .map(Code::getNumber)
+          .collect(ImmutableSet.toImmutableSet());
+  /**
+   * The target minimum number of requests per samplePeriodMs, even if no 
requests succeed. Must be
+   * greater than 0, else we could throttle to zero. Because every decision is 
probabilistic, there
+   * is no guarantee that the request rate in any given interval will not be 
zero. (This is the +1
+   * from the formula in 
https://landing.google.com/sre/book/chapters/handling-overload.html

Review comment:
       Add a closing parenthesis.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
##########
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.firestore.v1.BatchWriteRequest;
+import com.google.firestore.v1.WriteResult;
+import com.google.rpc.Status;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.concurrent.Immutable;
+import 
org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BatchWriteFnWithDeadLetterQueue;
+import 
org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.DefaultBatchWriteFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * {@link FirestoreV1} provides an API which exposes lifecycle managed {@link 
PTransform}s for
+ * <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1";>Cloud
+ * Firestore v1 API</a>.
+ * <p/>
+ * This class is part of the Firestore Connector DSL and should be accessed 
via {@link
+ * FirestoreIO#v1()}.
+ * <p/>
+ * All {@link PTransform}s provided by this API use {@link 
org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+ * GcpOptions} on {@link org.apache.beam.sdk.options.PipelineOptions 
PipelineOptions} for
+ * credentials access and projectId resolution. As such, the lifecycle of gRPC 
clients and project
+ * information is scoped to the bundle level, not the worker level.
+ * <p/>
+ *
+ * <h3>Permissions</h3>
+ *
+ * Permission requirements depend on the {@code PipelineRunner} that is used 
to execute the
+ * pipeline. Please refer to the documentation of corresponding {@code 
PipelineRunner}s for more
+ * details.
+ *
+ * <p>Please see <a 
href="https://cloud.google.com/firestore/docs/quickstart-servers#create_a_in_native_mode_database";>Create
+ * a Firestore in Native mode database
+ * </a>for security and permission related information specific to Cloud 
Firestore.
+ *
+ * <p>Optionally, Cloud Firestore V1 Emulator, running locally, could be used 
for testing purposes
+ * by providing the host port information via {@link 
FirestoreIOOptions#setEmulatorHostPort(String)}.
+ * In such a case, all the Cloud Firestore API calls are directed to the 
Emulator.
+ *
+ * @see FirestoreIO#v1()
+ * @see org.apache.beam.sdk.PipelineRunner
+ * @see org.apache.beam.sdk.options.PipelineOptions
+ * @see org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+ * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1";>Cloud
+ * Firestore v1 API</a>
+ */
+@Immutable
+public final class FirestoreV1 {
+  static final FirestoreV1 INSTANCE = new FirestoreV1();
+
+  private FirestoreV1() {}
+
+  /**
+   * The class returned by this method provides the ability to create {@link 
PTransform PTransforms}
+   * for write operations available in the Firestore V1 API provided by {@link
+   * com.google.cloud.firestore.spi.v1.FirestoreRpc FirestoreRpc}.
+   * <p/>
+   * This method is part of the Firestore Connector DSL and should be accessed 
via {@link
+   * FirestoreIO#v1()}.
+   * <p/>
+   *
+   * @return Type safe builder factory for write operations.
+   * @see FirestoreIO#v1()
+   */
+  public final Write write() {
+    return Write.INSTANCE;
+  }
+
+  /**
+   * Type safe builder factory for write operations.
+   * <p/>
+   * This class is part of the Firestore Connector DSL and should be accessed 
via {@link #write()
+   * FirestoreIO.v1().write()}.
+   * <p/>
+   * <p/>
+   * This class provides access to a set of type safe builders for supported 
write operations
+   * available in the Firestore V1 API accessed through {@link 
com.google.cloud.firestore.spi.v1.FirestoreRpc
+   * FirestoreRpc}. Each builder allows configuration before creating an 
immutable instance which
+   * can be used in your pipeline.
+   *
+   * @see FirestoreIO#v1()
+   * @see #write()
+   */
+  @Immutable
+  public static final class Write {
+    private static final Write INSTANCE = new Write();
+
+    private Write() {}
+
+    /**
+     * Factory method to create a new type safe builder for {@link 
com.google.firestore.v1.Write}
+     * operations.
+     * <p/>
+     * By default, when an error is encountered while trying to write to Cloud 
Firestore a {@link
+     * FailedWritesException} will be thrown. If you would like a failed write 
to not result in a
+     * {@link FailedWritesException}, you can instead use {@link 
BatchWriteWithDeadLetterQueue} which
+     * will output any failed write. {@link BatchWriteWithDeadLetterQueue} can 
be used by
+     * including {@link BatchWrite.Builder#withDeadLetterQueue()} when 
constructing the write handler.
+     * <p/>
+     * This method is part of the Firestore Connector DSL and should be 
accessed via {@link
+     * FirestoreIO#v1()}.
+     * <p/>
+     *
+     * <b>Example Usage</b>
+     * <pre>{@code
+     * // default behavior, throw exception if a write fails
+     * PCollection<Write> writes = ...;
+     * PDone sink = writes
+     *     .apply(FirestoreIO.v1().write().batchWrite().build());
+     * }</pre>
+     * <pre>{@code
+     * // alternative behavior, output a WriteFailure if a write fails
+     * PCollection<Write> writes = ...;
+     * PCollection<WriteFailure> writeFailures = writes
+     *     
.apply(FirestoreIO.v1().write().batchWrite().withDeadLetterQueue().build());
+     * }</pre>
+     * <p/>
+     * All request quality-of-service for the built {@link BatchWrite} 
PTransform is scoped to
+     * the worker and configured based on the {@link RpcQosOptions} specified 
via this builder.
+     *
+     * @return A new type safe builder providing configuration for processing 
of {@link
+     * com.google.firestore.v1.Write}s
+     * @see FirestoreIO#v1()
+     * @see BatchWrite
+     * @see BatchWriteRequest
+     * @see com.google.firestore.v1.BatchWriteResponse
+     * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchWrite";>google.firestore.v1.Firestore.BatchWrite</a>
+     * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteRequest";>google.firestore.v1.BatchWriteRequest</a>
+     * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteResponse";>google.firestore.v1.BatchWriteResponse</a>
+     */
+    public BatchWrite.Builder batchWrite() {
+      return new BatchWrite.Builder();
+    }
+  }
+
+  /**
+   * Concrete class representing a {@link PTransform}{@code <}{@link 
PCollection}{@code <}{@link
+   * com.google.firestore.v1.Write}{@code >, }{@link PDone}{@code >} which 
will write to Firestore.
+   * <p/>
+   * If an error is encountered while trying to write to Cloud Firestore a 
{@link
+   * FailedWritesException} will be thrown. If you would like a failed write 
to not result in a
+   * {@link FailedWritesException}, you can instead use {@link 
BatchWriteWithDeadLetterQueue} which
+   * will output any failed write. {@link 
BatchWriteWithDeadLetterQueue} can be used by
+   * including {@link Builder#withDeadLetterQueue()} when constructing the 
write handler.
+   * <p/>
+   * This class is part of the Firestore Connector DSL, it has a type safe 
builder accessible via
+   * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#write() 
write()}{@code .}{@link
+   * FirestoreV1.Write#batchWrite() batchWrite()}.
+   * <p/>
+   * All request quality-of-service for an instance of this PTransform is 
scoped to the worker and
+   * configured via {@link Builder#withRpcQosOptions(RpcQosOptions)}.
+   * <p/>
+   * Writes performed against Firestore will be ordered and grouped to 
maximize throughput while
+   * maintaining a high request success rate. Batch sizes will be determined 
by the QOS layer.
+   *
+   * @see FirestoreIO#v1()
+   * @see FirestoreV1#write()
+   * @see FirestoreV1.Write#batchWrite()
+   * @see BatchWrite.Builder
+   * @see BatchWriteWithDeadLetterQueue
+   * @see BatchWriteRequest
+   * @see com.google.firestore.v1.BatchWriteResponse
+   * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchWrite";>google.firestore.v1.Firestore.BatchWrite</a>
+   * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteRequest";>google.firestore.v1.BatchWriteRequest</a>
+   * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteResponse";>google.firestore.v1.BatchWriteResponse</a>
+   */
+  public static final class BatchWrite
+      extends Transform<
+      PCollection<com.google.firestore.v1.Write>,
+      PDone,
+      BatchWrite,
+      BatchWrite.Builder
+      > {
+
+    private BatchWrite(JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, 
RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public PDone expand(PCollection<com.google.firestore.v1.Write> input) {
+      input.apply("batchWrite", ParDo.of(new DefaultBatchWriteFn(clock, 
firestoreStatefulComponentFactory, rpcQosOptions)));
+      return PDone.in(input.getPipeline());
+    }
+
+    @Override
+    public Builder toBuilder() {
+      return new Builder(clock, firestoreStatefulComponentFactory, 
rpcQosOptions);
+    }
+
+    /**
+     * A type safe builder for {@link BatchWrite} allowing configuration and 
instantiation.
+     * <p/>
+     * This class is part of the Firestore Connector DSL, it has a type safe 
builder accessible via
+     * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#write() 
write()}{@code .}{@link
+     * FirestoreV1.Write#batchWrite() batchWrite()}.
+     * <p/>
+     *
+     * @see FirestoreIO#v1()
+     * @see FirestoreV1#write()
+     * @see FirestoreV1.Write#batchWrite()
+     * @see BatchWrite
+     * @see BatchWriteRequest
+     * @see com.google.firestore.v1.BatchWriteResponse
+     * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchWrite";>google.firestore.v1.Firestore.BatchWrite</a>
+     * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteRequest";>google.firestore.v1.BatchWriteRequest</a>
+     * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteResponse";>google.firestore.v1.BatchWriteResponse</a>
+     */
+    public static final class Builder extends 
Transform.Builder<PCollection<com.google.firestore.v1.Write>, PDone, 
BatchWrite, BatchWrite.Builder> {
+
+      private Builder() {
+        super();
+      }
+
+      private Builder(JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, 
RpcQosOptions rpcQosOptions) {
+        super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+
+      public BatchWriteWithDeadLetterQueue.Builder withDeadLetterQueue() {
+        return new BatchWriteWithDeadLetterQueue.Builder(clock, 
firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+
+      @Override
+      public BatchWrite build() {
+        return genericBuild();
+      }
+
+      @Override
+      protected BatchWrite buildSafe(JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, 
RpcQosOptions rpcQosOptions) {
+        return new BatchWrite(clock, firestoreStatefulComponentFactory, 
rpcQosOptions);
+      }
+    }
+  }
+
+  /**
+   * Concrete class representing a {@link PTransform}{@code <}{@link 
PCollection}{@code <}{@link com.google.firestore.v1.Write}{@code >, }{@link 
PCollection}{@code <}{@link WriteFailure}{@code >} which will write to 
Firestore.
+   * {@link WriteFailure}s output by this {@code PTransform} are those writes 
which were not able to be applied to Cloud Firestore.
+   * <p/>
+   * Use this BatchWrite when you do not want a failed write to error an 
entire bundle.
+   * <p/>
+   * This class is part of the Firestore Connector DSL, it has a type safe 
builder accessible via
+   * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#write() 
write()}{@code .}{@link
+   * FirestoreV1.Write#batchWrite() batchWrite()}{@code .}{@link 
BatchWrite.Builder#withDeadLetterQueue() withDeadLetterQueue()}.
+   * <p/>
+   * All request quality-of-service for an instance of this PTransform is 
scoped to the worker and
+   * configured via {@link Builder#withRpcQosOptions(RpcQosOptions)}.
+   * <p/>
+   * Writes performed against Firestore will be ordered and grouped to 
maximize throughput while
+   * maintaining a high request success rate. Batch sizes will be determined 
by the QOS layer.
+   *
+   * @see FirestoreIO#v1()
+   * @see FirestoreV1#write()
+   * @see FirestoreV1.Write#batchWrite()
+   * @see BatchWrite.Builder
+   * @see BatchWrite.Builder#withDeadLetterQueue()
+   * @see BatchWriteRequest
+   * @see com.google.firestore.v1.BatchWriteResponse
+   * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchWrite";>google.firestore.v1.Firestore.BatchWrite</a>
+   * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteRequest";>google.firestore.v1.BatchWriteRequest</a>
+   * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteResponse";>google.firestore.v1.BatchWriteResponse</a>
+   */
+  public static final class BatchWriteWithDeadLetterQueue
+      extends Transform<
+      PCollection<com.google.firestore.v1.Write>,
+      PCollection<WriteFailure>,
+      BatchWriteWithDeadLetterQueue,
+      BatchWriteWithDeadLetterQueue.Builder
+      > {
+
+    private BatchWriteWithDeadLetterQueue(JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, 
RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public PCollection<WriteFailure> 
expand(PCollection<com.google.firestore.v1.Write> input) {
+      return input.apply("batchWrite", ParDo.of(new 
BatchWriteFnWithDeadLetterQueue(clock, firestoreStatefulComponentFactory, 
rpcQosOptions)))
+          .apply(Reshuffle.viaRandomKey());
+    }
+
+    @Override
+    public Builder toBuilder() {
+      return new Builder(clock, firestoreStatefulComponentFactory, 
rpcQosOptions);
+    }
+
+    /**
+     * A type safe builder for {@link BatchWriteWithDeadLetterQueue} allowing 
configuration and instantiation.
+     * <p/>
+     * This class is part of the Firestore Connector DSL, it has a type safe 
builder accessible via
+     * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#write() 
write()}{@code .}{@link
+     * FirestoreV1.Write#batchWrite() batchWrite()}.
+     * <p/>
+     *
+     * @see FirestoreIO#v1()
+     * @see FirestoreV1#write()
+     * @see FirestoreV1.Write#batchWrite()
+     * @see BatchWrite
+     * @see BatchWriteRequest
+     * @see com.google.firestore.v1.BatchWriteResponse
+     * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchWrite";>google.firestore.v1.Firestore.BatchWrite</a>
+     * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteRequest";>google.firestore.v1.BatchWriteRequest</a>
+     * @see <a 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteResponse";>google.firestore.v1.BatchWriteResponse</a>
+     */
+    public static final class Builder extends 
Transform.Builder<PCollection<com.google.firestore.v1.Write>, 
PCollection<WriteFailure>, BatchWriteWithDeadLetterQueue, 
BatchWriteWithDeadLetterQueue.Builder> {
+
+      private Builder() {
+        super();
+      }
+
+      private Builder(JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, 
RpcQosOptions rpcQosOptions) {
+        super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+
+      @Override
+      public BatchWriteWithDeadLetterQueue build() {
+        return genericBuild();
+      }
+
+      @Override
+      protected BatchWriteWithDeadLetterQueue buildSafe(JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, 
RpcQosOptions rpcQosOptions) {
+        return new BatchWriteWithDeadLetterQueue(clock, 
firestoreStatefulComponentFactory, rpcQosOptions);
+      }
+    }
+  }
+
+  /**
+   * Failure details for an attempted {@link com.google.firestore.v1.Write}. 
When a {@link
+   * com.google.firestore.v1.Write} is unable to be applied an instance of 
this class will be
+   * created with the details of the failure.
+   * <p/>
+   * Included data:
+   * <ul>
+   *   <li>The original {@link com.google.firestore.v1.Write}</li>
+   *   <li>The {@link WriteResult} returned by the Cloud Firestore API</li>
+   *   <li>
+   *     The {@link Status} returned by the Cloud Firestore API (often {@link 
Status#getMessage()}
+   *     will provide details of why the write was unsuccessful)
+   *   </li>
+   * </ul>
+   */
+  public static final class WriteFailure implements Serializable {
+    private final com.google.firestore.v1.Write write;
+    private final WriteResult writeResult;
+    private final Status status;
+
+    public WriteFailure(com.google.firestore.v1.Write write, WriteResult 
writeResult, Status status) {
+      this.write = write;
+      this.writeResult = writeResult;
+      this.status = status;
+    }
+
+    public com.google.firestore.v1.Write getWrite() {
+      return write;
+    }
+
+    public WriteResult getWriteResult() {
+      return writeResult;
+    }
+
+    public Status getStatus() {
+      return status;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof WriteFailure)) {
+        return false;
+      }
+      WriteFailure that = (WriteFailure) o;
+      return write.equals(that.write) &&
+          writeResult.equals(that.writeResult) &&
+          status.equals(that.status);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(write, writeResult, status);
+    }
+  }
+
+  /**
+   * Exception that is thrown if one or more {@link 
com.google.firestore.v1.Write}s is unsuccessful
+   * with a non-retryable status code.
+   */
+  public static class FailedWritesException extends RuntimeException {
+    private final List<WriteFailure> writeFailures;
+
+    public FailedWritesException(List<WriteFailure> writeFailures) {
+      super(String.format("Not-retryable status code(s) for %d writes", 
writeFailures.size()));
+      this.writeFailures = writeFailures;
+    }
+
+    /**
+     * The list of {@link WriteFailure}s detailing which writes failed and 
for what reason.
+     */
+    public List<WriteFailure> getWriteFailures() {
+      return writeFailures;
+    }
+  }
+
+  /**
+   * Our base PTransform class for Firestore V1 API related functions.
+   * <p/>
+   *
+   * @param <In> The type of the previous stage of the pipeline, usually a 
{@link PCollection} of a
+   * request type from {@link com.google.firestore.v1}
+   * @param <Out> The type returned from the RPC operation (usually a response 
class from {@link
+   * com.google.firestore.v1})
+   * @param <Trfm> The type of this transform used to bind this type and the 
corresponding type safe
+   * {@link Bldr} together
+   * @param <Bldr> The type of the type safe builder which is used to build an instance of
+   * {@link Trfm}
+   */
+  private static abstract class Transform<
+      In extends PInput,
+      Out extends POutput,
+      Trfm extends Transform<In, Out, Trfm, Bldr>,
+      Bldr extends Transform.Builder<In, Out, Trfm, Bldr>
+      >
+      extends PTransform<In, Out>
+      implements HasDisplayData {
+    protected final JodaClock clock;
+    protected final FirestoreStatefulComponentFactory 
firestoreStatefulComponentFactory;
+    protected final RpcQosOptions rpcQosOptions;
+
+    protected Transform(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions
+    ) {
+      this.clock = clock;
+      this.firestoreStatefulComponentFactory = 
firestoreStatefulComponentFactory;
+      this.rpcQosOptions = rpcQosOptions;
+    }
+
+    @Override
+    public final void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+    }
+
+    /**
+     * Create a new {@link Bldr Builder} from the current instance.
+     *
+     * @return a new instance of a {@link Bldr Builder} initialized to the 
current state of this
+     * instance
+     */
+    public abstract Bldr toBuilder();
+
+    /**
+     * Our base type safe builder for a {@link FirestoreV1.Transform}
+     * <p/>
+     * This type safe builder provides a user (and semver) friendly way to 
expose optional
+     * parameters to those users that wish to configure them. Additionally, we 
are able to add and
+     * deprecate individual parameters as may be needed.
+     *
+     * @param <In> The type of the previous stage of the pipeline, usually a 
{@link PCollection} of
+     * a request type from {@link com.google.firestore.v1}
+     * @param <Out> The type returned from the RPC operation (usually a 
response class from {@link
+     * com.google.firestore.v1})
+     * @param <Trfm> The type of this transform used to bind this type and the 
corresponding type
+     * safe {@link Bldr} together
+     * @param <Bldr> The type of the type safe builder which is used to build an instance of
+     * {@link Trfm}
+     */
+    protected static abstract class Builder<
+        In extends PInput,
+        Out extends POutput,
+        Trfm extends Transform<In, Out, Trfm, Bldr>,
+        Bldr extends Transform.Builder<In, Out, Trfm, Bldr>
+        > {
+      protected JodaClock clock;
+      protected FirestoreStatefulComponentFactory 
firestoreStatefulComponentFactory;
+      protected RpcQosOptions rpcQosOptions;
+
+      protected Builder() {
+        clock = JodaClock.DEFAULT;
+        firestoreStatefulComponentFactory = 
FirestoreStatefulComponentFactory.INSTANCE;
+        rpcQosOptions = RpcQosOptions.defaultOptions();
+      }
+
+      private Builder(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions
+      ) {
+        this.clock = clock;
+        this.firestoreStatefulComponentFactory = 
firestoreStatefulComponentFactory;
+        this.rpcQosOptions = rpcQosOptions;
+      }
+
+      /**
+       * Convenience method to take care of hiding the unchecked cast warning 
from the compiler.
+       * This cast is safe because we are always an instance of {@link Bldr} 
as the only way to
+       * get an instance of {@link FirestoreV1.Transform.Builder} is for it to 
conform to {@code Bldr}'s constraints.
+       * @return Down cast this
+       */
+      @SuppressWarnings({"unchecked", "RedundantSuppression"})
+      private Bldr self() {
+        return (Bldr) this;
+      }
+
+      /**
+       * Create a new instance of {@link Trfm Transform} from the current 
builder state.
+       * @return a new instance of {@link Trfm Transform} from the current 
builder state.
+       */
+      public abstract Trfm build();
+
+      /**
+       * Provide a central location for the validation before ultimately 
constructing a
+       * transformer.
+       *
+       * While this method may look to be pure duplication (given that each implementation of
+       * {@link #build()} simply delegates to this method), the build method carries with it the concrete
+       * class rather than the generic type information. Having the concrete 
class available to
+       * users is advantageous to reduce the necessity of reading the complex 
type information and
+       * instead presenting them with a concrete class name.
+       *
+       * Comparing the type of the builder at the use site of each method:
+       * <table>
+       *   <tr>
+       *     <th>{@code build()}</th>
+       *     <th>{@code genericBuild()}</th>
+       *   </tr>
+       *   <tr>
+       *     <td><pre>{@code 
FirestoreV1.BatchGetDocuments.Builder<In>}</pre></td>
+       *     <td><pre>{@code FirestoreV1.Transform.Builder<
+       *            In extends PInput,
+       *            Out extends POutput,
+       *            Trfm extends FirestoreV1.Transform<In, Out, Trfm, Bldr>,
+       *            Bldr extends FirestoreV1.Transform.Builder<In, Out, Trfm, 
Bldr>
+       *          >}</pre></td>
+       *   </tr>
+       * </table>
+       *
+       * While this type information is important for our implementation, it 
is less important for
+       * the users using our implementation.
+       */
+      protected final Trfm genericBuild() {
+        return buildSafe(
+            requireNonNull(clock, "clock must be non null"),
+            requireNonNull(firestoreStatefulComponentFactory, 
"firestoreFactory must be non null"),
+            requireNonNull(rpcQosOptions, "rpcQosOptions must be non null")
+        );
+      }
+
+      protected abstract Trfm buildSafe(
+          JodaClock clock,
+          FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+          RpcQosOptions rpcQosOptions
+      );
+
+      /**
+       * Specify the {@link RpcQosOptions} that will be used when 
bootstrapping the QOS of each
+       * running instance of the {@link Trfm Transform} created by this 
builder.
+       * <p/>
+       * <i>NOTE</i> This method has set semantics: it overwrites the builder's current value
+       * rather than returning a copy with the new value applied.

Review comment:
       Not sure what's meant by "NOT copy with new value." Should it be: "DO 
NOT copy with a new value."

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQos.java
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import com.google.rpc.Code;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.joda.time.Instant;
+
+/**
+ * Quality of Service manager for Firestore RPCs.
+ * <p/>
+ * Cloud Firestore has a number of considerations for interacting with the 
database in a reliable
+ * manner.
+ * <p/>
+ * Every RPC which is sent to Cloud Firestore is subject to QoS 
considerations. Successful, failed,
+ * attempted requests are all tracked and directly drive the determination of 
when to attempt an RPC.
+ * In the case of a write rpc, the QoS will also determine the size of the 
request in order to try
+ * and maximize throughput with success rate.
+ * <p/>
+ * The lifecycle of an instance of {@link RpcQos} is expected to be bound to 
the lifetime of the
+ * worker the RPC Functions run on. Explicitly, this instance should live 
longer than an individual
+ * bundle.
+ * <p/>
+ * Each request processed via one of the {@link 
org.apache.beam.sdk.transforms.PTransform}s available
+ * in {@link FirestoreV1} will work its way through a state machine provided 
by this {@link RpcQos}.
+ * The high level state machine events are as follows:
+ * <ol>
+ *   <li>Create new {@link RpcAttempt}</li>
+ *   <li>Check if it is safe to proceed with sending the request ({@link 
RpcAttempt#awaitSafeToProceed(Instant)})</li>
+ *   <li>Record start of trying to send request ({@link 
RpcAttempt#recordStartRequest(Instant)})</li>
+ *   <li>Record success or failure state of send request attempt</li>
+ *   <li>If success output all returned responses</li>
+ *   <li>
+ *     If failure check retry ability ({@link 
RpcAttempt#checkCanRetry(RuntimeException)})
+ *     <ol style="margin-top: 0">
+ *       <li>Ensure the request has budget to retry ({@link 
RpcQosOptions#getMaxAttempts()})</li>
+ *       <li>Ensure the error is not a non-retryable error</li>
+ *     </ol>
+ *   </li>
+ * </ol>
+ * Configuration of options can be accomplished by passing an instances of 
{@link RpcQosOptions}
+ * to the {@code withRpcQosOptions} method of each {@code Builder} available 
in {@link FirestoreV1}.
+ * <p/>
+ * A new instance of {@link RpcQosOptions.Builder} can be created via {@link 
RpcQosOptions#newBuilder()}.
+ * A default instance of {@link RpcQosOptions} can be created via {@link 
RpcQosOptions#defaultOptions()}.
+ * <p/>
+ * @see FirestoreV1
+ * @see FirestoreV1.BatchGetDocuments.Builder#withRpcQosOptions(RpcQosOptions)
+ * @see FirestoreV1.BatchWrite.Builder#withRpcQosOptions(RpcQosOptions)
+ * @see FirestoreV1.ListCollectionIds.Builder#withRpcQosOptions(RpcQosOptions)
+ * @see FirestoreV1.ListDocuments.Builder#withRpcQosOptions(RpcQosOptions)
+ * @see FirestoreV1.PartitionQuery.Builder#withRpcQosOptions(RpcQosOptions)
+ * @see FirestoreV1.RunQuery.Builder#withRpcQosOptions(RpcQosOptions)
+ * @see <a href="https://cloud.google.com/firestore/quotas#limits";>Standard 
limits</a>
+ * @see <a 
href="https://cloud.google.com/firestore/docs/best-practices#designing_for_scale";>Designing
 for scale</a>
+ */
+interface RpcQos {
+
+  /**
+   * Create a new stateful attempt for a read operation. The returned {@link 
RpcReadAttempt} will
+   * be used for the full lifetime of trying to successfully process a request.
+   * @param context The {@link Context} which this new attempt should be 
associated with.
+   * @return A new {@link RpcReadAttempt} which will be used while trying to successfully
+   * process a request
+   */
+  RpcReadAttempt newReadAttempt(Context context);
+
+  /**
+   * Create a new stateful attempt for a write operation. The returned {@link 
RpcWriteAttempt} will
+   * be used for the full lifetime of trying to successfully process a request.
+   * @param context The {@link Context} which this new attempt should be 
associated with.
+   * @return A new {@link RpcWriteAttempt} which will be used while trying to successfully
+   * process a request
+   */
+  RpcWriteAttempt newWriteAttempt(Context context);
+
+  /**
+   * Check if a request is over the max allowed number of bytes.
+   *
+   * @param bytes number of bytes to check against the allowed limit
+   * @return true if {@code bytes} is over the allowed limit, false otherwise
+   * @see RpcQosOptions#getBatchMaxBytes()
+   */
+  boolean bytesOverLimit(long bytes);
+
+  /**
+   * Base interface representing the lifespan of attempting to successfully 
process a single request.
+   */
+  interface RpcAttempt {
+
+    /**
+     * Await until it is safe to proceed sending the rpc, evaluated relative 
to {@code start}.
+     * If it is not yet safe to proceed, this method will block until it is 
safe to proceed.
+     * @param start The intended start time of the next rpc
+     * @return true if it is safe to proceed with sending the next rpc, false 
otherwise.
+     * @throws InterruptedException if this thread is interrupted while waiting
+     * @see Thread#sleep(long)
+     * @see org.apache.beam.sdk.util.Sleeper#sleep(long)
+     */
+    boolean awaitSafeToProceed(Instant start) throws InterruptedException;
+
+    /**
+     * Determine if an rpc can be retried given {@code exception}.
+     * <p/>
+     * If a backoff is necessary before retrying this method can block for 
backoff before returning.
+     * <p/>
+     * If no retry is available this {@link RpcAttempt} will move to a 
terminal failed state and
+     * will error if further interaction is attempted.
+     * @param exception Exception to evaluate for retry ability
+     * @throws InterruptedException if this thread is interrupted while waiting
+     */
+    void checkCanRetry(RuntimeException exception) throws InterruptedException;
+
+    /**
+     * Record the start time of sending the rpc.
+     */
+    void recordStartRequest(Instant instantSinceEpoch);
+
+    /**
+     * Mark this {@link RpcAttempt} as having completed successfully, moving 
to a terminal success
+     * state. If any further interaction is attempted an error will be thrown.
+     */
+    void completeSuccess();
+
+    boolean isCodeRetryable(Code code);
+
+    /**
+     * Context which an attempt should be associated with.
+     * <p/>
+     * Some things which are associated with an attempt:
+     * <ol>
+     *   <li>Log appender</li>
+     *   <li>Metrics</li>
+     * </ol>
+     */
+    interface Context {
+
+      /**
+       * The namespace used for the log appender and metrics.
+       * @return the namespace to use
+       */
+      String getNamespace();
+    }
+  }
+
+  /**
+   * Read specific interface for {@link RpcAttempt}.
+   * <p/>
+   * This interface provides those methods which apply to read operations and 
the state tracked
+   * related to read operations.
+   */
+  interface RpcReadAttempt extends RpcAttempt {
+
+    void recordSuccessfulRequest(Instant end);
+
+    void recordFailedRequest(Instant end);
+
+    void recordStreamValue(Instant now);
+  }
+
+  /**
+   * Write specific interface for {@link RpcAttempt}.
+   * <p/>
+   * This interface provides those methods which apply to write operations and 
the state tracked
+   * related to write operations.
+   */
+  interface RpcWriteAttempt extends RpcAttempt {
+
+    /**
+     * Create a new {@link FlushBuffer} that can be sized relative to the QoS 
state and to the provided {@code instant}.
+     * @param instant The intended start time of the next rpc
+     * @param <T> The type which will sent in the request
+     * @param <E> The {@link Element} type which the returned buffer will 
contain
+     * @return a new {@link FlushBuffer} which queued messages can be staged 
to before final flush
+     */
+    <T, E extends Element<T>> FlushBuffer<E> newFlushBuffer(Instant instant);
+
+    void recordSuccessfulRequest(Instant end, int numWrites);
+
+    void recordFailedRequest(Instant end, int numWrites);
+
+    /**
+     * A buffer which is sized related to QoS state and provides a staging 
location for elements
+     * before a request is actually created and sent.
+     * @param <E> The {@link Element} type which will be stored in this 
instance
+     */
+    interface FlushBuffer<E extends Element<?>> extends Iterable<E> {
+      /**
+       * Attempt to add {@code newElement} to this {@link FlushBuffer}
+       * @param newElement The {@link Element} to try and add
+       * @return true if the flush group has capacity for newElement, false 
otherwise
+       */
+      boolean offer(E newElement);
+
+      /**
+       * @return the number of elements that are currently buffered in this 
instance
+       */
+      int getBufferedElementsCount();
+
+      /**
+       * @return the number of bytes that are currently buffered in this 
instance
+       */
+      long getBufferedElementsBytes();
+
+      /**
+       *
+       * @return true if the buffer contains enough {@link Element}s such that 
a flush should happen, false otherwise
+       */
+      boolean isFull();
+    }
+
+    /**
+     * An element which can be added to a {@link FlushBuffer}. This interface 
is mainly a marker for
+     * ensuring an encapsulated, lifecycle-managed way of determining the serialized size of

Review comment:
       "of from" -> "of" ?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
##########
@@ -0,0 +1,695 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.StatusCode;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.rpc.Code;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.function.Function;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.Element;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.FlushBuffer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.MovingFunction;
+import org.apache.beam.sdk.util.Sleeper;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class RpcQosImpl implements RpcQos {
+
+  /**
+   * Non-retryable errors. See 
https://cloud.google.com/apis/design/errors#handling_errors.
+   */
+  private static final Set<Integer> NON_RETRYABLE_ERROR_NUMBERS =
+      ImmutableSet.of(
+          Code.ALREADY_EXISTS,
+          Code.DATA_LOSS,
+          Code.FAILED_PRECONDITION,
+          Code.INVALID_ARGUMENT,
+          Code.OUT_OF_RANGE,
+          Code.NOT_FOUND,
+          Code.PERMISSION_DENIED,
+          Code.UNIMPLEMENTED
+      ).stream()
+          .map(Code::getNumber)
+          .collect(ImmutableSet.toImmutableSet());
+  /**
+   * The target minimum number of requests per samplePeriodMs, even if no 
requests succeed. Must be
+   * greater than 0, else we could throttle to zero. Because every decision is 
probabilistic, there
+   * is no guarantee that the request rate in any given interval will not be 
zero. (This is the +1
+   * from the formula in https://landing.google.com/sre/book/chapters/handling-overload.html.)
+   */
+  private static final double MIN_REQUESTS = 1;
+
+  private final RpcQosOptions options;
+
+  private final AdaptiveThrottler at;
+  private final WriteBatcher wb;
+  private final WriteRampUp writeRampUp;
+  private final FluentBackoff fb;
+
+  private final WeakHashMap<Context, Counters> counters;
+  private final Random random;
+  private final Sleeper sleeper;
+  private final Function<Context, Counters> computeCounters;
+
+  RpcQosImpl(
+      RpcQosOptions options,
+      Random random,
+      Sleeper sleeper,
+      CounterFactory counterFactory
+  ) {
+    this.options = options;
+    this.random = random;
+    this.sleeper = sleeper;
+    at = new AdaptiveThrottler();
+    wb = new WriteBatcher();
+    writeRampUp = new WriteRampUp(
+        Math.max(1, 500 / options.getHintMaxNumWorkers())
+    );
+    fb = FluentBackoff.DEFAULT
+        .withMaxRetries(options.getMaxAttempts() - 1) // maxRetries is an 
inclusive value, we want exclusive since we are tracking all attempts
+        .withInitialBackoff(options.getInitialBackoff());
+    counters = new WeakHashMap<>();
+    computeCounters = (Context c) -> Counters.getCounters(counterFactory, c);
+  }
+
+  @Override
+  public RpcWriteAttemptImpl newWriteAttempt(Context context) {
+    return new RpcWriteAttemptImpl(
+        context,
+        counters.computeIfAbsent(context, computeCounters),
+        fb.backoff(),
+        sleeper);
+  }
+
+  @Override
+  public RpcReadAttemptImpl newReadAttempt(Context context) {
+    return new RpcReadAttemptImpl(
+        context,
+        counters.computeIfAbsent(context, computeCounters),
+        fb.backoff(),
+        sleeper);
+  }
+
+  @Override
+  public boolean bytesOverLimit(long bytes) {
+    return bytes > options.getBatchMaxBytes();
+  }
+
+  private static MovingFunction createMovingFunction(Duration samplePeriod, 
Duration sampleUpdate) {
+    return new MovingFunction(
+        samplePeriod.getMillis(),
+        sampleUpdate.getMillis(),
+        1 /* numSignificantBuckets */,
+        1 /* numSignificantSamples */,
+        Sum.ofLongs()
+    );
+  }
+
+  interface CounterFactory extends Serializable {
+    CounterFactory DEFAULT = Metrics::counter;
+
+    Counter getCounter(String namespace, String name);
+  }
+
+  private enum AttemptState {
+    Active,
+    Active_Started,
+    Complete_Success,
+    Complete_Error;
+
+    public void checkActive() {
+      switch (this) {
+        case Active:
+        case Active_Started:
+          return;
+        case Complete_Success:
+          throw new IllegalStateException("Expected state to be Active, but 
was Complete_Success");
+        case Complete_Error:
+          throw new IllegalStateException("Expected state to be Active, but 
was Complete_Error");
+      }
+    }
+
+    public void checkStarted() {
+      switch (this) {
+        case Active_Started:
+          return;
+        case Active:
+          throw new IllegalStateException("Expected state to be 
Active_Started, but was Active");
+        case Complete_Success:
+          throw new IllegalStateException("Expected state to be 
Active_Started, but was Complete_Success");
+        case Complete_Error:
+          throw new IllegalStateException("Expected state to be 
Active_Started, but was Complete_Error");
+      }
+    }
+  }
+
+  private abstract class BaseRpcAttempt implements RpcAttempt {
+    private final Logger logger;
+    protected final Counters counters;
+    protected final BackOff backoff;
+    protected final Sleeper sleeper;
+
+    protected AttemptState state;
+    protected Instant start;
+
+    @SuppressWarnings("initialization.fields.uninitialized") // allow 
transient fields to be managed by component lifecycle
+    protected BaseRpcAttempt(
+        Context context, Counters counters, BackOff backoff, Sleeper sleeper) {
+      this.logger = LoggerFactory.getLogger(String.format("%s.RpcQos", 
context.getNamespace()));
+      this.counters = counters;
+      this.backoff = backoff;
+      this.sleeper = sleeper;
+      this.state = AttemptState.Active;
+    }
+
+    @Override
+    public boolean awaitSafeToProceed(Instant instant) throws 
InterruptedException {
+      state.checkActive();
+      Duration shouldThrottleRequest = at.shouldThrottleRequest(instant);
+      if (shouldThrottleRequest.compareTo(Duration.ZERO) > 0) {
+        logger.info("Delaying request by {}ms", 
shouldThrottleRequest.getMillis());
+        throttleRequest(shouldThrottleRequest);
+        return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public void checkCanRetry(RuntimeException exception)
+        throws InterruptedException {
+      state.checkActive();
+
+      Optional<ApiException> findApiException = findApiException(exception);
+
+      if (findApiException.isPresent()) {
+        ApiException apiException = findApiException.get();
+        // order here is semi-important
+        // First we always want to test if the error code is one of the codes 
we have deemed
+        // non-retryable before delegating to the exceptions default set.
+        if (
+            maxAttemptsExhausted()
+                || 
getStatusCodeNumber(apiException).map(NON_RETRYABLE_ERROR_NUMBERS::contains).orElse(false)
+                || !apiException.isRetryable()
+        ) {
+          state = AttemptState.Complete_Error;
+          throw apiException;
+        }
+      } else {
+        state = AttemptState.Complete_Error;
+        throw exception;
+      }
+    }
+
+    @Override
+    public void recordStartRequest(Instant instantSinceEpoch) {
+      at.recordStartRequest(instantSinceEpoch);
+      start = instantSinceEpoch;
+      state = AttemptState.Active_Started;
+    }
+
+    @Override
+    public void completeSuccess() {
+      state.checkActive();
+      state = AttemptState.Complete_Success;
+    }
+
+    @Override
+    public boolean isCodeRetryable(Code code) {
+      return !NON_RETRYABLE_ERROR_NUMBERS.contains(code.getNumber());
+    }
+
+    private boolean maxAttemptsExhausted() throws InterruptedException {
+      try {
+        boolean exhausted = !BackOffUtils.next(sleeper, backoff);
+        if (exhausted) {
+          logger.error("Max attempts exhausted after {} attempts.", 
options.getMaxAttempts());
+        }
+        return exhausted;
+      } catch (IOException e) {
+        // We are using FluentBackoff which does not ever throw an IOException 
from its methods
+        // Catch and wrap any potential IOException as a RuntimeException 
since it won't ever
+        // happen unless the implementation of FluentBackoff changes.
+        throw new RuntimeException(e);
+      }
+    }
+
+    protected Logger getLogger() {
+      return logger;
+    }
+
+    protected final void throttleRequest(Duration shouldThrottleRequest) 
throws InterruptedException {
+      counters.throttlingMs.inc(shouldThrottleRequest.getMillis());
+      sleeper.sleep(shouldThrottleRequest.getMillis());
+    }
+
+    private Optional<Integer> getStatusCodeNumber(ApiException apiException) {
+      StatusCode statusCode = apiException.getStatusCode();
+      if (statusCode instanceof GrpcStatusCode) {
+        GrpcStatusCode grpcStatusCode = (GrpcStatusCode) statusCode;
+        return Optional.of(grpcStatusCode.getTransportCode().value());
+      }
+      return Optional.empty();
+    }
+
+    private Optional<ApiException> findApiException(Throwable throwable) {
+      if (throwable instanceof ApiException) {
+        ApiException apiException = (ApiException) throwable;
+        return Optional.of(apiException);
+      } else {
+        Throwable cause = throwable.getCause();
+        if (cause != null) {
+          return findApiException(cause);
+        } else {
+          return Optional.empty();
+        }
+      }
+    }
+  }
+
+  private final class RpcReadAttemptImpl extends BaseRpcAttempt implements 
RpcReadAttempt {
+    private RpcReadAttemptImpl(Context context,
+        Counters counters, BackOff backoff, Sleeper sleeper) {
+      super(context, counters, backoff, sleeper);
+    }
+
+    @Override
+    public void recordSuccessfulRequest(Instant end) {
+      state.checkStarted();
+      counters.rpcSuccesses.inc();
+      at.recordSuccessfulRequest(start);
+    }
+
+    @Override
+    public void recordFailedRequest(Instant end) {
+      state.checkStarted();
+      counters.rpcFailures.inc();
+      at.recordFailedRequest(start);
+    }
+
+    @Override
+    public void recordStreamValue(Instant now) {
+      state.checkActive();
+      counters.rpcStreamValueReceived.inc();
+    }
+  }
+
+  final class RpcWriteAttemptImpl extends BaseRpcAttempt implements 
RpcWriteAttempt {
+
+    private RpcWriteAttemptImpl(Context context,
+        Counters counters, BackOff backoff, Sleeper sleeper) {
+      super(context, counters, backoff, sleeper);
+    }
+
+    @Override
+    public boolean awaitSafeToProceed(Instant instant) throws 
InterruptedException {
+      state.checkActive();
+      Optional<Duration> shouldThrottle = writeRampUp.shouldThrottle(instant);
+      if (shouldThrottle.isPresent()) {
+        Duration throttleDuration = shouldThrottle.get();
+        getLogger().debug("Still ramping up, Delaying request by {}ms", 
throttleDuration.getMillis());
+        throttleRequest(throttleDuration);
+        return false;
+      } else {
+        return super.awaitSafeToProceed(instant);
+      }
+    }
+
+    @Override
+    public <T, E extends Element<T>> FlushBufferImpl<T, E> 
newFlushBuffer(Instant instantSinceEpoch) {
+      state.checkActive();
+      int availableWriteCountBudget = 
writeRampUp.getAvailableWriteCountBudget(instantSinceEpoch);
+      int nextBatchMaxCount = wb.nextBatchMaxCount(instantSinceEpoch);
+      int batchMaxCount = Ints.min(
+          Math.max(0, availableWriteCountBudget),
+          Math.max(0, nextBatchMaxCount),
+          options.getBatchMaxCount()
+      );
+      return new FlushBufferImpl<>(
+          batchMaxCount,
+          options.getBatchMaxBytes()
+      );
+    }
+
+    @Override
+    public void recordSuccessfulRequest(Instant end, int numWrites) {
+      state.checkStarted();
+      counters.rpcSuccesses.inc();
+      writeRampUp.recordWriteCount(start, numWrites);
+      at.recordSuccessfulRequest(start);
+      wb.recordRequestLatency(start, end, numWrites);
+    }
+
+    @Override
+    public void recordFailedRequest(Instant end, int numWrites) {
+      state.checkStarted();
+      counters.rpcFailures.inc();
+      writeRampUp.recordWriteCount(start, numWrites);
+      at.recordFailedRequest(start);
+      wb.recordRequestLatency(start, end, numWrites);
+    }
+
+  }
+
+  /**
+   * Determines batch sizes for commit RPCs based on past performance.
+   *
+   * <p>It aims for a target response time per RPC: it uses the response times 
for previous RPCs and
+   * the number of entities contained in them, calculates a rolling average 
time-per-document, and

Review comment:
       Here and in the next line: "entities" -> "documents"




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to