danthev commented on a change in pull request #14261:
URL: https://github.com/apache/beam/pull/14261#discussion_r627712849



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
##########
@@ -204,7 +210,8 @@ public boolean awaitSafeToProceed(Instant instant) throws InterruptedException {
       state.checkActive();
       Duration shouldThrottleRequest = at.shouldThrottleRequest(instant);
       if (shouldThrottleRequest.compareTo(Duration.ZERO) > 0) {
-        logger.info("Delaying request by {}ms", shouldThrottleRequest.getMillis());
+        long throttleRequestMillis = shouldThrottleRequest.getMillis();
+        logger.debug("Delaying request by {}ms", throttleRequestMillis);

Review comment:
       This is when the adaptive throttler backs off, right? I think I'd tend toward an `INFO` log here, and a `due to previous failures` in the message.
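
       Something along these lines is roughly what I have in mind (just a sketch of the level and wording, reusing the variable from the diff above):

       ```java
       long throttleRequestMillis = shouldThrottleRequest.getMillis();
       // INFO so users can see the adaptive throttler backing off without enabling debug logging
       logger.info("Delaying request by {}ms due to previous failures", throttleRequestMillis);
       ```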

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java
##########
@@ -94,106 +108,124 @@ void handleWriteFailures(ContextAdapter<WriteFailure> context, Instant timestamp
 
   /**
    * {@link DoFn} for Firestore V1 {@link BatchWriteRequest}s.
-   * <p/>
-   * Writes will be enqueued to be sent at a potentially
-   * later time when more writes are available. This Fn attempts to maximize throughput while
-   * maintaining a high request success rate.
-   * <p/>
-   * All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   *
+   * <p>Writes will be enqueued to be sent at a potentially later time when more writes are
+   * available. This Fn attempts to maximize throughput while maintaining a high request success
+   * rate.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
    * the lifecycle of this Fn.
    */
-  static abstract class BaseBatchWriteFn<Out> extends WindowAwareDoFn<Write, Out> implements
-      HasRpcAttemptContext {
-    private static final Logger LOGGER = LoggerFactory.getLogger(FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite.getNamespace());
+  abstract static class BaseBatchWriteFn<OutT> extends WindowAwareDoFn<Write, OutT>
+      implements HasRpcAttemptContext {
+    private static final Logger LOG =
+        LoggerFactory.getLogger(FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite.getNamespace());
     private final JodaClock clock;
     private final FirestoreStatefulComponentFactory firestoreStatefulComponentFactory;
     private final RpcQosOptions rpcQosOptions;
+    private final CounterFactory counterFactory;
+    private final V1FnRpcAttemptContext rpcAttemptContext;
 
     // transient running state information, not important to any possible checkpointing
-    private transient FirestoreRpc firestoreRpc;
+    //  worker scoped state
     private transient RpcQos rpcQos;
-    private transient String projectId;
+    private transient Counter writesSuccessful;
+    private transient Counter writesFailedRetryable;
+    private transient Counter writesFailedNonRetryable;
+    //  bundle scoped state
+    private transient FirestoreStub firestoreStub;
+    private transient DatabaseRootName databaseRootName;
+
     @VisibleForTesting
     transient Queue<@NonNull WriteElement> writes = new PriorityQueue<>(WriteElement.COMPARATOR);
-    @VisibleForTesting
-    transient int queueNextEntryPriority = 0;
 
-    @SuppressWarnings("initialization.fields.uninitialized") // allow transient fields to be managed by component lifecycle
+    @VisibleForTesting transient int queueNextEntryPriority = 0;
+
+    @SuppressWarnings(
+        "initialization.fields.uninitialized") // allow transient fields to be managed by component
+    // lifecycle
     BaseBatchWriteFn(
         JodaClock clock,
         FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
-        RpcQosOptions rpcQosOptions
-    ) {
+        RpcQosOptions rpcQosOptions,
+        CounterFactory counterFactory) {
       this.clock = clock;
       this.firestoreStatefulComponentFactory = firestoreStatefulComponentFactory;
       this.rpcQosOptions = rpcQosOptions;
-    }
-
-    Logger getLogger() {
-      return LOGGER;
+      this.counterFactory = counterFactory;
+      this.rpcAttemptContext = V1FnRpcAttemptContext.BatchWrite;
     }
 
     @Override
     public Context getRpcAttemptContext() {
-      return FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite;
+      return rpcAttemptContext;
     }
 
     @Override
-    public final void populateDisplayData(@edu.umd.cs.findbugs.annotations.NonNull DisplayData.Builder builder) {
-      builder
-          .include("rpcQosOptions", rpcQosOptions);
+    public final void populateDisplayData(
+        @edu.umd.cs.findbugs.annotations.NonNull DisplayData.Builder builder) {

Review comment:
       Not checkerframework's `NonNull`?
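
       e.g. something like this (untested sketch, assuming the checkerframework annotation; as a type-use annotation it goes on the nested `Builder` type):

       ```java
       import org.checkerframework.checker.nullness.qual.NonNull;

       @Override
       public final void populateDisplayData(DisplayData.@NonNull Builder builder) {
         builder.include("rpcQosOptions", rpcQosOptions);
       }
       ```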

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
##########
@@ -344,7 +354,8 @@ public boolean awaitSafeToProceed(Instant instant) throws InterruptedException {
       Optional<Duration> shouldThrottle = writeRampUp.shouldThrottle(instant);
       if (shouldThrottle.isPresent()) {
         Duration throttleDuration = shouldThrottle.get();
-        getLogger().debug("Still ramping up, Delaying request by {}ms", throttleDuration.getMillis());
+        long throttleDurationMillis = throttleDuration.getMillis();
+        getLogger().debug("Still ramping up, Delaying request by {}ms", throttleDurationMillis);

Review comment:
       Same here with `INFO` logs. If there's a better way that's fine too; what's important is that it's not too hard for the user to find out whether they're being throttled by ramp-up or by the adaptive throttler.
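
       For example (just a sketch), paired with the `due to previous failures` wording suggested above, this would make the two throttling causes easy to tell apart in the logs:

       ```java
       long throttleDurationMillis = throttleDuration.getMillis();
       getLogger().info("Still ramping up, delaying request by {}ms", throttleDurationMillis);
       ```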

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
##########
@@ -83,48 +81,60 @@
   private final WriteRampUp writeRampUp;
   private final FluentBackoff fb;
 
-  private final WeakHashMap<Context, Counters> counters;
+  private final WeakHashMap<Context, O11y> counters;
   private final Random random;
   private final Sleeper sleeper;
-  private final Function<Context, Counters> computeCounters;
+  private final Function<Context, O11y> computeCounters;
+  private final DistributionFactory distributionFactory;
 
   RpcQosImpl(
       RpcQosOptions options,
       Random random,
       Sleeper sleeper,
-      CounterFactory counterFactory
-  ) {
+      CounterFactory counterFactory,
+      DistributionFactory distributionFactory) {
     this.options = options;
     this.random = random;
     this.sleeper = sleeper;
-    at = new AdaptiveThrottler();
-    wb = new WriteBatcher();
-    writeRampUp = new WriteRampUp(
-        Math.max(1, 500 / options.getHintMaxNumWorkers())
-    );
-    fb = FluentBackoff.DEFAULT
-        .withMaxRetries(options.getMaxAttempts() - 1) // maxRetries is an inclusive value, we want exclusive since we are tracking all attempts
-        .withInitialBackoff(options.getInitialBackoff());
+    DistributionFactory filteringDistributionFactory =
+        new DiagnosticOnlyFilteringDistributionFactory(
+            !options.isShouldReportDiagnosticMetrics(), distributionFactory);
+    this.distributionFactory = filteringDistributionFactory;
+    at =
+        new AdaptiveThrottler(
+            options.getSamplePeriod(),
+            options.getSamplePeriodBucketSize(),
+            options.getThrottleDuration(),
+            options.getOverloadRatio());
+    wb =
+        new WriteBatcher(
+            options.getSamplePeriod(),
+            options.getSamplePeriodBucketSize(),
+            options.getBatchInitialCount(),
+            options.getBatchTargetLatency(),
+            filteringDistributionFactory);
+    writeRampUp =
+        new WriteRampUp(
+            Math.max(1, 500 / options.getHintMaxNumWorkers()), filteringDistributionFactory);

Review comment:
       You can leave this if you want, but if it's not too much effort you could make the base budget a float and move `Math.max(1, ...)` into the per-second budget calculation. Then ramp-up isn't automatically faster for `hintMaxNumWorkers > 500`, but instead stays at the minimum level a little longer before actually ramping up.
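
       In case it helps, here's a rough sketch of what I mean. The class and method names are made up for illustration, and the 1.5x growth every 5 minutes is only an assumption about the ramp-up schedule, not the actual `WriteRampUp` internals:

       ```java
       import org.joda.time.Duration;
       import org.joda.time.Instant;

       final class RampUpBudgetSketch {
         // Base budget kept fractional instead of Math.max(1, 500 / hintMaxNumWorkers).
         private final double baseBatchBudget;
         private final Instant rampUpStart;

         RampUpBudgetSketch(int hintMaxNumWorkers, Instant rampUpStart) {
           this.baseBatchBudget = 500.0 / hintMaxNumWorkers;
           this.rampUpStart = rampUpStart;
         }

         // Per-second budget: clamp to a minimum of 1 here, so a fractional base budget
         // simply stays at the minimum longer before the growth factor lifts it above 1.
         long budgetForSecond(Instant now) {
           long growthIntervals = new Duration(rampUpStart, now).getStandardMinutes() / 5;
           double grown = baseBatchBudget * Math.pow(1.5, growthIntervals);
           return Math.max(1, (long) grown);
         }
       }
       ```

       The effect is that workers beyond 500 still get the 1-write minimum, but since the growth factor is applied to the fractional base, the budget sits at that minimum a bit longer before actually ramping up.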




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

