danthev commented on a change in pull request #14261:
URL: https://github.com/apache/beam/pull/14261#discussion_r596279719



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreDoFn.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import edu.umd.cs.findbugs.annotations.NonNull;

Review comment:
       I think the right import here would be from [checkerframework](https://checkerframework.org/api/org/checkerframework/checker/nullness/qual/NonNull.html).
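   Something like this, assuming we standardize on the checkerframework nullness annotations (which is what Beam's nullness checking runs against):

```java
// instead of the SpotBugs annotation:
// import edu.umd.cs.findbugs.annotations.NonNull;
import org.checkerframework.checker.nullness.qual.NonNull;
```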

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreDoFn.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import edu.umd.cs.findbugs.annotations.NonNull;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+
+/**
+ * Base class for all {@link DoFn} defined in the Firestore Connector.
+ * <p/>
+ * This class defines all of the lifecycle events as abstract methods, 
ensuring each is accounted
+ * for in any implementing function.
+ * @param <In> The type of the previous stage of the pipeline
+ * @param <Out> The type output to the next stage of the pipeline
+ */
+abstract class FirestoreDoFn<In, Out> extends DoFn<In, Out> {
+
+  @Override
+  public abstract void populateDisplayData(@NonNull DisplayData.Builder 
builder);
+
+  /**
+   * @see org.apache.beam.sdk.transforms.DoFn.Setup
+   */
+  @Setup
+  public abstract void setup() throws Exception;
+
+  /**
+   * @see org.apache.beam.sdk.transforms.DoFn.StartBundle
+   */
+  @StartBundle
+  public abstract void startBundle(DoFn<In, Out>.StartBundleContext context) 
throws Exception;
+
+  static abstract class NonWindowAwareDoFn<In, Out> extends FirestoreDoFn<In, 
Out> {

Review comment:
       I don't see this being used anywhere; I assume it's coming in a follow-up PR?

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java
##########
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.cloud.firestore.spi.v1.FirestoreRpc;
+import com.google.common.collect.ImmutableList;
+import com.google.firestore.v1.BatchWriteRequest;
+import com.google.firestore.v1.BatchWriteResponse;
+import com.google.firestore.v1.Write;
+import com.google.firestore.v1.WriteResult;
+import com.google.rpc.Code;
+import com.google.rpc.Status;
+import edu.umd.cs.findbugs.annotations.Nullable;

Review comment:
       I think this should also come from checkerframework.
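   i.e., mirroring the `NonNull` suggestion above:

```java
import org.checkerframework.checker.nullness.qual.Nullable;
```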

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java
##########
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.cloud.firestore.spi.v1.FirestoreRpc;
+import com.google.common.collect.ImmutableList;
+import com.google.firestore.v1.BatchWriteRequest;
+import com.google.firestore.v1.BatchWriteResponse;
+import com.google.firestore.v1.Write;
+import com.google.firestore.v1.WriteResult;
+import com.google.rpc.Code;
+import com.google.rpc.Status;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.WindowAwareDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.FailedWritesException;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.WriteFailure;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.Element;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.FlushBuffer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A collection of {@link org.apache.beam.sdk.transforms.DoFn DoFn}s for each 
of the supported write
+ * RPC methods from the Cloud Firestore V1 API.
+ */
+final class FirestoreV1WriteFn {
+
+  static final class DefaultBatchWriteFn extends BaseBatchWriteFn<Void> {
+    DefaultBatchWriteFn(JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    protected void handleWriteFailures(ContextAdapter<Void> context, Instant 
timestamp,
+        List<WriteFailureDetails> writeFailures, Runnable logMessage) {
+      throw new FailedWritesException(writeFailures.stream().map(w -> 
w.failure).collect(Collectors.toList()));
+    }
+  }
+
+  static final class BatchWriteFnWithDeadLetterQueue extends 
BaseBatchWriteFn<WriteFailure> {
+    BatchWriteFnWithDeadLetterQueue(JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    protected void handleWriteFailures(ContextAdapter<WriteFailure> context, 
Instant timestamp,
+        List<WriteFailureDetails> writeFailures, Runnable logMessage) {
+      logMessage.run();
+      for (WriteFailureDetails details : writeFailures) {
+        context.output(details.failure, timestamp, details.window);
+      }
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link BatchWriteRequest}s.
+   * <p/>
+   * Writes will be enqueued to be sent at a potentially
+   * later time when more writes are available. This Fn attempts to maximize 
throughput while
+   * maintaining a high request success rate.
+   * <p/>
+   * All request quality-of-service is managed via the instance of {@link 
RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static abstract class BaseBatchWriteFn<Out> extends WindowAwareDoFn<Write, 
Out> implements
+      HasRpcAttemptContext {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite.getNamespace());
+    private final JodaClock clock;
+    private final FirestoreStatefulComponentFactory 
firestoreStatefulComponentFactory;
+    private final RpcQosOptions rpcQosOptions;
+
+    // transient running state information, not important to any possible 
checkpointing
+    private transient FirestoreRpc firestoreRpc;
+    private transient RpcQos rpcQos;
+    private transient String projectId;
+    @VisibleForTesting
+    transient Queue<@NonNull WriteElement> writes = new 
PriorityQueue<>(WriteElement.COMPARATOR);
+    @VisibleForTesting
+    transient int queueNextEntryPriority = 0;
+
+    @SuppressWarnings("initialization.fields.uninitialized") // allow 
transient fields to be managed by component lifecycle
+    protected BaseBatchWriteFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions
+    ) {
+      this.clock = clock;
+      this.firestoreStatefulComponentFactory = 
firestoreStatefulComponentFactory;
+      this.rpcQosOptions = rpcQosOptions;
+    }
+
+    protected Logger getLogger() {
+      return LOGGER;
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite;
+    }
+
+    @Override
+    public final void 
populateDisplayData(@edu.umd.cs.findbugs.annotations.NonNull 
DisplayData.Builder builder) {
+      builder
+          .include("rpcQosOptions", rpcQosOptions);
+    }
+
+    @Override
+    public void setup() {
+      rpcQos = firestoreStatefulComponentFactory.getRpcQos(rpcQosOptions);
+      writes = new PriorityQueue<>(WriteElement.COMPARATOR);
+    }
+
+    @Override
+    public final void startBundle(StartBundleContext c) {
+      String project = 
c.getPipelineOptions().as(GcpOptions.class).getProject();
+      projectId = requireNonNull(project, "project must be defined on 
GcpOptions of PipelineOptions");
+      firestoreRpc = 
firestoreStatefulComponentFactory.getFirestoreRpc(c.getPipelineOptions());
+    }
+
+    /**
+     * For each element extract and enqueue all writes from the commit. Then 
potentially flush any
+     * previously and currently enqueued writes.
+     * <p/>
+     * In order for writes to be enqueued the value of {@link 
BatchWriteRequest#getDatabase()} must
+     * match exactly with the database name this instance is configured for 
via the provided {@link
+     * org.apache.beam.sdk.options.PipelineOptions PipelineOptions}
+     * <p/>
+     * {@inheritDoc}
+     */
+    @Override
+    public void processElement(ProcessContext context, BoundedWindow window) 
throws Exception {
+      @SuppressWarnings("nullness") // for some reason requireNonNull thinks 
its parameter must be non-null...
+      Write write = requireNonNull(context.element(), "context.element() must 
be non null");
+      ProcessContextAdapter<Out> contextAdapter = new 
ProcessContextAdapter<>(context);
+      int serializedSize = write.getSerializedSize();
+      boolean tooLarge = rpcQos.bytesOverLimit(serializedSize);
+      if (tooLarge) {
+        String message = String.format(
+            "%s for document '%s' larger than configured max allowed bytes per 
batch",
+            getWriteType(write),
+            getName(write)
+        );
+        handleWriteFailures(contextAdapter, clock.instant(),
+            ImmutableList.of(new WriteFailureDetails(
+                new WriteFailure(
+                    write,
+                    WriteResult.newBuilder().build(),
+                    Status.newBuilder()
+                        .setCode(Code.INVALID_ARGUMENT.getNumber())
+                        .setMessage(message)
+                        .build()
+                ),
+                window
+            )),
+            () -> LOGGER.info(message)
+        );
+      } else {
+        writes.offer(new WriteElement(queueNextEntryPriority++, write, 
window));
+        flushBatch(/* finalFlush */ false, contextAdapter);
+      }
+    }
+
+    /**
+     * Attempt to flush any outstanding enqueued writes before cleaning up any 
bundle related state.
+     * {@inheritDoc}
+     */
+    @SuppressWarnings("nullness") // allow clearing transient fields
+    @Override
+    public void finishBundle(FinishBundleContext context) throws Exception {
+      try {
+        flushBatch(/* finalFlush */ true, new 
FinishBundleContextAdapter<>(context));
+      } finally {
+        projectId = null;
+        firestoreRpc.close();
+      }
+    }
+
+    /**
+     * Possibly flush enqueued writes to Firestore.
+     * <p/>
+     * This flush attempts to maximize throughput and success rate of RPCs. 
When a flush should
+     * happen and how many writes are included is determined and managed by 
the {@link RpcQos}
+     * instance of this class.
+     *
+     * @param finalFlush A boolean specifying if this call is from {@link 
#finishBundle(DoFn.FinishBundleContext)}. If

Review comment:
       The boolean itself just affects the behavior; it happens to be needed when the call is from FinishBundleContext. I would explain it that way, which makes it a little clearer. And perhaps there is a more descriptive name than `finalFlush`; I can't really think of one though.
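   As a sketch (`flushRemainder` is only a straw-man name, and the signature is abbreviated from what's in the PR):

```java
/**
 * @param flushRemainder If true, flush every enqueued write even if the buffer has not
 *     reached a size the QoS layer would normally flush at, so nothing is left behind.
 *     This is the behavior {@code finishBundle} needs, but the flag describes the
 *     behavior rather than the caller.
 */
void flushBatch(boolean flushRemainder, ContextAdapter<Out> context) throws Exception {
  // ...
}
```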

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
##########
@@ -0,0 +1,695 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.StatusCode;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.rpc.Code;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.function.Function;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.Element;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.FlushBuffer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.MovingFunction;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class RpcQosImpl implements RpcQos {
+
+  /**
+   * Non-retryable errors. See 
https://cloud.google.com/apis/design/errors#handling_errors.
+   */
+  private static final Set<Integer> NON_RETRYABLE_ERROR_NUMBERS =
+      ImmutableSet.of(
+          Code.ALREADY_EXISTS,
+          Code.DATA_LOSS,
+          Code.FAILED_PRECONDITION,
+          Code.INVALID_ARGUMENT,
+          Code.OUT_OF_RANGE,
+          Code.NOT_FOUND,
+          Code.PERMISSION_DENIED,
+          Code.UNIMPLEMENTED
+      ).stream()
+          .map(Code::getNumber)
+          .collect(ImmutableSet.toImmutableSet());
+  /**
+   * The target minimum number of requests per samplePeriodMs, even if no 
requests succeed. Must be
+   * greater than 0, else we could throttle to zero. Because every decision is 
probabilistic, there
+   * is no guarantee that the request rate in any given interval will not be 
zero. (This is the +1
+   * from the formula in 
https://landing.google.com/sre/book/chapters/handling-overload.html)
+   */
+  private static final double MIN_REQUESTS = 1;
+
+  private final RpcQosOptions options;
+
+  private final AdaptiveThrottler at;
+  private final WriteBatcher wb;
+  private final WriteRampUp writeRampUp;
+  private final FluentBackoff fb;
+
+  private final WeakHashMap<Context, Counters> counters;
+  private final Random random;
+  private final Sleeper sleeper;
+  private final Function<Context, Counters> computeCounters;
+
+  RpcQosImpl(
+      RpcQosOptions options,
+      Random random,
+      Sleeper sleeper,
+      CounterFactory counterFactory
+  ) {
+    this.options = options;
+    this.random = random;
+    this.sleeper = sleeper;
+    at = new AdaptiveThrottler();
+    wb = new WriteBatcher();
+    writeRampUp = new WriteRampUp(
+        Math.max(1, 500 / options.getHintMaxNumWorkers())
+    );
+    fb = FluentBackoff.DEFAULT
+        .withMaxRetries(options.getMaxAttempts() - 1) // maxRetries is an 
inclusive value, we want exclusive since we are tracking all attempts
+        .withInitialBackoff(options.getInitialBackoff());
+    counters = new WeakHashMap<>();
+    computeCounters = (Context c) -> Counters.getCounters(counterFactory, c);
+  }
+
+  @Override
+  public RpcWriteAttemptImpl newWriteAttempt(Context context) {
+    return new RpcWriteAttemptImpl(
+        context,
+        counters.computeIfAbsent(context, computeCounters),
+        fb.backoff(),
+        sleeper);
+  }
+
+  @Override
+  public RpcReadAttemptImpl newReadAttempt(Context context) {
+    return new RpcReadAttemptImpl(
+        context,
+        counters.computeIfAbsent(context, computeCounters),
+        fb.backoff(),
+        sleeper);
+  }
+
+  @Override
+  public boolean bytesOverLimit(long bytes) {
+    return bytes > options.getBatchMaxBytes();
+  }
+
+  private static MovingFunction createMovingFunction(Duration samplePeriod, 
Duration sampleUpdate) {
+    return new MovingFunction(
+        samplePeriod.getMillis(),
+        sampleUpdate.getMillis(),
+        1 /* numSignificantBuckets */,
+        1 /* numSignificantSamples */,
+        Sum.ofLongs()
+    );
+  }
+
+  interface CounterFactory extends Serializable {
+    CounterFactory DEFAULT = Metrics::counter;
+
+    Counter getCounter(String namespace, String name);
+  }
+
+  private enum AttemptState {
+    Active,
+    Active_Started,
+    Complete_Success,
+    Complete_Error;
+
+    public void checkActive() {
+      switch (this) {
+        case Active:
+        case Active_Started:
+          return;
+        case Complete_Success:
+          throw new IllegalStateException("Expected state to be Active, but 
was Complete_Success");
+        case Complete_Error:
+          throw new IllegalStateException("Expected state to be Active, but 
was Complete_Error");
+      }
+    }
+
+    public void checkStarted() {
+      switch (this) {
+        case Active_Started:
+          return;
+        case Active:
+          throw new IllegalStateException("Expected state to be 
Active_Started, but was Active");
+        case Complete_Success:
+          throw new IllegalStateException("Expected state to be 
Active_Started, but was Complete_Success");
+        case Complete_Error:
+          throw new IllegalStateException("Expected state to be 
Active_Started, but was Complete_Error");
+      }
+    }
+  }
+
+  private abstract class BaseRpcAttempt implements RpcAttempt {
+    private final Logger logger;
+    protected final Counters counters;
+    protected final BackOff backoff;
+    protected final Sleeper sleeper;
+
+    protected AttemptState state;
+    protected Instant start;
+
+    @SuppressWarnings("initialization.fields.uninitialized") // allow 
transient fields to be managed by component lifecycle
+    protected BaseRpcAttempt(
+        Context context, Counters counters, BackOff backoff, Sleeper sleeper) {
+      this.logger = LoggerFactory.getLogger(String.format("%s.RpcQos", 
context.getNamespace()));
+      this.counters = counters;
+      this.backoff = backoff;
+      this.sleeper = sleeper;
+      this.state = AttemptState.Active;
+    }
+
+    @Override
+    public boolean awaitSafeToProceed(Instant instant) throws 
InterruptedException {
+      state.checkActive();
+      Duration shouldThrottleRequest = at.shouldThrottleRequest(instant);
+      if (shouldThrottleRequest.compareTo(Duration.ZERO) > 0) {
+        logger.info("Delaying request by {}ms", 
shouldThrottleRequest.getMillis());
+        throttleRequest(shouldThrottleRequest);
+        return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public void checkCanRetry(RuntimeException exception)
+        throws InterruptedException {
+      state.checkActive();
+
+      Optional<ApiException> findApiException = findApiException(exception);
+
+      if (findApiException.isPresent()) {
+        ApiException apiException = findApiException.get();
+        // order here is semi-important
+        // First we always want to test if the error code is one of the codes 
we have deemed
+        // non-retryable before delegating to the exceptions default set.
+        if (
+            maxAttemptsExhausted()
+                || 
getStatusCodeNumber(apiException).map(NON_RETRYABLE_ERROR_NUMBERS::contains).orElse(false)
+                || !apiException.isRetryable()
+        ) {
+          state = AttemptState.Complete_Error;
+          throw apiException;
+        }
+      } else {
+        state = AttemptState.Complete_Error;
+        throw exception;
+      }
+    }
+
+    @Override
+    public void recordStartRequest(Instant instantSinceEpoch) {
+      at.recordStartRequest(instantSinceEpoch);
+      start = instantSinceEpoch;
+      state = AttemptState.Active_Started;

Review comment:
       Based on this, it seems like `Active` is closer to `Pending`, and `Active_Started` is the real active state. I would rename these.
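   For example (straw-man names only):

```java
private enum AttemptState {
  Pending,          // was Active: attempt created, but no request started yet
  Active,           // was Active_Started: a request has actually been started
  Complete_Success,
  Complete_Error
}
```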

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
##########
@@ -0,0 +1,695 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.StatusCode;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.rpc.Code;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.function.Function;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.Element;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.FlushBuffer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.MovingFunction;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class RpcQosImpl implements RpcQos {
+
+  /**
+   * Non-retryable errors. See 
https://cloud.google.com/apis/design/errors#handling_errors.
+   */
+  private static final Set<Integer> NON_RETRYABLE_ERROR_NUMBERS =
+      ImmutableSet.of(
+          Code.ALREADY_EXISTS,
+          Code.DATA_LOSS,
+          Code.FAILED_PRECONDITION,
+          Code.INVALID_ARGUMENT,
+          Code.OUT_OF_RANGE,
+          Code.NOT_FOUND,
+          Code.PERMISSION_DENIED,
+          Code.UNIMPLEMENTED
+      ).stream()
+          .map(Code::getNumber)
+          .collect(ImmutableSet.toImmutableSet());
+  /**
+   * The target minimum number of requests per samplePeriodMs, even if no 
requests succeed. Must be
+   * greater than 0, else we could throttle to zero. Because every decision is 
probabilistic, there
+   * is no guarantee that the request rate in any given interval will not be 
zero. (This is the +1
+   * from the formula in 
https://landing.google.com/sre/book/chapters/handling-overload.html)
+   */
+  private static final double MIN_REQUESTS = 1;
+
+  private final RpcQosOptions options;
+
+  private final AdaptiveThrottler at;
+  private final WriteBatcher wb;
+  private final WriteRampUp writeRampUp;
+  private final FluentBackoff fb;
+
+  private final WeakHashMap<Context, Counters> counters;
+  private final Random random;
+  private final Sleeper sleeper;
+  private final Function<Context, Counters> computeCounters;
+
+  RpcQosImpl(
+      RpcQosOptions options,
+      Random random,
+      Sleeper sleeper,
+      CounterFactory counterFactory
+  ) {
+    this.options = options;
+    this.random = random;
+    this.sleeper = sleeper;
+    at = new AdaptiveThrottler();
+    wb = new WriteBatcher();
+    writeRampUp = new WriteRampUp(
+        Math.max(1, 500 / options.getHintMaxNumWorkers())
+    );
+    fb = FluentBackoff.DEFAULT
+        .withMaxRetries(options.getMaxAttempts() - 1) // maxRetries is an 
inclusive value, we want exclusive since we are tracking all attempts
+        .withInitialBackoff(options.getInitialBackoff());
+    counters = new WeakHashMap<>();
+    computeCounters = (Context c) -> Counters.getCounters(counterFactory, c);
+  }
+
+  @Override
+  public RpcWriteAttemptImpl newWriteAttempt(Context context) {
+    return new RpcWriteAttemptImpl(
+        context,
+        counters.computeIfAbsent(context, computeCounters),
+        fb.backoff(),
+        sleeper);
+  }
+
+  @Override
+  public RpcReadAttemptImpl newReadAttempt(Context context) {
+    return new RpcReadAttemptImpl(
+        context,
+        counters.computeIfAbsent(context, computeCounters),
+        fb.backoff(),
+        sleeper);
+  }
+
+  @Override
+  public boolean bytesOverLimit(long bytes) {
+    return bytes > options.getBatchMaxBytes();
+  }
+
+  private static MovingFunction createMovingFunction(Duration samplePeriod, 
Duration sampleUpdate) {
+    return new MovingFunction(
+        samplePeriod.getMillis(),
+        sampleUpdate.getMillis(),
+        1 /* numSignificantBuckets */,
+        1 /* numSignificantSamples */,
+        Sum.ofLongs()
+    );
+  }
+
+  interface CounterFactory extends Serializable {
+    CounterFactory DEFAULT = Metrics::counter;
+
+    Counter getCounter(String namespace, String name);
+  }
+
+  private enum AttemptState {
+    Active,
+    Active_Started,
+    Complete_Success,
+    Complete_Error;
+
+    public void checkActive() {
+      switch (this) {
+        case Active:
+        case Active_Started:
+          return;
+        case Complete_Success:
+          throw new IllegalStateException("Expected state to be Active, but 
was Complete_Success");
+        case Complete_Error:
+          throw new IllegalStateException("Expected state to be Active, but 
was Complete_Error");
+      }
+    }
+
+    public void checkStarted() {
+      switch (this) {
+        case Active_Started:
+          return;
+        case Active:
+          throw new IllegalStateException("Expected state to be 
Active_Started, but was Active");
+        case Complete_Success:
+          throw new IllegalStateException("Expected state to be 
Active_Started, but was Complete_Success");
+        case Complete_Error:
+          throw new IllegalStateException("Expected state to be 
Active_Started, but was Complete_Error");
+      }
+    }
+  }
+
+  private abstract class BaseRpcAttempt implements RpcAttempt {
+    private final Logger logger;
+    protected final Counters counters;
+    protected final BackOff backoff;
+    protected final Sleeper sleeper;
+
+    protected AttemptState state;
+    protected Instant start;
+
+    @SuppressWarnings("initialization.fields.uninitialized") // allow 
transient fields to be managed by component lifecycle
+    protected BaseRpcAttempt(
+        Context context, Counters counters, BackOff backoff, Sleeper sleeper) {
+      this.logger = LoggerFactory.getLogger(String.format("%s.RpcQos", 
context.getNamespace()));
+      this.counters = counters;
+      this.backoff = backoff;
+      this.sleeper = sleeper;
+      this.state = AttemptState.Active;
+    }
+
+    @Override
+    public boolean awaitSafeToProceed(Instant instant) throws 
InterruptedException {
+      state.checkActive();
+      Duration shouldThrottleRequest = at.shouldThrottleRequest(instant);
+      if (shouldThrottleRequest.compareTo(Duration.ZERO) > 0) {
+        logger.info("Delaying request by {}ms", 
shouldThrottleRequest.getMillis());
+        throttleRequest(shouldThrottleRequest);
+        return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public void checkCanRetry(RuntimeException exception)
+        throws InterruptedException {
+      state.checkActive();
+
+      Optional<ApiException> findApiException = findApiException(exception);
+
+      if (findApiException.isPresent()) {
+        ApiException apiException = findApiException.get();
+        // order here is semi-important
+        // First we always want to test if the error code is one of the codes 
we have deemed
+        // non-retryable before delegating to the exceptions default set.
+        if (
+            maxAttemptsExhausted()
+                || 
getStatusCodeNumber(apiException).map(NON_RETRYABLE_ERROR_NUMBERS::contains).orElse(false)
+                || !apiException.isRetryable()
+        ) {
+          state = AttemptState.Complete_Error;
+          throw apiException;
+        }
+      } else {
+        state = AttemptState.Complete_Error;
+        throw exception;
+      }
+    }
+
+    @Override
+    public void recordStartRequest(Instant instantSinceEpoch) {
+      at.recordStartRequest(instantSinceEpoch);
+      start = instantSinceEpoch;
+      state = AttemptState.Active_Started;
+    }
+
+    @Override
+    public void completeSuccess() {
+      state.checkActive();
+      state = AttemptState.Complete_Success;
+    }
+
+    @Override
+    public boolean isCodeRetryable(Code code) {
+      return !NON_RETRYABLE_ERROR_NUMBERS.contains(code.getNumber());
+    }
+
+    private boolean maxAttemptsExhausted() throws InterruptedException {
+      try {
+        boolean exhausted = !BackOffUtils.next(sleeper, backoff);
+        if (exhausted) {
+          logger.error("Max attempts exhausted after {} attempts.", 
options.getMaxAttempts());
+        }
+        return exhausted;
+      } catch (IOException e) {
+        // We are using FluentBackoff which does not ever throw an IOException 
from its methods
+        // Catch and wrap any potential IOException as a RuntimeException 
since it won't ever
+        // happen unless the implementation of FluentBackoff changes.
+        throw new RuntimeException(e);
+      }
+    }
+
+    protected Logger getLogger() {
+      return logger;
+    }
+
+    protected final void throttleRequest(Duration shouldThrottleRequest) 
throws InterruptedException {
+      counters.throttlingMs.inc(shouldThrottleRequest.getMillis());
+      sleeper.sleep(shouldThrottleRequest.getMillis());
+    }
+
+    private Optional<Integer> getStatusCodeNumber(ApiException apiException) {
+      StatusCode statusCode = apiException.getStatusCode();
+      if (statusCode instanceof GrpcStatusCode) {
+        GrpcStatusCode grpcStatusCode = (GrpcStatusCode) statusCode;
+        return Optional.of(grpcStatusCode.getTransportCode().value());
+      }
+      return Optional.empty();
+    }
+
+    private Optional<ApiException> findApiException(Throwable throwable) {
+      if (throwable instanceof ApiException) {
+        ApiException apiException = (ApiException) throwable;
+        return Optional.of(apiException);
+      } else {
+        Throwable cause = throwable.getCause();
+        if (cause != null) {
+          return findApiException(cause);
+        } else {
+          return Optional.empty();
+        }
+      }
+    }
+  }
+
+  private final class RpcReadAttemptImpl extends BaseRpcAttempt implements 
RpcReadAttempt {
+    private RpcReadAttemptImpl(Context context,
+        Counters counters, BackOff backoff, Sleeper sleeper) {
+      super(context, counters, backoff, sleeper);
+    }
+
+    @Override
+    public void recordSuccessfulRequest(Instant end) {
+      state.checkStarted();
+      counters.rpcSuccesses.inc();
+      at.recordSuccessfulRequest(start);
+    }
+
+    @Override
+    public void recordFailedRequest(Instant end) {
+      state.checkStarted();
+      counters.rpcFailures.inc();
+      at.recordFailedRequest(start);
+    }
+
+    @Override
+    public void recordStreamValue(Instant now) {
+      state.checkActive();
+      counters.rpcStreamValueReceived.inc();
+    }
+  }
+
+  final class RpcWriteAttemptImpl extends BaseRpcAttempt implements 
RpcWriteAttempt {
+
+    private RpcWriteAttemptImpl(Context context,
+        Counters counters, BackOff backoff, Sleeper sleeper) {
+      super(context, counters, backoff, sleeper);
+    }
+
+    @Override
+    public boolean awaitSafeToProceed(Instant instant) throws 
InterruptedException {
+      state.checkActive();
+      Optional<Duration> shouldThrottle = writeRampUp.shouldThrottle(instant);
+      if (shouldThrottle.isPresent()) {
+        Duration throttleDuration = shouldThrottle.get();
+        getLogger().debug("Still ramping up, Delaying request by {}ms", 
throttleDuration.getMillis());
+        throttleRequest(throttleDuration);
+        return false;
+      } else {
+        return super.awaitSafeToProceed(instant);
+      }
+    }
+
+    @Override
+    public <T, E extends Element<T>> FlushBufferImpl<T, E> 
newFlushBuffer(Instant instantSinceEpoch) {
+      state.checkActive();
+      int availableWriteCountBudget = 
writeRampUp.getAvailableWriteCountBudget(instantSinceEpoch);
+      int nextBatchMaxCount = wb.nextBatchMaxCount(instantSinceEpoch);
+      int batchMaxCount = Ints.min(
+          Math.max(0, availableWriteCountBudget),
+          Math.max(0, nextBatchMaxCount),
+          options.getBatchMaxCount()
+      );
+      return new FlushBufferImpl<>(
+          batchMaxCount,
+          options.getBatchMaxBytes()
+      );
+    }
+
+    @Override
+    public void recordSuccessfulRequest(Instant end, int numWrites) {
+      state.checkStarted();
+      counters.rpcSuccesses.inc();
+      writeRampUp.recordWriteCount(start, numWrites);
+      at.recordSuccessfulRequest(start);
+      wb.recordRequestLatency(start, end, numWrites);
+    }
+
+    @Override
+    public void recordFailedRequest(Instant end, int numWrites) {
+      state.checkStarted();
+      counters.rpcFailures.inc();
+      writeRampUp.recordWriteCount(start, numWrites);
+      at.recordFailedRequest(start);
+      wb.recordRequestLatency(start, end, numWrites);
+    }
+
+  }
+
+  /**
+   * Determines batch sizes for commit RPCs based on past performance.
+   *
+   * <p>It aims for a target response time per RPC: it uses the response times 
for previous RPCs and
+   * the number of entities contained in them, calculates a rolling average 
time-per-document, and
+   * chooses the number of entities for future writes to hit the target time.
+   *
+   * <p>This enables us to send large batches without sending over-large 
requests in the case of
+   * expensive document writes that may timeout before the server can apply 
them all.
+   */
+  private final class WriteBatcher {
+
+    private final MovingAverage meanLatencyPerDocumentMs;
+
+    private WriteBatcher() {
+      this.meanLatencyPerDocumentMs =
+          new MovingAverage(
+              options.getSamplePeriod(),
+              options.getSamplePeriodBucketSize()
+              /* numSignificantBuckets */
+              /* numSignificantSamples */
+          );
+    }
+
+    private void recordRequestLatency(Instant start, Instant end, int 
numWrites) {
+      Interval interval = new Interval(start, end);
+      long msPerWrite = numWrites == 0 ? 0 : interval.toDurationMillis() / 
numWrites;
+      meanLatencyPerDocumentMs.add(end, msPerWrite);
+    }
+
+    private int nextBatchMaxCount(Instant instantSinceEpoch) {
+      if (!meanLatencyPerDocumentMs.hasValue(instantSinceEpoch)) {
+        return options.getBatchInitialCount();
+      }
+      long recentMeanLatency = 
Math.max(meanLatencyPerDocumentMs.get(instantSinceEpoch), 1);
+      long nextBatchMaxCount =  options.getBatchTargetLatency().getMillis() /  
recentMeanLatency;
+      return Math.toIntExact(nextBatchMaxCount);
+    }
+
+  }
+
+  /**
+   * An implementation of client-side adaptive throttling. See
+   * 
https://sre.google/sre-book/handling-overload/#client-side-throttling-a7sYUg
+   * for a full discussion of the use case and algorithm applied.
+   */
+  private final class AdaptiveThrottler {
+    private final MovingFunction successfulRequestsMovingFunction;
+    private final MovingFunction failedRequestsMovingFunction;
+    private final MovingFunction allRequestsMovingFunction;
+
+    private AdaptiveThrottler() {
+      allRequestsMovingFunction = 
createMovingFunction(options.getSamplePeriod(), 
options.getSamplePeriodBucketSize());
+      successfulRequestsMovingFunction = 
createMovingFunction(options.getSamplePeriod(), 
options.getSamplePeriodBucketSize());
+      failedRequestsMovingFunction = 
createMovingFunction(options.getSamplePeriod(), 
options.getSamplePeriodBucketSize());
+    }
+
+    private Duration shouldThrottleRequest(Instant instantSinceEpoch) {
+      double delayProbability = throttlingProbability(instantSinceEpoch);
+
+      return (random.nextDouble() < delayProbability) ? 
options.getThrottleDuration() : Duration.ZERO;
+    }
+
+    private void recordStartRequest(Instant instantSinceEpoch) {
+      allRequestsMovingFunction.add(instantSinceEpoch.getMillis(), 1);
+    }
+
+    private void recordSuccessfulRequest(Instant instantSinceEpoch) {
+      successfulRequestsMovingFunction.add(instantSinceEpoch.getMillis(), 1);
+    }
+
+    private void recordFailedRequest(Instant instantSinceEpoch) {
+      failedRequestsMovingFunction.add(instantSinceEpoch.getMillis(), 1);
+    }
+
+    /**
+     * Implementation of the formula from 
https://sre.google/sre-book/handling-overload/#eq2101
+     */
+    private double throttlingProbability(Instant instantSinceEpoch) {
+      if (!allRequestsMovingFunction.isSignificant()) {
+        return 0;
+      }
+      long nowMsSinceEpoch = instantSinceEpoch.getMillis();
+      long allRequestsCount = allRequestsMovingFunction.get(nowMsSinceEpoch);
+      long successfulRequestsCount = 
successfulRequestsMovingFunction.get(nowMsSinceEpoch);
+
+      double overloadMaxCount = options.getOverloadRatio() * 
successfulRequestsCount;
+      double overloadUsage = allRequestsCount - overloadMaxCount;
+
+      double calcProbability = overloadUsage / (allRequestsCount + 
MIN_REQUESTS);
+      return Math.max(0, calcProbability);
+    }
+  }
+
+  /**
+   * An implementation providing the 500/50/5 ramp up strategy recommended by <a
+   * href="https://cloud.google.com/firestore/docs/best-practices#ramping_up_traffic">Ramping up
+   * traffic</a>
+   */
+  @VisibleForTesting
+  static final class WriteRampUp {
+    private static final Duration RAMP_UP_INTERVAL = 
Duration.standardMinutes(5);
+    private final int baseMax;

Review comment:
       `initialWriteBudget` or `baseBatchBudget`?
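   e.g., either straw-man name would read better than `baseMax` at the call site:

```java
// the 500 from the 500/50/5 rule, divided across the hinted max number of workers
private final int baseBatchBudget;

WriteRampUp(int baseBatchBudget) {
  this.baseBatchBudget = baseBatchBudget;
}
```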

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreIOOptions.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import com.google.cloud.firestore.FirestoreOptions;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Description("Options used to configure Cloud Firestore IO")
+public interface FirestoreIOOptions extends PipelineOptions {
+
+  /**
+   * A host port pair to allow connecting to a Cloud Firestore emulator 
instead of the live service.
+   * The value passed to this method will take precedent if the {@code 
FIRESTORE_EMULATOR_HOST}
+   * environment variable is also set.
+   *
+   * @return the string representation of a host and port pair to be used when 
constructing Cloud
+   * Firestore clients.
+   * @see FirestoreOptions.Builder#setEmulatorHost(java.lang.String)
+   */
+  @Nullable
+  String getEmulatorHostPort();
+
+  /**
+   * Define a host port pair to allow connecting to a Cloud Firestore emulator 
instead of the live
+   * service. The value passed to this method will take precedence if the {@code
+   * FIRESTORE_EMULATOR_HOST} environment variable is also set.
+   *
+   * @param hostPort the emulator host and port to connect to
+   * @see FirestoreOptions.Builder#setEmulatorHost(java.lang.String)
+   */
+  void setEmulatorHostPort(String hostPort);

Review comment:
       I think just calling it host is less confusing; the env variable's value includes a port too, despite being named host.
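   i.e., a sketch of the renamed option, mirroring `FIRESTORE_EMULATOR_HOST` and `FirestoreOptions.Builder#setEmulatorHost`:

```java
@Nullable
String getEmulatorHost();

void setEmulatorHost(String host);
```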

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import com.google.api.gax.rpc.FixedHeaderProvider;
+import com.google.auth.Credentials;
+import com.google.cloud.firestore.FirestoreOptions;
+import com.google.cloud.firestore.FirestoreOptions.EmulatorCredentials;
+import com.google.cloud.firestore.spi.v1.FirestoreRpc;
+import com.google.common.collect.ImmutableMap;
+import java.io.Serializable;
+import java.security.SecureRandom;
+import java.util.Map;
+import javax.annotation.concurrent.Immutable;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQosImpl.CounterFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.Sleeper;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/**
+ * Factory class for all stateful components used in the Firestore Connector.
+ * <p/>
+ * None of the components returned by any of these factory methods are 
serializable, this factory
+ * functions to give a serialization friendly handle to create instances of 
these components.
+ * <p/>
+ * This class is stateless.
+ */
+@Immutable
+class FirestoreStatefulComponentFactory implements Serializable {
+
+  static final FirestoreStatefulComponentFactory INSTANCE = new 
FirestoreStatefulComponentFactory();
+
+  private FirestoreStatefulComponentFactory() {
+  }
+
+  /**
+   * Given a {@link PipelineOptions}, return a pre-configured {@link 
FirestoreOptions.Builder} with
+   * values set based on those options.
+   * <p/>
+   * The provided {@link PipelineOptions} is expected to provide {@link 
FirestoreIOOptions} and
+   * {@link org.apache.beam.sdk.extensions.gcp.options.GcpOptions GcpOptions} 
if connecting to live
+   * Firestore (i.e. not an emulator).
+   * <p/>
+   * The instance returned by this method is expected to bind to the lifecycle 
of a bundle.
+   *
+   * @param options The instance of options to read from
+   * @return a new {@link FirestoreOptions.Builder} pre-configured with values 
from the provided
+   * options
+   */
+  FirestoreOptions.Builder getFirestoreOptionsBuilder(PipelineOptions options) 
{
+    FirestoreOptions.Builder builder = 
FirestoreOptions.getDefaultInstance().toBuilder()
+        .setProjectId(options.as(GcpOptions.class).getProject())
+        .setHeaderProvider(new FixedHeaderProvider() {
+          @Override
+          public Map<@NonNull String, @NonNull String> getHeaders() {
+            String version = "x.y.z";  // TODO: How does beam manage version 
detection in project?
+            return ImmutableMap.of("User-Agent", String.format("beam-sdk/%s", 
version));

Review comment:
       Leftover TODO in here. I found this [UserAgentFactory](https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java#L369) while searching through Beam.
   
   That would actually be a very useful feature; I think I will backport it to the Datastore connector.
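   A minimal sketch of using it here, assuming `PipelineOptions#getUserAgent()` (whose default comes from that `UserAgentFactory`) is an acceptable header value:

```java
.setHeaderProvider(new FixedHeaderProvider() {
  @Override
  public Map<@NonNull String, @NonNull String> getHeaders() {
    // options is the PipelineOptions parameter already in scope; getUserAgent()
    // defaults to something like "Apache_Beam_SDK_for_Java/<version>"
    return ImmutableMap.of("User-Agent", options.getUserAgent());
  }
})
```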

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
##########
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.firestore.v1.BatchWriteRequest;
+import com.google.firestore.v1.WriteResult;
+import com.google.rpc.Status;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.concurrent.Immutable;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BatchWriteFnWithDeadLetterQueue;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.DefaultBatchWriteFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * {@link FirestoreV1} provides lifecycle managed {@link PTransform}s for the
+ * <a href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1">Cloud
+ * Firestore v1 API</a>.
+ * <p/>
+ * This class is part of the Firestore Connector DSL and should be accessed 
via {@link
+ * FirestoreIO#v1()}.
+ * <p/>
+ * All {@link PTransform}s provided by this API use {@link 
org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+ * GcpOptions} on {@link org.apache.beam.sdk.options.PipelineOptions 
PipelineOptions} for
+ * credentials access and projectId resolution. As such, the lifecycle of gRPC 
clients and project
+ * information is scoped to the bundle level, not the worker level.
+ * <p/>
+ *
+ * <h3>Permissions</h3>
+ *
+ * Permission requirements depend on the {@code PipelineRunner} that is used 
to execute the
+ * pipeline. Please refer to the documentation of corresponding {@code 
PipelineRunner}s for more
+ * details.
+ *
+ * <p>Please see <a href="https://cloud.google.com/firestore/docs/quickstart-servers#create_a_in_native_mode_database">Create
+ * a Firestore in Native mode database</a> for security and permission related information
+ * specific to Cloud Firestore.
+ *
+ * <p>Optionally, Cloud Firestore V1 Emulator, running locally, could be used 
for testing purposes
+ * by providing the host port information via {@link 
FirestoreIOOptions#setEmulatorHostPort(String)}.
+ * In such a case, all the Cloud Firestore API calls are directed to the 
Emulator.
+ *
+ * @see FirestoreIO#v1()
+ * @see org.apache.beam.sdk.PipelineRunner
+ * @see org.apache.beam.sdk.options.PipelineOptions
+ * @see org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+ * @see <a href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1">Cloud
+ * Firestore v1 API</a>
+ */
+@Immutable
+public final class FirestoreV1 {
+  static final FirestoreV1 INSTANCE = new FirestoreV1();
+
+  private FirestoreV1() {}
+
+  /**
+   * The class returned by this method provides the ability to create {@link 
PTransform PTransforms}
+   * for write operations available in the Firestore V1 API provided by {@link
+   * com.google.cloud.firestore.spi.v1.FirestoreRpc FirestoreRpc}.
+   * <p/>
+   * This method is part of the Firestore Connector DSL and should be accessed 
via {@link
+   * FirestoreIO#v1()}.
+   * <p/>
+   *
+   * @return Type safe builder factory for write operations.
+   * @see FirestoreIO#v1()
+   */
+  public final Write write() {
+    return Write.INSTANCE;
+  }
+
+  /**
+   * Type safe builder factory for write operations.
+   * <p/>
+   * This class is part of the Firestore Connector DSL and should be accessed 
via {@link #write()
+   * FirestoreIO.v1().write()}.
+   * <p/>
+   * <p/>
+   * This class provides access to a set of type safe builders for supported 
write operations
+   * available in the Firestore V1 API accessed through {@link 
com.google.cloud.firestore.spi.v1.FirestoreRpc
+   * FirestoreRpc}. Each builder allows configuration before creating an 
immutable instance which
+   * can be used in your pipeline.
+   *
+   * @see FirestoreIO#v1()
+   * @see #write()
+   */
+  @Immutable
+  public static final class Write {
+    private static final Write INSTANCE = new Write();
+
+    private Write() {}
+
+    /**
+     * Factory method to create a new type safe builder for {@link 
com.google.firestore.v1.Write}
+     * operations.
+     * <p/>
+     * By default, when an error is encountered while trying to write to Cloud 
Firestore a {@link
+     * FailedWritesException} will be thrown. If you would like a failed write 
to not result in a
+     * {@link FailedWritesException}, you can instead use {@link 
BatchWriteWithDeadLetterQueue} which
+     * will output any failed write. {@link BatchWriteWithDeadLetterQueue} can 
be used by
+     * including {@link BatchWrite.Builder#withDeadLetterQueue()} when 
constructing the write handler.
+     * <p/>
+     * This method is part of the Firestore Connector DSL and should be 
accessed via {@link
+     * FirestoreIO#v1()}.
+     * <p/>
+     *
+     * <b>Example Usage</b>

Review comment:
       I would move these examples into the Javadoc at the top of the class, 
similar to [the Datastore 
doc](https://github.com/apache/beam/blob/e8f9c682ae707e91a3937dd4222d0669d54017af/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L102).
 

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
##########
@@ -0,0 +1,695 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.StatusCode;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.rpc.Code;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.function.Function;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.Element;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.FlushBuffer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.MovingFunction;
+import org.apache.beam.sdk.util.Sleeper;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class RpcQosImpl implements RpcQos {
+
+  /**
+   * Non-retryable errors. See 
https://cloud.google.com/apis/design/errors#handling_errors.
+   */
+  private static final Set<Integer> NON_RETRYABLE_ERROR_NUMBERS =
+      ImmutableSet.of(
+          Code.ALREADY_EXISTS,
+          Code.DATA_LOSS,
+          Code.FAILED_PRECONDITION,
+          Code.INVALID_ARGUMENT,
+          Code.OUT_OF_RANGE,
+          Code.NOT_FOUND,
+          Code.PERMISSION_DENIED,
+          Code.UNIMPLEMENTED
+      ).stream()
+          .map(Code::getNumber)
+          .collect(ImmutableSet.toImmutableSet());
+  /**
+   * The target minimum number of requests per samplePeriodMs, even if no 
requests succeed. Must be
+   * greater than 0, else we could throttle to zero. Because every decision is 
probabilistic, there
+   * is no guarantee that the request rate in any given interval will not be 
zero. (This is the +1
+   * from the formula in https://landing.google.com/sre/book/chapters/handling-overload.html.)
+   */
+  private static final double MIN_REQUESTS = 1;
+
+  private final RpcQosOptions options;
+
+  private final AdaptiveThrottler at;
+  private final WriteBatcher wb;
+  private final WriteRampUp writeRampUp;
+  private final FluentBackoff fb;
+
+  private final WeakHashMap<Context, Counters> counters;
+  private final Random random;
+  private final Sleeper sleeper;
+  private final Function<Context, Counters> computeCounters;
+
+  RpcQosImpl(
+      RpcQosOptions options,
+      Random random,
+      Sleeper sleeper,
+      CounterFactory counterFactory
+  ) {
+    this.options = options;
+    this.random = random;
+    this.sleeper = sleeper;
+    at = new AdaptiveThrottler();
+    wb = new WriteBatcher();
+    writeRampUp = new WriteRampUp(
+        Math.max(1, 500 / options.getHintMaxNumWorkers())
+    );
+    fb = FluentBackoff.DEFAULT
+        .withMaxRetries(options.getMaxAttempts() - 1) // maxRetries is an 
inclusive value, we want exclusive since we are tracking all attempts
+        .withInitialBackoff(options.getInitialBackoff());
+    counters = new WeakHashMap<>();
+    computeCounters = (Context c) -> Counters.getCounters(counterFactory, c);
+  }
+
+  @Override
+  public RpcWriteAttemptImpl newWriteAttempt(Context context) {
+    return new RpcWriteAttemptImpl(
+        context,
+        counters.computeIfAbsent(context, computeCounters),
+        fb.backoff(),
+        sleeper);
+  }
+
+  @Override
+  public RpcReadAttemptImpl newReadAttempt(Context context) {
+    return new RpcReadAttemptImpl(
+        context,
+        counters.computeIfAbsent(context, computeCounters),
+        fb.backoff(),
+        sleeper);
+  }
+
+  @Override
+  public boolean bytesOverLimit(long bytes) {
+    return bytes > options.getBatchMaxBytes();
+  }
+
+  private static MovingFunction createMovingFunction(Duration samplePeriod, 
Duration sampleUpdate) {
+    return new MovingFunction(
+        samplePeriod.getMillis(),
+        sampleUpdate.getMillis(),
+        1 /* numSignificantBuckets */,
+        1 /* numSignificantSamples */,
+        Sum.ofLongs()
+    );
+  }
+
+  interface CounterFactory extends Serializable {
+    CounterFactory DEFAULT = Metrics::counter;
+
+    Counter getCounter(String namespace, String name);
+  }
+
+  private enum AttemptState {
+    Active,

Review comment:
       Java readability guidance is that enum constants are in all caps, IIRC.
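
       i.e. something like:

           private enum AttemptState {
             ACTIVE,
             ACTIVE_STARTED,
             COMPLETE_SUCCESS,
             COMPLETE_ERROR
           }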

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java
##########
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.cloud.firestore.spi.v1.FirestoreRpc;
+import com.google.common.collect.ImmutableList;
+import com.google.firestore.v1.BatchWriteRequest;
+import com.google.firestore.v1.BatchWriteResponse;
+import com.google.firestore.v1.Write;
+import com.google.firestore.v1.WriteResult;
+import com.google.rpc.Code;
+import com.google.rpc.Status;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.WindowAwareDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.FailedWritesException;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.WriteFailure;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.Element;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.FlushBuffer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.BackOffUtils;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A collection of {@link org.apache.beam.sdk.transforms.DoFn DoFn}s for each 
of the supported write
+ * RPC methods from the Cloud Firestore V1 API.
+ */
+final class FirestoreV1WriteFn {
+
+  static final class DefaultBatchWriteFn extends BaseBatchWriteFn<Void> {
+    DefaultBatchWriteFn(JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    protected void handleWriteFailures(ContextAdapter<Void> context, Instant 
timestamp,
+        List<WriteFailureDetails> writeFailures, Runnable logMessage) {
+      throw new FailedWritesException(writeFailures.stream().map(w -> 
w.failure).collect(Collectors.toList()));
+    }
+  }
+
+  static final class BatchWriteFnWithDeadLetterQueue extends 
BaseBatchWriteFn<WriteFailure> {
+    BatchWriteFnWithDeadLetterQueue(JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    protected void handleWriteFailures(ContextAdapter<WriteFailure> context, 
Instant timestamp,
+        List<WriteFailureDetails> writeFailures, Runnable logMessage) {
+      logMessage.run();
+      for (WriteFailureDetails details : writeFailures) {
+        context.output(details.failure, timestamp, details.window);
+      }
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link BatchWriteRequest}s.
+   * <p/>
+   * Writes will be enqueued to be sent at a potentially
+   * later time when more writes are available. This Fn attempts to maximize 
throughput while
+   * maintaining a high request success rate.
+   * <p/>
+   * All request quality-of-service is managed via the instance of {@link 
RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static abstract class BaseBatchWriteFn<Out> extends WindowAwareDoFn<Write, 
Out> implements
+      HasRpcAttemptContext {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite.getNamespace());
+    private final JodaClock clock;
+    private final FirestoreStatefulComponentFactory 
firestoreStatefulComponentFactory;
+    private final RpcQosOptions rpcQosOptions;
+
+    // transient running state information, not important to any possible 
checkpointing
+    private transient FirestoreRpc firestoreRpc;
+    private transient RpcQos rpcQos;
+    private transient String projectId;
+    @VisibleForTesting
+    transient Queue<@NonNull WriteElement> writes = new 
PriorityQueue<>(WriteElement.COMPARATOR);
+    @VisibleForTesting
+    transient int queueNextEntryPriority = 0;
+
+    @SuppressWarnings("initialization.fields.uninitialized") // allow 
transient fields to be managed by component lifecycle
+    protected BaseBatchWriteFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions
+    ) {
+      this.clock = clock;
+      this.firestoreStatefulComponentFactory = 
firestoreStatefulComponentFactory;
+      this.rpcQosOptions = rpcQosOptions;
+    }
+
+    protected Logger getLogger() {
+      return LOGGER;
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite;
+    }
+
+    @Override
+    public final void 
populateDisplayData(@edu.umd.cs.findbugs.annotations.NonNull 
DisplayData.Builder builder) {
+      builder
+          .include("rpcQosOptions", rpcQosOptions);
+    }
+
+    @Override
+    public void setup() {
+      rpcQos = firestoreStatefulComponentFactory.getRpcQos(rpcQosOptions);
+      writes = new PriorityQueue<>(WriteElement.COMPARATOR);
+    }
+
+    @Override
+    public final void startBundle(StartBundleContext c) {
+      String project = 
c.getPipelineOptions().as(GcpOptions.class).getProject();
+      projectId = requireNonNull(project, "project must be defined on 
GcpOptions of PipelineOptions");
+      firestoreRpc = 
firestoreStatefulComponentFactory.getFirestoreRpc(c.getPipelineOptions());
+    }
+
+    /**
+     * For each element, extract and enqueue all writes from the commit, then potentially flush
+     * any previously and currently enqueued writes.
+     * <p/>
+     * In order for writes to be enqueued, the value of {@link BatchWriteRequest#getDatabase()}
+     * must exactly match the database name this instance is configured for via the provided
+     * {@link org.apache.beam.sdk.options.PipelineOptions PipelineOptions}.
+     * <p/>
+     * {@inheritDoc}
+     */
+    @Override
+    public void processElement(ProcessContext context, BoundedWindow window) 
throws Exception {
+      @SuppressWarnings("nullness") // for some reason requireNonNull thinks 
its parameter must be non-null...

Review comment:
       That seems weird. Perhaps context could be null and throw an NPE before element is checked?
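
       If so, an explicit check would avoid the suppression entirely, since the
       checker's flow analysis refines the type after the null test (sketch):

           Write write = context.element();
           if (write == null) {
             throw new NullPointerException("context.element() must be non null");
           }
           // `write` is known non-null from here on, no @SuppressWarnings needed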

##########
File path: 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java
##########
@@ -0,0 +1,640 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.newArrayList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.ApiExceptionFactory;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.firestore.v1.BatchWriteRequest;
+import com.google.firestore.v1.BatchWriteResponse;
+import com.google.firestore.v1.Write;
+import com.google.rpc.Code;
+import com.google.rpc.Status;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import 
org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BaseBatchWriteFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.WriteElement;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.Element;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.FlushBuffer;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQosImpl.FlushBufferImpl;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+
+@SuppressWarnings("initialization.fields.uninitialized") // mockito fields are 
initialized via the Mockito Runner
+abstract class BaseFirestoreV1WriteFnTest<Out, Fn extends 
BaseBatchWriteFn<Out> & HasRpcAttemptContext> extends 
BaseFirestoreV1FnTest<Write, Out, Fn> {
+
+  protected static final Status STATUS_OK = 
Status.newBuilder().setCode(Code.OK.getNumber()).build();
+  @Mock(lenient = true)
+  protected BoundedWindow window;
+  @Mock
+  protected DoFn<Write, Out>.FinishBundleContext finishBundleContext;
+  @Mock
+  protected UnaryCallable<BatchWriteRequest, BatchWriteResponse> callable;
+  @Mock
+  protected RpcQos.RpcWriteAttempt attempt;
+  @Mock
+  protected RpcQos.RpcWriteAttempt attempt2;
+
+  @Before
+  public final void setUp() {
+    when(rpcQos.newWriteAttempt(any())).thenReturn(attempt, attempt2);
+
+    when(ff.getRpcQos(any()))
+        .thenReturn(rpcQos);
+    when(ff.getFirestoreRpc(pipelineOptions))
+        .thenReturn(rpc);
+    when(rpc.batchWriteCallable())
+        .thenReturn(callable);
+  }
+
+  @Override
+  @Test
+  public final void attemptsExhaustedForRetryableError() throws Exception {
+    Instant attemptStart = Instant.ofEpochMilli(0);
+    Instant rpc1Sta = Instant.ofEpochMilli(1);

Review comment:
       I think we can add two more letters to make it `rpc1Start`. :)

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java
##########
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.cloud.firestore.spi.v1.FirestoreRpc;
+import com.google.common.collect.ImmutableList;
+import com.google.firestore.v1.BatchWriteRequest;
+import com.google.firestore.v1.BatchWriteResponse;
+import com.google.firestore.v1.Write;
+import com.google.firestore.v1.WriteResult;
+import com.google.rpc.Code;
+import com.google.rpc.Status;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.WindowAwareDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.FailedWritesException;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.WriteFailure;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.Element;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.FlushBuffer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.BackOffUtils;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A collection of {@link org.apache.beam.sdk.transforms.DoFn DoFn}s for each 
of the supported write
+ * RPC methods from the Cloud Firestore V1 API.
+ */
+final class FirestoreV1WriteFn {
+
+  static final class DefaultBatchWriteFn extends BaseBatchWriteFn<Void> {
+    DefaultBatchWriteFn(JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    protected void handleWriteFailures(ContextAdapter<Void> context, Instant 
timestamp,
+        List<WriteFailureDetails> writeFailures, Runnable logMessage) {
+      throw new FailedWritesException(writeFailures.stream().map(w -> 
w.failure).collect(Collectors.toList()));
+    }
+  }
+
+  static final class BatchWriteFnWithDeadLetterQueue extends 
BaseBatchWriteFn<WriteFailure> {
+    BatchWriteFnWithDeadLetterQueue(JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    protected void handleWriteFailures(ContextAdapter<WriteFailure> context, 
Instant timestamp,
+        List<WriteFailureDetails> writeFailures, Runnable logMessage) {
+      logMessage.run();
+      for (WriteFailureDetails details : writeFailures) {
+        context.output(details.failure, timestamp, details.window);
+      }
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link BatchWriteRequest}s.
+   * <p/>
+   * Writes will be enqueued to be sent at a potentially
+   * later time when more writes are available. This Fn attempts to maximize 
throughput while
+   * maintaining a high request success rate.
+   * <p/>
+   * All request quality-of-service is managed via the instance of {@link 
RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static abstract class BaseBatchWriteFn<Out> extends WindowAwareDoFn<Write, 
Out> implements
+      HasRpcAttemptContext {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite.getNamespace());
+    private final JodaClock clock;
+    private final FirestoreStatefulComponentFactory 
firestoreStatefulComponentFactory;
+    private final RpcQosOptions rpcQosOptions;
+
+    // transient running state information, not important to any possible 
checkpointing
+    private transient FirestoreRpc firestoreRpc;
+    private transient RpcQos rpcQos;
+    private transient String projectId;
+    @VisibleForTesting
+    transient Queue<@NonNull WriteElement> writes = new 
PriorityQueue<>(WriteElement.COMPARATOR);
+    @VisibleForTesting
+    transient int queueNextEntryPriority = 0;
+
+    @SuppressWarnings("initialization.fields.uninitialized") // allow 
transient fields to be managed by component lifecycle
+    protected BaseBatchWriteFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions
+    ) {
+      this.clock = clock;
+      this.firestoreStatefulComponentFactory = 
firestoreStatefulComponentFactory;
+      this.rpcQosOptions = rpcQosOptions;
+    }
+
+    protected Logger getLogger() {
+      return LOGGER;
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite;
+    }
+
+    @Override
+    public final void 
populateDisplayData(@edu.umd.cs.findbugs.annotations.NonNull 
DisplayData.Builder builder) {
+      builder
+          .include("rpcQosOptions", rpcQosOptions);
+    }
+
+    @Override
+    public void setup() {
+      rpcQos = firestoreStatefulComponentFactory.getRpcQos(rpcQosOptions);
+      writes = new PriorityQueue<>(WriteElement.COMPARATOR);
+    }
+
+    @Override
+    public final void startBundle(StartBundleContext c) {
+      String project = 
c.getPipelineOptions().as(GcpOptions.class).getProject();
+      projectId = requireNonNull(project, "project must be defined on 
GcpOptions of PipelineOptions");
+      firestoreRpc = 
firestoreStatefulComponentFactory.getFirestoreRpc(c.getPipelineOptions());
+    }
+
+    /**
+     * For each element, extract and enqueue all writes from the commit, then potentially flush
+     * any previously and currently enqueued writes.
+     * <p/>
+     * In order for writes to be enqueued, the value of {@link BatchWriteRequest#getDatabase()}
+     * must exactly match the database name this instance is configured for via the provided
+     * {@link org.apache.beam.sdk.options.PipelineOptions PipelineOptions}.
+     * <p/>
+     * {@inheritDoc}
+     */
+    @Override
+    public void processElement(ProcessContext context, BoundedWindow window) 
throws Exception {
+      @SuppressWarnings("nullness") // for some reason requireNonNull thinks 
its parameter must be non-null...
+      Write write = requireNonNull(context.element(), "context.element() must 
be non null");
+      ProcessContextAdapter<Out> contextAdapter = new 
ProcessContextAdapter<>(context);
+      int serializedSize = write.getSerializedSize();
+      boolean tooLarge = rpcQos.bytesOverLimit(serializedSize);
+      if (tooLarge) {
+        String message = String.format(
+            "%s for document '%s' larger than configured max allowed bytes per 
batch",
+            getWriteType(write),
+            getName(write)
+        );
+        handleWriteFailures(contextAdapter, clock.instant(),
+            ImmutableList.of(new WriteFailureDetails(
+                new WriteFailure(
+                    write,
+                    WriteResult.newBuilder().build(),
+                    Status.newBuilder()
+                        .setCode(Code.INVALID_ARGUMENT.getNumber())
+                        .setMessage(message)
+                        .build()
+                ),
+                window
+            )),
+            () -> LOGGER.info(message)
+        );
+      } else {
+        writes.offer(new WriteElement(queueNextEntryPriority++, write, 
window));
+        flushBatch(/* finalFlush */ false, contextAdapter);
+      }
+    }
+
+    /**
+     * Attempt to flush any outstanding enqueued writes before cleaning up any 
bundle related state.
+     * {@inheritDoc}
+     */
+    @SuppressWarnings("nullness") // allow clearing transient fields
+    @Override
+    public void finishBundle(FinishBundleContext context) throws Exception {
+      try {
+        flushBatch(/* finalFlush */ true, new 
FinishBundleContextAdapter<>(context));
+      } finally {
+        projectId = null;
+        firestoreRpc.close();
+      }
+    }
+
+    /**
+     * Possibly flush enqueued writes to Firestore.
+     * <p/>
+     * This flush attempts to maximize throughput and success rate of RPCs. 
When a flush should
+     * happen and how many writes are included is determined and managed by 
the {@link RpcQos}
+     * instance of this class.
+     *
+     * @param finalFlush A boolean specifying if this call is from {@link 
#finishBundle(DoFn.FinishBundleContext)}. If
+     * {@code true}, this method will not return until a terminal state 
(success, attempts
+     * exhausted) for all enqueued writes is reached.
+     * @throws InterruptedException If the current thread is interrupted at 
anytime, such as while
+     * waiting for the next attempt
+     * @see RpcQos
+     * @see RpcQos.RpcWriteAttempt
+     * @see BackOffUtils#next(org.apache.beam.sdk.util.Sleeper, 
org.apache.beam.sdk.util.BackOff)
+     */
+    private void flushBatch(boolean finalFlush, ContextAdapter<Out> context) 
throws InterruptedException {
+      while (!writes.isEmpty()) {
+        RpcWriteAttempt attempt = 
rpcQos.newWriteAttempt(getRpcAttemptContext());
+        Instant begin = clock.instant();
+        if (!attempt.awaitSafeToProceed(begin)) {
+          continue;
+        }
+
+        FlushBuffer<WriteElement> flushBuffer = getFlushBuffer(attempt, begin);
+        if (flushBuffer.isFull() || finalFlush) {
+          doFlush(attempt, flushBuffer, context);
+        } else {
+          // since we're not going to perform a flush, we need to return the 
writes that were
+          // preemptively removed
+          flushBuffer.forEach(writes::offer);
+          // we're not on the final flush, so yield until more elements are 
delivered or
+          // finalFlush is true
+          return;
+        }
+      }
+      // now that the queue has been emptied reset our priority back to 0 to 
try and ensure
+      // we won't run into overflow issues if a worker runs for a long time 
and processes
+      // many writes.
+      queueNextEntryPriority = 0;
+    }
+
+    private FlushBuffer<WriteElement> getFlushBuffer(RpcWriteAttempt attempt, 
Instant start) {
+      FlushBuffer<WriteElement> buffer = attempt.newFlushBuffer(start);
+
+      WriteElement peek;
+      while ((peek = writes.peek()) != null) {
+        if (buffer.offer(peek)) {
+          writes.poll();
+        } else {
+          break;
+        }
+      }
+      return buffer;
+    }
+
+    private BatchWriteRequest getBatchWriteRequest(FlushBuffer<WriteElement> 
flushBuffer) {
+      BatchWriteRequest.Builder commitBuilder = BatchWriteRequest.newBuilder()
+          .setDatabase(getDatabaseName());
+      for (WriteElement element : flushBuffer) {
+        commitBuilder.addWrites(element.getValue());
+      }
+      return commitBuilder.build();
+    }
+
+    private void doFlush(
+        RpcWriteAttempt attempt,
+        FlushBuffer<WriteElement> flushBuffer,
+        ContextAdapter<Out> context
+    ) throws InterruptedException {
+      int writesCount = flushBuffer.getBufferedElementsCount();
+      long bytes = flushBuffer.getBufferedElementsBytes();
+      BatchWriteRequest request = getBatchWriteRequest(flushBuffer);
+      while (true) {

Review comment:
       I would add a one-line comment here that this only loops in case of a 
`continue` because of a retry.
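
       Something like:

           // This only loops when a retryable error triggers `continue`; a successful
           // flush returns, and a non-retryable error propagates out.
           while (true) {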

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosOptions.java
##########
@@ -0,0 +1,666 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import edu.umd.cs.findbugs.annotations.Nullable;

Review comment:
       Wrong import.
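
       Presumably:

           import org.checkerframework.checker.nullness.qual.Nullable;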

##########
File path: 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java
##########
@@ -0,0 +1,640 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.newArrayList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.ApiExceptionFactory;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.firestore.v1.BatchWriteRequest;
+import com.google.firestore.v1.BatchWriteResponse;
+import com.google.firestore.v1.Write;
+import com.google.rpc.Code;
+import com.google.rpc.Status;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import 
org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BaseBatchWriteFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.WriteElement;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.Element;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.FlushBuffer;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQosImpl.FlushBufferImpl;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+
+@SuppressWarnings("initialization.fields.uninitialized") // mockito fields are 
initialized via the Mockito Runner
+abstract class BaseFirestoreV1WriteFnTest<Out, Fn extends 
BaseBatchWriteFn<Out> & HasRpcAttemptContext> extends 
BaseFirestoreV1FnTest<Write, Out, Fn> {
+
+  protected static final Status STATUS_OK = 
Status.newBuilder().setCode(Code.OK.getNumber()).build();
+  @Mock(lenient = true)
+  protected BoundedWindow window;
+  @Mock
+  protected DoFn<Write, Out>.FinishBundleContext finishBundleContext;
+  @Mock
+  protected UnaryCallable<BatchWriteRequest, BatchWriteResponse> callable;
+  @Mock
+  protected RpcQos.RpcWriteAttempt attempt;
+  @Mock
+  protected RpcQos.RpcWriteAttempt attempt2;
+
+  @Before
+  public final void setUp() {
+    when(rpcQos.newWriteAttempt(any())).thenReturn(attempt, attempt2);
+
+    when(ff.getRpcQos(any()))
+        .thenReturn(rpcQos);
+    when(ff.getFirestoreRpc(pipelineOptions))
+        .thenReturn(rpc);
+    when(rpc.batchWriteCallable())
+        .thenReturn(callable);
+  }
+
+  @Override
+  @Test
+  public final void attemptsExhaustedForRetryableError() throws Exception {
+    Instant attemptStart = Instant.ofEpochMilli(0);
+    Instant rpc1Sta = Instant.ofEpochMilli(1);
+    Instant rpc1End = Instant.ofEpochMilli(2);
+    Instant rpc2Sta = Instant.ofEpochMilli(3);
+    Instant rpc2End = Instant.ofEpochMilli(4);
+    Instant rpc3Sta = Instant.ofEpochMilli(5);
+    Instant rpc3End = Instant.ofEpochMilli(6);
+    Write write = FirestoreProtoHelpers.newWrite();
+    Element<Write> element1 = new WriteElement(0, write, window);
+
+    when(ff.getFirestoreRpc(any())).thenReturn(rpc);
+    when(ff.getRpcQos(any())).thenReturn(rpcQos);
+    
when(rpcQos.newWriteAttempt(FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite)).thenReturn(attempt);
+    when(rpc.batchWriteCallable()).thenReturn(callable);
+
+    FlushBuffer<Element<Write>> flushBuffer = 
spy(newFlushBuffer(RPC_QOS_OPTIONS));
+    when(attempt.awaitSafeToProceed(any())).thenReturn(true);
+    when(attempt.<Write, 
Element<Write>>newFlushBuffer(attemptStart)).thenReturn(flushBuffer);
+    when(flushBuffer.offer(element1)).thenReturn(true);
+    when(flushBuffer.iterator()).thenReturn(newArrayList(element1).iterator());
+    when(flushBuffer.getBufferedElementsCount()).thenReturn(1);
+    when(flushBuffer.isFull()).thenReturn(true);
+
+    when(callable.call(any())).thenThrow(RETRYABLE_ERROR, RETRYABLE_ERROR, 
RETRYABLE_ERROR);
+    doNothing().when(attempt).recordFailedRequest(any(), anyInt());
+    
doNothing().doNothing().doThrow(RETRYABLE_ERROR).when(attempt).checkCanRetry(RETRYABLE_ERROR);
+
+    when(processContext.element()).thenReturn(write);
+
+    try {
+      runFunction(getFn(clock, ff, RPC_QOS_OPTIONS));
+      fail("Expected ApiException to be throw after exhausted attempts");
+    } catch (ApiException e) {
+      assertSame(RETRYABLE_ERROR, e);
+    }
+
+    verify(attempt, times(1)).awaitSafeToProceed(attemptStart);
+    verify(attempt, times(1)).recordStartRequest(rpc1Sta);
+    verify(attempt, times(1)).recordFailedRequest(rpc1End, 1);
+    verify(attempt, times(1)).recordStartRequest(rpc2Sta);
+    verify(attempt, times(1)).recordFailedRequest(rpc2End, 1);
+    verify(attempt, times(1)).recordStartRequest(rpc3Sta);
+    verify(attempt, times(1)).recordFailedRequest(rpc3End, 1);
+    verify(attempt, times(0)).recordSuccessfulRequest(any(), anyInt());
+    verify(attempt, never()).completeSuccess();
+  }
+
+  @Override
+  @Test
+  public final void noRequestIsSentIfNotSafeToProceed() throws Exception {
+    when(ff.getFirestoreRpc(any())).thenReturn(rpc);
+    when(ff.getRpcQos(any())).thenReturn(rpcQos);
+    
when(rpcQos.newWriteAttempt(FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite)).thenReturn(attempt);
+
+    InterruptedException interruptedException = new InterruptedException();
+    
when(attempt.awaitSafeToProceed(any())).thenReturn(false).thenThrow(interruptedException);
+
+    
when(processContext.element()).thenReturn(FirestoreProtoHelpers.newWrite());
+
+    try {
+      runFunction(getFn(clock, ff, RPC_QOS_OPTIONS));
+      fail("Expected ApiException to be throw after exhausted attempts");
+    } catch (InterruptedException e) {
+      assertSame(interruptedException, e);
+    }
+
+    verify(rpc, times(1)).close();
+    verifyNoMoreInteractions(rpc);
+    verifyNoMoreInteractions(callable);
+    verify(attempt, times(0)).recordFailedRequest(any(), anyInt());
+    verify(attempt, times(0)).recordSuccessfulRequest(any(), anyInt());
+  }
+
+  @Test
+  public abstract void enqueueingWritesValidateBytesSize() throws Exception;
+
+  @Test
+  public final void endToEnd_success() throws Exception {
+
+    Write write = FirestoreProtoHelpers.newWrite();
+    BatchWriteRequest expectedRequest = BatchWriteRequest.newBuilder()
+        .setDatabase("projects/testing-project/databases/(default)")

Review comment:
       Does this need to be parameterized on `ENV_GOOGLE_CLOUD_PROJECT`? Same for the other test cases.
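
       E.g. something like (sketch; assumes `projectId` is read from the same place
       the Fn gets it, i.e. the ENV_GOOGLE_CLOUD_PROJECT-backed options):

           BatchWriteRequest expectedRequest = BatchWriteRequest.newBuilder()
               // derive the database name instead of hard-coding the project
               .setDatabase(String.format("projects/%s/databases/(default)", projectId))
               .build();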

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
##########
@@ -0,0 +1,695 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.StatusCode;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.rpc.Code;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.function.Function;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.Element;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.FlushBuffer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.MovingFunction;
+import org.apache.beam.sdk.util.Sleeper;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class RpcQosImpl implements RpcQos {
+
+  /**
+   * Non-retryable errors. See 
https://cloud.google.com/apis/design/errors#handling_errors.
+   */
+  private static final Set<Integer> NON_RETRYABLE_ERROR_NUMBERS =
+      ImmutableSet.of(
+          Code.ALREADY_EXISTS,
+          Code.DATA_LOSS,
+          Code.FAILED_PRECONDITION,
+          Code.INVALID_ARGUMENT,
+          Code.OUT_OF_RANGE,
+          Code.NOT_FOUND,
+          Code.PERMISSION_DENIED,
+          Code.UNIMPLEMENTED
+      ).stream()
+          .map(Code::getNumber)
+          .collect(ImmutableSet.toImmutableSet());
+  /**
+   * The target minimum number of requests per samplePeriodMs, even if no 
requests succeed. Must be
+   * greater than 0, else we could throttle to zero. Because every decision is 
probabilistic, there
+   * is no guarantee that the request rate in any given interval will not be 
zero. (This is the +1
+   * from the formula in https://landing.google.com/sre/book/chapters/handling-overload.html.)
+   */
+  private static final double MIN_REQUESTS = 1;
+
+  private final RpcQosOptions options;
+
+  private final AdaptiveThrottler at;
+  private final WriteBatcher wb;
+  private final WriteRampUp writeRampUp;
+  private final FluentBackoff fb;
+
+  private final WeakHashMap<Context, Counters> counters;
+  private final Random random;
+  private final Sleeper sleeper;
+  private final Function<Context, Counters> computeCounters;
+
+  RpcQosImpl(
+      RpcQosOptions options,
+      Random random,
+      Sleeper sleeper,
+      CounterFactory counterFactory
+  ) {
+    this.options = options;
+    this.random = random;
+    this.sleeper = sleeper;
+    at = new AdaptiveThrottler();
+    wb = new WriteBatcher();
+    writeRampUp = new WriteRampUp(
+        Math.max(1, 500 / options.getHintMaxNumWorkers())
+    );
+    fb = FluentBackoff.DEFAULT
+        .withMaxRetries(options.getMaxAttempts() - 1) // maxRetries is an 
inclusive value, we want exclusive since we are tracking all attempts
+        .withInitialBackoff(options.getInitialBackoff());
+    counters = new WeakHashMap<>();
+    computeCounters = (Context c) -> Counters.getCounters(counterFactory, c);
+  }
+
+  @Override
+  public RpcWriteAttemptImpl newWriteAttempt(Context context) {
+    return new RpcWriteAttemptImpl(
+        context,
+        counters.computeIfAbsent(context, computeCounters),
+        fb.backoff(),
+        sleeper);
+  }
+
+  @Override
+  public RpcReadAttemptImpl newReadAttempt(Context context) {
+    return new RpcReadAttemptImpl(
+        context,
+        counters.computeIfAbsent(context, computeCounters),
+        fb.backoff(),
+        sleeper);
+  }
+
+  @Override
+  public boolean bytesOverLimit(long bytes) {
+    return bytes > options.getBatchMaxBytes();
+  }
+
+  private static MovingFunction createMovingFunction(Duration samplePeriod, 
Duration sampleUpdate) {
+    return new MovingFunction(
+        samplePeriod.getMillis(),
+        sampleUpdate.getMillis(),
+        1 /* numSignificantBuckets */,
+        1 /* numSignificantSamples */,
+        Sum.ofLongs()
+    );
+  }
+
+  interface CounterFactory extends Serializable {
+    CounterFactory DEFAULT = Metrics::counter;
+
+    Counter getCounter(String namespace, String name);
+  }
+
+  private enum AttemptState {
+    Active,
+    Active_Started,
+    Complete_Success,
+    Complete_Error;
+
+    public void checkActive() {
+      switch (this) {
+        case Active:
+        case Active_Started:
+          return;
+        case Complete_Success:
+          throw new IllegalStateException("Expected state to be Active, but 
was Complete_Success");
+        case Complete_Error:
+          throw new IllegalStateException("Expected state to be Active, but 
was Complete_Error");
+      }
+    }
+
+    public void checkStarted() {
+      switch (this) {
+        case Active_Started:
+          return;
+        case Active:
+          throw new IllegalStateException("Expected state to be 
Active_Started, but was Active");
+        case Complete_Success:
+          throw new IllegalStateException("Expected state to be 
Active_Started, but was Complete_Success");
+        case Complete_Error:
+          throw new IllegalStateException("Expected state to be 
Active_Started, but was Complete_Error");
+      }
+    }
+  }
+
+  private abstract class BaseRpcAttempt implements RpcAttempt {
+    private final Logger logger;
+    protected final Counters counters;
+    protected final BackOff backoff;
+    protected final Sleeper sleeper;
+
+    protected AttemptState state;
+    protected Instant start;
+
+    @SuppressWarnings("initialization.fields.uninitialized") // allow 
transient fields to be managed by component lifecycle
+    protected BaseRpcAttempt(
+        Context context, Counters counters, BackOff backoff, Sleeper sleeper) {
+      this.logger = LoggerFactory.getLogger(String.format("%s.RpcQos", 
context.getNamespace()));
+      this.counters = counters;
+      this.backoff = backoff;
+      this.sleeper = sleeper;
+      this.state = AttemptState.Active;
+    }
+
+    @Override
+    public boolean awaitSafeToProceed(Instant instant) throws 
InterruptedException {
+      state.checkActive();
+      Duration shouldThrottleRequest = at.shouldThrottleRequest(instant);
+      if (shouldThrottleRequest.compareTo(Duration.ZERO) > 0) {
+        logger.info("Delaying request by {}ms", 
shouldThrottleRequest.getMillis());
+        throttleRequest(shouldThrottleRequest);
+        return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public void checkCanRetry(RuntimeException exception)
+        throws InterruptedException {
+      state.checkActive();
+
+      Optional<ApiException> findApiException = findApiException(exception);
+
+      if (findApiException.isPresent()) {
+        ApiException apiException = findApiException.get();
+        // order here is semi-important
+        // First we always want to test if the error code is one of the codes 
we have deemed
+        // non-retryable before delegating to the exceptions default set.
+        if (
+            maxAttemptsExhausted()
+                || 
getStatusCodeNumber(apiException).map(NON_RETRYABLE_ERROR_NUMBERS::contains).orElse(false)
+                || !apiException.isRetryable()
+        ) {
+          state = AttemptState.Complete_Error;
+          throw apiException;
+        }
+      } else {
+        state = AttemptState.Complete_Error;
+        throw exception;
+      }
+    }
+
+    @Override
+    public void recordStartRequest(Instant instantSinceEpoch) {
+      at.recordStartRequest(instantSinceEpoch);
+      start = instantSinceEpoch;
+      state = AttemptState.Active_Started;
+    }
+
+    @Override
+    public void completeSuccess() {
+      state.checkActive();
+      state = AttemptState.Complete_Success;
+    }
+
+    @Override
+    public boolean isCodeRetryable(Code code) {
+      return !NON_RETRYABLE_ERROR_NUMBERS.contains(code.getNumber());
+    }
+
+    private boolean maxAttemptsExhausted() throws InterruptedException {
+      try {
+        boolean exhausted = !BackOffUtils.next(sleeper, backoff);
+        if (exhausted) {
+          logger.error("Max attempts exhausted after {} attempts.", 
options.getMaxAttempts());
+        }
+        return exhausted;
+      } catch (IOException e) {
+        // We are using FluentBackoff which does not ever throw an IOException 
from its methods
+        // Catch and wrap any potential IOException as a RuntimeException 
since it won't ever
+        // happen unless the implementation of FluentBackoff changes.
+        throw new RuntimeException(e);
+      }
+    }
+
+    protected Logger getLogger() {
+      return logger;
+    }
+
+    protected final void throttleRequest(Duration shouldThrottleRequest) 
throws InterruptedException {
+      counters.throttlingMs.inc(shouldThrottleRequest.getMillis());
+      sleeper.sleep(shouldThrottleRequest.getMillis());
+    }
+
+    private Optional<Integer> getStatusCodeNumber(ApiException apiException) {
+      StatusCode statusCode = apiException.getStatusCode();
+      if (statusCode instanceof GrpcStatusCode) {
+        GrpcStatusCode grpcStatusCode = (GrpcStatusCode) statusCode;
+        return Optional.of(grpcStatusCode.getTransportCode().value());
+      }
+      return Optional.empty();
+    }
+
+    private Optional<ApiException> findApiException(Throwable throwable) {
+      if (throwable instanceof ApiException) {
+        ApiException apiException = (ApiException) throwable;
+        return Optional.of(apiException);
+      } else {
+        Throwable cause = throwable.getCause();
+        if (cause != null) {
+          return findApiException(cause);
+        } else {
+          return Optional.empty();
+        }
+      }
+    }
+  }
+
+  private final class RpcReadAttemptImpl extends BaseRpcAttempt implements 
RpcReadAttempt {
+    private RpcReadAttemptImpl(Context context,
+        Counters counters, BackOff backoff, Sleeper sleeper) {
+      super(context, counters, backoff, sleeper);
+    }
+
+    @Override
+    public void recordSuccessfulRequest(Instant end) {
+      state.checkStarted();
+      counters.rpcSuccesses.inc();
+      at.recordSuccessfulRequest(start);
+    }
+
+    @Override
+    public void recordFailedRequest(Instant end) {
+      state.checkStarted();
+      counters.rpcFailures.inc();
+      at.recordFailedRequest(start);
+    }
+
+    @Override
+    public void recordStreamValue(Instant now) {
+      state.checkActive();
+      counters.rpcStreamValueReceived.inc();
+    }
+  }
+
+  final class RpcWriteAttemptImpl extends BaseRpcAttempt implements 
RpcWriteAttempt {
+
+    private RpcWriteAttemptImpl(Context context,
+        Counters counters, BackOff backoff, Sleeper sleeper) {
+      super(context, counters, backoff, sleeper);
+    }
+
+    @Override
+    public boolean awaitSafeToProceed(Instant instant) throws 
InterruptedException {
+      state.checkActive();
+      Optional<Duration> shouldThrottle = writeRampUp.shouldThrottle(instant);
+      if (shouldThrottle.isPresent()) {
+        Duration throttleDuration = shouldThrottle.get();
+        getLogger().debug("Still ramping up, Delaying request by {}ms", 
throttleDuration.getMillis());
+        throttleRequest(throttleDuration);
+        return false;
+      } else {
+        return super.awaitSafeToProceed(instant);
+      }
+    }
+
+    @Override
+    public <T, E extends Element<T>> FlushBufferImpl<T, E> 
newFlushBuffer(Instant instantSinceEpoch) {
+      state.checkActive();
+      int availableWriteCountBudget = 
writeRampUp.getAvailableWriteCountBudget(instantSinceEpoch);
+      int nextBatchMaxCount = wb.nextBatchMaxCount(instantSinceEpoch);
+      int batchMaxCount = Ints.min(
+          Math.max(0, availableWriteCountBudget),
+          Math.max(0, nextBatchMaxCount),
+          options.getBatchMaxCount()
+      );
+      return new FlushBufferImpl<>(
+          batchMaxCount,
+          options.getBatchMaxBytes()
+      );
+    }
+
+    @Override
+    public void recordSuccessfulRequest(Instant end, int numWrites) {
+      state.checkStarted();
+      counters.rpcSuccesses.inc();
+      writeRampUp.recordWriteCount(start, numWrites);
+      at.recordSuccessfulRequest(start);
+      wb.recordRequestLatency(start, end, numWrites);
+    }
+
+    @Override
+    public void recordFailedRequest(Instant end, int numWrites) {
+      state.checkStarted();
+      counters.rpcFailures.inc();
+      writeRampUp.recordWriteCount(start, numWrites);
+      at.recordFailedRequest(start);
+      wb.recordRequestLatency(start, end, numWrites);
+    }
+
+  }
+
+  /**
+   * Determines batch sizes for commit RPCs based on past performance.
+   *
+   * <p>It aims for a target response time per RPC: it uses the response times 
for previous RPCs and
+   * the number of entities contained in them, calculates a rolling average 
time-per-document, and
+   * chooses the number of entities for future writes to hit the target time.
+   *
+   * <p>This enables us to send large batches without sending over-large 
requests in the case of
+   * expensive document writes that may timeout before the server can apply 
them all.
+   */
+  private final class WriteBatcher {
+
+    private final MovingAverage meanLatencyPerDocumentMs;
+
+    private WriteBatcher() {
+      this.meanLatencyPerDocumentMs =
+          new MovingAverage(
+              options.getSamplePeriod(),
+              options.getSamplePeriodBucketSize()
+              /* numSignificantBuckets */

Review comment:
       I think this can be removed?
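
       i.e. the call would then simply read:

           this.meanLatencyPerDocumentMs =
               new MovingAverage(
                   options.getSamplePeriod(),
                   options.getSamplePeriodBucketSize());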




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

