danthev commented on a change in pull request #14261:
URL: https://github.com/apache/beam/pull/14261#discussion_r627712849
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
##########
@@ -204,7 +210,8 @@ public boolean awaitSafeToProceed(Instant instant) throws InterruptedException {
      state.checkActive();
      Duration shouldThrottleRequest = at.shouldThrottleRequest(instant);
      if (shouldThrottleRequest.compareTo(Duration.ZERO) > 0) {
-       logger.info("Delaying request by {}ms", shouldThrottleRequest.getMillis());
+       long throttleRequestMillis = shouldThrottleRequest.getMillis();
+       logger.debug("Delaying request by {}ms", throttleRequestMillis);
Review comment:
This is when the adaptive throttler backs off, right? I think I'd tend
toward an `INFO` log here, and a `due to previous failures` in the message.
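For example, something along these lines (just an untested sketch, reusing the `logger` and `shouldThrottleRequest` already in this method):
```java
long throttleRequestMillis = shouldThrottleRequest.getMillis();
logger.info("Delaying request by {}ms due to previous failures", throttleRequestMillis);
```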
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java
##########
@@ -94,106 +108,124 @@ void handleWriteFailures(ContextAdapter<WriteFailure> context, Instant timestamp
  /**
   * {@link DoFn} for Firestore V1 {@link BatchWriteRequest}s.
-  * <p/>
-  * Writes will be enqueued to be sent at a potentially
-  * later time when more writes are available. This Fn attempts to maximize throughput while
-  * maintaining a high request success rate.
-  * <p/>
-  * All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+  *
+  * <p>Writes will be enqueued to be sent at a potentially later time when more writes are
+  * available. This Fn attempts to maximize throughput while maintaining a high request success
+  * rate.
+  *
+  * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
   * the lifecycle of this Fn.
   */
-  static abstract class BaseBatchWriteFn<Out> extends WindowAwareDoFn<Write, Out> implements
-      HasRpcAttemptContext {
-    private static final Logger LOGGER = LoggerFactory.getLogger(FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite.getNamespace());
+  abstract static class BaseBatchWriteFn<OutT> extends WindowAwareDoFn<Write, OutT>
+      implements HasRpcAttemptContext {
+    private static final Logger LOG =
+        LoggerFactory.getLogger(FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite.getNamespace());
    private final JodaClock clock;
    private final FirestoreStatefulComponentFactory firestoreStatefulComponentFactory;
    private final RpcQosOptions rpcQosOptions;
+    private final CounterFactory counterFactory;
+    private final V1FnRpcAttemptContext rpcAttemptContext;
    // transient running state information, not important to any possible checkpointing
-    private transient FirestoreRpc firestoreRpc;
+    // worker scoped state
    private transient RpcQos rpcQos;
-    private transient String projectId;
+    private transient Counter writesSuccessful;
+    private transient Counter writesFailedRetryable;
+    private transient Counter writesFailedNonRetryable;
+    // bundle scoped state
+    private transient FirestoreStub firestoreStub;
+    private transient DatabaseRootName databaseRootName;
+
    @VisibleForTesting
    transient Queue<@NonNull WriteElement> writes = new PriorityQueue<>(WriteElement.COMPARATOR);
-    @VisibleForTesting
-    transient int queueNextEntryPriority = 0;
-    @SuppressWarnings("initialization.fields.uninitialized") // allow transient fields to be managed by component lifecycle
+    @VisibleForTesting transient int queueNextEntryPriority = 0;
+
+    @SuppressWarnings(
+        "initialization.fields.uninitialized") // allow transient fields to be managed by component
+    // lifecycle
    BaseBatchWriteFn(
        JodaClock clock,
        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
-        RpcQosOptions rpcQosOptions
-    ) {
+        RpcQosOptions rpcQosOptions,
+        CounterFactory counterFactory) {
      this.clock = clock;
      this.firestoreStatefulComponentFactory = firestoreStatefulComponentFactory;
      this.rpcQosOptions = rpcQosOptions;
-    }
-
-    Logger getLogger() {
-      return LOGGER;
+      this.counterFactory = counterFactory;
+      this.rpcAttemptContext = V1FnRpcAttemptContext.BatchWrite;
    }
    @Override
    public Context getRpcAttemptContext() {
-      return FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite;
+      return rpcAttemptContext;
    }
    @Override
-    public final void populateDisplayData(@edu.umd.cs.findbugs.annotations.NonNull DisplayData.Builder builder) {
-      builder
-          .include("rpcQosOptions", rpcQosOptions);
+    public final void populateDisplayData(
+        @edu.umd.cs.findbugs.annotations.NonNull DisplayData.Builder builder) {
Review comment:
Not checkerframework's `NonNull`?
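i.e., roughly this (sketch only; I'm assuming the checkerframework annotation is already available in this module, since `@NonNull` is used on the `writes` queue above):
```java
import org.checkerframework.checker.nullness.qual.NonNull;

@Override
public final void populateDisplayData(@NonNull DisplayData.Builder builder) {
  builder.include("rpcQosOptions", rpcQosOptions);
}
```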
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
##########
@@ -344,7 +354,8 @@ public boolean awaitSafeToProceed(Instant instant) throws InterruptedException {
      Optional<Duration> shouldThrottle = writeRampUp.shouldThrottle(instant);
      if (shouldThrottle.isPresent()) {
        Duration throttleDuration = shouldThrottle.get();
-       getLogger().debug("Still ramping up, Delaying request by {}ms", throttleDuration.getMillis());
+       long throttleDurationMillis = throttleDuration.getMillis();
+       getLogger().debug("Still ramping up, Delaying request by {}ms", throttleDurationMillis);
Review comment:
Same here with `INFO` logs. If there's a better way, that's fine too; what's important is that it's not too hard for the user to find out whether they're being throttled by ramp-up or by the adaptive throttler.
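For example (untested sketch), keeping the two messages distinct so it's obvious which throttle kicked in:
```java
long throttleDurationMillis = throttleDuration.getMillis();
getLogger().info("Still ramping up, delaying request by {}ms", throttleDurationMillis);
```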
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
##########
@@ -83,48 +81,60 @@
  private final WriteRampUp writeRampUp;
  private final FluentBackoff fb;
-  private final WeakHashMap<Context, Counters> counters;
+  private final WeakHashMap<Context, O11y> counters;
  private final Random random;
  private final Sleeper sleeper;
-  private final Function<Context, Counters> computeCounters;
+  private final Function<Context, O11y> computeCounters;
+  private final DistributionFactory distributionFactory;
  RpcQosImpl(
      RpcQosOptions options,
      Random random,
      Sleeper sleeper,
-      CounterFactory counterFactory
-  ) {
+      CounterFactory counterFactory,
+      DistributionFactory distributionFactory) {
    this.options = options;
    this.random = random;
    this.sleeper = sleeper;
-    at = new AdaptiveThrottler();
-    wb = new WriteBatcher();
-    writeRampUp = new WriteRampUp(
-        Math.max(1, 500 / options.getHintMaxNumWorkers())
-    );
-    fb = FluentBackoff.DEFAULT
-        .withMaxRetries(options.getMaxAttempts() - 1) // maxRetries is an inclusive value, we want exclusive since we are tracking all attempts
-        .withInitialBackoff(options.getInitialBackoff());
+    DistributionFactory filteringDistributionFactory =
+        new DiagnosticOnlyFilteringDistributionFactory(
+            !options.isShouldReportDiagnosticMetrics(), distributionFactory);
+    this.distributionFactory = filteringDistributionFactory;
+    at =
+        new AdaptiveThrottler(
+            options.getSamplePeriod(),
+            options.getSamplePeriodBucketSize(),
+            options.getThrottleDuration(),
+            options.getOverloadRatio());
+    wb =
+        new WriteBatcher(
+            options.getSamplePeriod(),
+            options.getSamplePeriodBucketSize(),
+            options.getBatchInitialCount(),
+            options.getBatchTargetLatency(),
+            filteringDistributionFactory);
+    writeRampUp =
+        new WriteRampUp(
+            Math.max(1, 500 / options.getHintMaxNumWorkers()), filteringDistributionFactory);
Review comment:
You can leave this if you want, but if it's not too much effort you
could make the base budget a float and move `Math.max(1, ...)` into the
per-second budget calculation. Then ramp-up isn't automatically faster for
`hintMaxNumWorkers > 500`, but instead stays at the minimum level a little
longer before actually ramping up.
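Roughly what I have in mind (untested sketch; `baseBatchBudgetPerWorker` and `availableBudgetForSecond` are made-up names, not the actual `WriteRampUp` internals):
```java
// Keep the fractional per-worker base budget instead of flooring it to 1 up front,
// e.g. pass 500.0 / options.getHintMaxNumWorkers() into WriteRampUp as a double.
private final double baseBatchBudgetPerWorker;

// Apply the floor only when computing the budget granted for a given second, so
// hintMaxNumWorkers > 500 sits at the minimum budget longer instead of ramping faster.
private int availableBudgetForSecond(double rampUpScaleFactor) {
  return Math.max(1, (int) (rampUpScaleFactor * baseBatchBudgetPerWorker));
}
```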
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]