[
https://issues.apache.org/jira/browse/BEAM-8376?focusedWorklogId=593023&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-593023
]
ASF GitHub Bot logged work on BEAM-8376:
----------------------------------------
Author: ASF GitHub Bot
Created on: 06/May/21 21:27
Start Date: 06/May/21 21:27
Worklog Time Spent: 10m
Work Description: BenWhitehead commented on a change in pull request #14261:
URL: https://github.com/apache/beam/pull/14261#discussion_r627775924
##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
##########
@@ -0,0 +1,909 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.StatusCode;
+import com.google.rpc.Code;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.function.Function;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.Element;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.FlushBuffer;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.MovingFunction;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.Interval;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class RpcQosImpl implements RpcQos {
+
+ /** Non-retryable errors. See https://cloud.google.com/apis/design/errors#handling_errors. */
+ private static final Set<Integer> NON_RETRYABLE_ERROR_NUMBERS =
+ ImmutableSet.of(
+ Code.ALREADY_EXISTS,
+ Code.DATA_LOSS,
+ Code.FAILED_PRECONDITION,
+ Code.INVALID_ARGUMENT,
+ Code.OUT_OF_RANGE,
+ Code.NOT_FOUND,
+ Code.PERMISSION_DENIED,
+ Code.UNIMPLEMENTED)
+ .stream()
+ .map(Code::getNumber)
+ .collect(ImmutableSet.toImmutableSet());
+ /**
+ * The target minimum number of requests per samplePeriodMs, even if no requests succeed. Must be
+ * greater than 0, else we could throttle to zero. Because every decision is probabilistic, there
+ * is no guarantee that the request rate in any given interval will not be zero. (This is the +1
+ * from the formula in https://landing.google.com/sre/book/chapters/handling-overload.html)
+ */
+ private static final double MIN_REQUESTS = 1;
+
+ private final RpcQosOptions options;
+
+ private final AdaptiveThrottler at;
+ private final WriteBatcher wb;
+ private final WriteRampUp writeRampUp;
+ private final FluentBackoff fb;
+
+ private final WeakHashMap<Context, O11y> counters;
+ private final Random random;
+ private final Sleeper sleeper;
+ private final Function<Context, O11y> computeCounters;
+ private final DistributionFactory distributionFactory;
+
+ RpcQosImpl(
+ RpcQosOptions options,
+ Random random,
+ Sleeper sleeper,
+ CounterFactory counterFactory,
+ DistributionFactory distributionFactory) {
+ this.options = options;
+ this.random = random;
+ this.sleeper = sleeper;
+ DistributionFactory filteringDistributionFactory =
+ new DiagnosticOnlyFilteringDistributionFactory(
+ !options.isShouldReportDiagnosticMetrics(), distributionFactory);
+ this.distributionFactory = filteringDistributionFactory;
+ at =
+ new AdaptiveThrottler(
+ options.getSamplePeriod(),
+ options.getSamplePeriodBucketSize(),
+ options.getThrottleDuration(),
+ options.getOverloadRatio());
+ wb =
+ new WriteBatcher(
+ options.getSamplePeriod(),
+ options.getSamplePeriodBucketSize(),
+ options.getBatchInitialCount(),
+ options.getBatchTargetLatency(),
+ filteringDistributionFactory);
+ writeRampUp =
+ new WriteRampUp(
+ Math.max(1, 500 / options.getHintMaxNumWorkers()), filteringDistributionFactory);
+ // maxRetries is an inclusive value, we want exclusive since we are tracking all attempts
+ fb =
+ FluentBackoff.DEFAULT
+ .withMaxRetries(options.getMaxAttempts() - 1)
+ .withInitialBackoff(options.getInitialBackoff());
+ counters = new WeakHashMap<>();
+ computeCounters = (Context c) -> O11y.create(c, counterFactory,
filteringDistributionFactory);
+ }
+
+ @Override
+ public RpcWriteAttemptImpl newWriteAttempt(Context context) {
+ return new RpcWriteAttemptImpl(
+ context, counters.computeIfAbsent(context, computeCounters),
fb.backoff(), sleeper);
+ }
+
+ @Override
+ public RpcReadAttemptImpl newReadAttempt(Context context) {
+ return new RpcReadAttemptImpl(
+ context, counters.computeIfAbsent(context, computeCounters),
fb.backoff(), sleeper);
+ }
+
+ @Override
+ public boolean bytesOverLimit(long bytes) {
+ return bytes > options.getBatchMaxBytes();
+ }
+
+ private static MovingFunction createMovingFunction(Duration samplePeriod,
Duration sampleUpdate) {
+ return new MovingFunction(
+ samplePeriod.getMillis(),
+ sampleUpdate.getMillis(),
+ 1 /* numSignificantBuckets */,
+ 1 /* numSignificantSamples */,
+ Sum.ofLongs());
+ }
+
+ private enum AttemptState {
+ PENDING,
+ STARTED,
+ COMPLETE_SUCCESS,
+ COMPLETE_ERROR;
+
+ public void checkActive() {
+ switch (this) {
+ case PENDING:
+ case STARTED:
+ return;
+ case COMPLETE_SUCCESS:
+ throw new IllegalStateException(
+ "Expected state to be PENDING or STARTED, but was COMPLETE_SUCCESS");
+ case COMPLETE_ERROR:
+ throw new IllegalStateException(
+ "Expected state to be PENDING or STARTED, but was COMPLETE_ERROR");
+ }
+ }
+
+ public void checkStarted() {
+ switch (this) {
+ case STARTED:
+ return;
+ case PENDING:
+ throw new IllegalStateException("Expected state to be STARTED, but was PENDING");
+ case COMPLETE_SUCCESS:
+ throw new IllegalStateException("Expected state to be STARTED, but was COMPLETE_SUCCESS");
+ case COMPLETE_ERROR:
+ throw new IllegalStateException("Expected state to be STARTED, but was COMPLETE_ERROR");
+ }
+ }
+ }
+
+ private abstract class BaseRpcAttempt implements RpcAttempt {
+ private final Logger logger;
+ final O11y o11y;
+ final BackOff backoff;
+ final Sleeper sleeper;
+
+ AttemptState state;
+ Instant start;
+
+ @SuppressWarnings(
+ "initialization.fields.uninitialized") // allow transient fields to be managed by component
+ // lifecycle
+ BaseRpcAttempt(Context context, O11y o11y, BackOff backoff, Sleeper
sleeper) {
+ this.logger = LoggerFactory.getLogger(String.format("%s.RpcQos",
context.getNamespace()));
+ this.o11y = o11y;
+ this.backoff = backoff;
+ this.sleeper = sleeper;
+ this.state = AttemptState.PENDING;
+ }
+
+ @Override
+ public boolean awaitSafeToProceed(Instant instant) throws
InterruptedException {
+ state.checkActive();
+ Duration shouldThrottleRequest = at.shouldThrottleRequest(instant);
+ if (shouldThrottleRequest.compareTo(Duration.ZERO) > 0) {
+ long throttleRequestMillis = shouldThrottleRequest.getMillis();
+ logger.debug("Delaying request by {}ms", throttleRequestMillis);
+ throttleRequest(shouldThrottleRequest);
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public void checkCanRetry(RuntimeException exception) throws
InterruptedException {
+ state.checkActive();
+
+ Optional<ApiException> findApiException = findApiException(exception);
+
+ if (findApiException.isPresent()) {
+ ApiException apiException = findApiException.get();
+ // order here is semi-important
+ // First we always want to test if the error code is one of the codes we have deemed
+ // non-retryable before delegating to the exceptions default set.
+ if (maxAttemptsExhausted()
+ || getStatusCodeNumber(apiException)
+ .map(NON_RETRYABLE_ERROR_NUMBERS::contains)
+ .orElse(false)
+ || !apiException.isRetryable()) {
+ state = AttemptState.COMPLETE_ERROR;
+ throw apiException;
+ }
+ } else {
+ state = AttemptState.COMPLETE_ERROR;
+ throw exception;
+ }
+ }
+
+ @Override
+ public void completeSuccess() {
+ state.checkActive();
+ state = AttemptState.COMPLETE_SUCCESS;
+ }
+
+ @Override
+ public boolean isCodeRetryable(Code code) {
+ return !NON_RETRYABLE_ERROR_NUMBERS.contains(code.getNumber());
+ }
+
+ @Override
+ public void recordRequestSuccessful(Instant end) {
+ state.checkStarted();
+ o11y.rpcSuccesses.inc();
+ o11y.rpcDurationMs.update(durationMs(end));
+ at.recordSuccessfulRequest(start);
+ }
+
+ @Override
+ public void recordRequestFailed(Instant end) {
+ state.checkStarted();
+ o11y.rpcFailures.inc();
+ o11y.rpcDurationMs.update(durationMs(end));
+ at.recordFailedRequest(start);
+ }
+
+ private boolean maxAttemptsExhausted() throws InterruptedException {
+ try {
+ boolean exhausted = !BackOffUtils.next(sleeper, backoff);
+ if (exhausted) {
+ logger.error("Max attempts exhausted after {} attempts.",
options.getMaxAttempts());
+ }
+ return exhausted;
+ } catch (IOException e) {
+ // We are using FluentBackoff which does not ever throw an IOException from its methods
+ // Catch and wrap any potential IOException as a RuntimeException since it won't ever
+ // happen unless the implementation of FluentBackoff changes.
+ throw new RuntimeException(e);
+ }
+ }
+
+ Logger getLogger() {
+ return logger;
+ }
+
+ final void throttleRequest(Duration shouldThrottleRequest) throws
InterruptedException {
+ o11y.throttlingMs.inc(shouldThrottleRequest.getMillis());
+ sleeper.sleep(shouldThrottleRequest.getMillis());
+ }
+
+ final long durationMs(Instant end) {
+ return end.minus(start.getMillis()).getMillis();
+ }
+
+ private Optional<Integer> getStatusCodeNumber(ApiException apiException) {
+ StatusCode statusCode = apiException.getStatusCode();
+ if (statusCode instanceof GrpcStatusCode) {
+ GrpcStatusCode grpcStatusCode = (GrpcStatusCode) statusCode;
+ return Optional.of(grpcStatusCode.getTransportCode().value());
+ }
+ return Optional.empty();
+ }
+
+ private Optional<ApiException> findApiException(Throwable throwable) {
+ if (throwable instanceof ApiException) {
+ ApiException apiException = (ApiException) throwable;
+ return Optional.of(apiException);
+ } else {
+ Throwable cause = throwable.getCause();
+ if (cause != null) {
+ return findApiException(cause);
+ } else {
+ return Optional.empty();
+ }
+ }
+ }
+ }
+
+ private final class RpcReadAttemptImpl extends BaseRpcAttempt implements
RpcReadAttempt {
+ private RpcReadAttemptImpl(Context context, O11y o11y, BackOff backoff,
Sleeper sleeper) {
+ super(context, o11y, backoff, sleeper);
+ }
+
+ @Override
+ public void recordRequestStart(Instant start) {
+ at.recordStartRequest(start);
+ this.start = start;
+ state = AttemptState.STARTED;
+ }
+
+ @Override
+ public void recordStreamValue(Instant now) {
+ state.checkActive();
+ o11y.rpcStreamValueReceived.inc();
+ }
+ }
+
+ final class RpcWriteAttemptImpl extends BaseRpcAttempt implements
RpcWriteAttempt {
+
+ private RpcWriteAttemptImpl(Context context, O11y o11y, BackOff backoff,
Sleeper sleeper) {
+ super(context, o11y, backoff, sleeper);
+ }
+
+ @Override
+ public boolean awaitSafeToProceed(Instant instant) throws
InterruptedException {
+ state.checkActive();
+ Optional<Duration> shouldThrottle = writeRampUp.shouldThrottle(instant);
+ if (shouldThrottle.isPresent()) {
+ Duration throttleDuration = shouldThrottle.get();
+ long throttleDurationMillis = throttleDuration.getMillis();
+ getLogger().debug("Still ramping up, Delaying request by {}ms",
throttleDurationMillis);
+ throttleRequest(throttleDuration);
+ return false;
+ } else {
+ return super.awaitSafeToProceed(instant);
+ }
+ }
+
+ @Override
+ public <T, ElementT extends Element<T>> FlushBufferImpl<T, ElementT>
newFlushBuffer(
+ Instant instantSinceEpoch) {
+ state.checkActive();
+ int availableWriteCountBudget =
writeRampUp.getAvailableWriteCountBudget(instantSinceEpoch);
+ int nextBatchMaxCount = wb.nextBatchMaxCount(instantSinceEpoch);
+ int batchMaxCount =
+ Ints.min(
+ Math.max(0, availableWriteCountBudget),
+ Math.max(0, nextBatchMaxCount),
+ options.getBatchMaxCount());
+ o11y.batchCapacityCount.update(batchMaxCount);
+ return new FlushBufferImpl<>(batchMaxCount, options.getBatchMaxBytes());
+ }
+
+ @Override
+ public void recordRequestStart(Instant start, int numWrites) {
+ at.recordStartRequest(start, numWrites);
+ writeRampUp.recordWriteCount(start, numWrites);
+ this.start = start;
+ state = AttemptState.STARTED;
+ }
+
+ @Override
+ public void recordWriteCounts(Instant end, int successfulWrites, int
failedWrites) {
+ int totalWrites = successfulWrites + failedWrites;
+ state.checkStarted();
+ wb.recordRequestLatency(start, end, totalWrites,
o11y.latencyPerDocumentMs);
+ if (successfulWrites > 0) {
+ at.recordSuccessfulRequest(start, successfulWrites);
+ }
+ if (failedWrites > 0) {
+ at.recordFailedRequest(start, failedWrites);
+ }
+ }
+ }
+
+ /**
+ * Determines batch sizes based on past performance.
+ *
+ * <p>It aims for a target response time per RPC: it uses the response times for previous RPCs and
+ * the number of documents contained in them, calculates a rolling average time-per-document, and
+ * chooses the number of documents for future writes to hit the target time.
+ *
+ * <p>This enables us to send large batches without sending overly-large requests in the case of
+ * expensive document writes that may timeout before the server can apply them all.
+ */
+ private static final class WriteBatcher {
+ private static final Logger LOG =
LoggerFactory.getLogger(WriteBatcher.class);
+
+ private final int batchInitialCount;
+ private final Duration batchTargetLatency;
+ private final MovingAverage meanLatencyPerDocumentMs;
+ private final Distribution batchMaxCount;
+
+ private WriteBatcher(
+ Duration samplePeriod,
+ Duration samplePeriodBucketSize,
+ int batchInitialCount,
+ Duration batchTargetLatency,
+ DistributionFactory distributionFactory) {
+ this.batchInitialCount = batchInitialCount;
+ this.batchTargetLatency = batchTargetLatency;
+ this.meanLatencyPerDocumentMs = new MovingAverage(samplePeriod,
samplePeriodBucketSize);
+ this.batchMaxCount =
+ distributionFactory.get(RpcQos.class.getName(),
"qos_writeBatcher_batchMaxCount");
+ }
+
+ private void recordRequestLatency(
+ Instant start, Instant end, int numWrites, Distribution distribution) {
+ try {
+ Interval interval = new Interval(start, end);
+ long msPerWrite = numWrites == 0 ? 0 : interval.toDurationMillis() /
numWrites;
+ distribution.update(msPerWrite);
+ meanLatencyPerDocumentMs.add(end, msPerWrite);
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Invalid time interval start = {} end = {}", start, end, e);
+ }
+ }
+
+ private int nextBatchMaxCount(Instant instantSinceEpoch) {
+ if (!meanLatencyPerDocumentMs.hasValue(instantSinceEpoch)) {
+ return batchInitialCount;
+ }
+ long recentMeanLatency =
Math.max(meanLatencyPerDocumentMs.get(instantSinceEpoch), 1);
+ long nextBatchMaxCount = batchTargetLatency.getMillis() /
recentMeanLatency;
+ int count = Math.toIntExact(nextBatchMaxCount);
+ batchMaxCount.update(count);
+ return count;
+ }
+ }
+
+ /**
+ * An implementation of client-side adaptive throttling. See
+ * https://sre.google/sre-book/handling-overload/#client-side-throttling-a7sYUg for a full
+ * discussion of the use case and algorithm applied.
+ */
+ private final class AdaptiveThrottler {
+ private final MovingFunction successfulRequestsMovingFunction;
+ private final MovingFunction failedRequestsMovingFunction;
+ private final MovingFunction allRequestsMovingFunction;
+ private final Distribution allRequestsCountDist;
+ private final Distribution successfulRequestsCountDist;
+ private final Distribution overloadMaxCountDist;
+ private final Distribution overloadUsageDist;
+ private final Distribution throttleProbabilityDist;
+ private final Distribution throttlingMs;
+ private final LinearBackoff backoff;
+ private final double overloadRatio;
+
+ private AdaptiveThrottler(
+ Duration samplePeriod,
+ Duration samplePeriodBucketSize,
+ Duration throttleDuration,
+ double overloadRatio) {
+ allRequestsMovingFunction = createMovingFunction(samplePeriod,
samplePeriodBucketSize);
+ successfulRequestsMovingFunction = createMovingFunction(samplePeriod,
samplePeriodBucketSize);
+ failedRequestsMovingFunction = createMovingFunction(samplePeriod,
samplePeriodBucketSize);
+ allRequestsCountDist =
+ distributionFactory.get(RpcQos.class.getName(),
"qos_adaptiveThrottler_allRequestsCount");
+ successfulRequestsCountDist =
+ distributionFactory.get(
+ RpcQos.class.getName(),
"qos_adaptiveThrottler_successfulRequestsCount");
+ overloadMaxCountDist =
+ distributionFactory.get(RpcQos.class.getName(),
"qos_adaptiveThrottler_overloadMaxCount");
+ overloadUsageDist =
+ distributionFactory.get(RpcQos.class.getName(),
"qos_adaptiveThrottler_overloadUsagePct");
+ throttleProbabilityDist =
+ distributionFactory.get(
+ RpcQos.class.getName(),
"qos_adaptiveThrottler_throttleProbabilityPct");
+ throttlingMs =
+ distributionFactory.get(RpcQos.class.getName(),
"qos_adaptiveThrottler_throttlingMs");
+ backoff = new LinearBackoff(throttleDuration);
+ this.overloadRatio = overloadRatio;
+ }
+
+ private Duration shouldThrottleRequest(Instant instantSinceEpoch) {
+ double delayProbability = throttlingProbability(instantSinceEpoch);
+
+ if (random.nextDouble() < delayProbability) {
+ long millis = backoff.nextBackOffMillis();
+ throttlingMs.update(millis);
+ return Duration.millis(millis);
+ } else {
+ backoff.reset();
+ return Duration.ZERO;
+ }
+ }
+
+ private void recordStartRequest(Instant instantSinceEpoch) {
+ recordStartRequest(instantSinceEpoch, 1);
+ }
+
+ private void recordStartRequest(Instant instantSinceEpoch, int value) {
+ allRequestsMovingFunction.add(instantSinceEpoch.getMillis(), value);
+ }
+
+ private void recordSuccessfulRequest(Instant instantSinceEpoch) {
+ recordSuccessfulRequest(instantSinceEpoch, 1);
+ }
+
+ private void recordSuccessfulRequest(Instant instantSinceEpoch, int value)
{
+ successfulRequestsMovingFunction.add(instantSinceEpoch.getMillis(),
value);
+ }
+
+ private void recordFailedRequest(Instant instantSinceEpoch) {
+ recordFailedRequest(instantSinceEpoch, 1);
+ }
+
+ private void recordFailedRequest(Instant instantSinceEpoch, int value) {
+ failedRequestsMovingFunction.add(instantSinceEpoch.getMillis(), value);
+ }
+
+ /**
+ * Implementation of the formula from <a target="_blank" rel="noopener noreferrer"
+ * href="https://sre.google/sre-book/handling-overload/#eq2101">Handling Overload from SRE
+ * Book</a>.
+ */
+ private double throttlingProbability(Instant instantSinceEpoch) {
+ if (!allRequestsMovingFunction.isSignificant()) {
+ return 0;
+ }
+ long nowMsSinceEpoch = instantSinceEpoch.getMillis();
+ long allRequestsCount = allRequestsMovingFunction.get(nowMsSinceEpoch);
+ long successfulRequestsCount =
successfulRequestsMovingFunction.get(nowMsSinceEpoch);
+
+ double overloadMaxCount = overloadRatio * successfulRequestsCount;
+ double overloadUsage = allRequestsCount - overloadMaxCount;
+
+ double calcProbability = overloadUsage / (allRequestsCount +
MIN_REQUESTS);
+ allRequestsCountDist.update(allRequestsCount);
+ successfulRequestsCountDist.update(successfulRequestsCount);
+ overloadMaxCountDist.update((long) overloadMaxCount);
+ overloadUsageDist.update((long) (overloadUsage * 100));
+ throttleProbabilityDist.update((long) (calcProbability * 100));
+ return Math.max(0, calcProbability);
+ }
+ }
+
+ /**
+ * An implementation providing the 500/50/5 ramp up strategy recommended by <a target="_blank"
+ * rel="noopener noreferrer"
+ * href="https://cloud.google.com/firestore/docs/best-practices#ramping_up_traffic">Ramping up
+ * traffic</a>.
+ */
+ @VisibleForTesting
+ static final class WriteRampUp {
+
+ private static final Duration RAMP_UP_INTERVAL =
Duration.standardMinutes(5);
+ private final int baseBatchBudget;
+ private final long rampUpIntervalMinutes;
+ private final MovingFunction writeCounts;
+ private final LinearBackoff backoff;
+ private final Distribution throttlingMs;
+ private final Distribution availableWriteCountBudget;
+
+ @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+ private Optional<Instant> firstInstant = Optional.empty();
+
+ WriteRampUp(int baseBatchBudget, DistributionFactory distributionFactory) {
+ this.baseBatchBudget = baseBatchBudget;
+ this.rampUpIntervalMinutes = RAMP_UP_INTERVAL.getStandardMinutes();
+ this.writeCounts =
+ createMovingFunction(
+ // track up to one second of budget usage.
+ // this determines the full duration of time we want to keep track of request counts
+ Duration.standardSeconds(1),
+ // refill the budget each second
+ // this determines the sub-granularity the full duration will be broken into. So if
+ // we wanted budget to refill twice per second, this could be passed
+ // Duration.millis(500)
+ Duration.standardSeconds(1));
+ this.backoff = new LinearBackoff(Duration.standardSeconds(1));
+ this.throttlingMs =
+ distributionFactory.get(RpcQos.class.getName(),
"qos_rampUp_throttlingMs");
+ this.availableWriteCountBudget =
+ distributionFactory.get(RpcQos.class.getName(),
"qos_rampUp_availableWriteCountBudget");
+ }
+
+ int getAvailableWriteCountBudget(Instant instant) {
Review comment:
Correct, this only tracks data for up to one second. In this ramp-up
calculation we're not taking past performance into consideration (the write
batcher and adaptive throttler take care of that). This class only attempts to
limit writes based on the 500/50/5 budget and what has been used within that
specific one-second window.
The budget value grows relative to the `firstInstant` captured, so even if a
worker only sends a request every 5 seconds, the budget is still calculated for
the next time window. If the worker sends more than one request per second,
each request decrements the budget, and once the budget for that second is
exhausted the worker is throttled until the next second.
For `hintMaxNumWorkers = 1` the budget will fill along this
[line](https://www.wolframalpha.com/input/?i=500+*+1.5%5Emax%280%2C+%28x-5%29%2F5%29+from+0+to+30).
I've got a test
`org.apache.beam.sdk.io.gcp.firestore.RpcQosSimulationTest#writeRampUp_shouldScaleAlongTheExpectedLine`
which runs a series of operations against the RampUp to ensure values fall on
the expected line.
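For reference, the curve in the linked plot can be reproduced with a few lines of Java. This is only a sketch of the formula from the plot URL, `500 * 1.5^max(0, (x - 5) / 5)` with `x` in minutes since `firstInstant`; it is not the code in `WriteRampUp`, which may well round or step the value differently. The class name `RampUpBudgetSketch` and the method `budgetAt` are hypothetical names used for illustration.

```java
// Hypothetical sketch: evaluates the ramp-up line from the linked plot.
// It is NOT the WriteRampUp implementation from the pull request.
class RampUpBudgetSketch {

  /**
   * Per-second write budget after the given number of minutes since the first request.
   * The budget stays at the base value for the first 5 minutes, then grows by 50%
   * for every further 5 minutes (evaluated continuously, as in the plotted formula).
   */
  static double budgetAt(int baseBudget, double minutesSinceFirst) {
    double exponent = Math.max(0.0, (minutesSinceFirst - 5.0) / 5.0);
    return baseBudget * Math.pow(1.5, exponent);
  }

  public static void main(String[] args) {
    // Same base computation as the constructor call in the diff, for hintMaxNumWorkers = 1.
    int hintMaxNumWorkers = 1;
    int baseBudget = Math.max(1, 500 / hintMaxNumWorkers);
    for (int minute = 0; minute <= 30; minute += 5) {
      System.out.printf("minute %2d -> %.1f writes/sec%n", minute, budgetAt(baseBudget, minute));
    }
  }
}
```

With a base of 500 the budget is flat for the first five minutes and reaches 750 at minute 10 and 1125 at minute 15, which matches the 50%-every-5-minutes shape the test checks against.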
Issue Time Tracking
-------------------
Worklog Id: (was: 593023)
Time Spent: 23h 40m (was: 23.5h)
> Add FirestoreIO connector to Java SDK
> -------------------------------------
>
> Key: BEAM-8376
> URL: https://issues.apache.org/jira/browse/BEAM-8376
> Project: Beam
> Issue Type: New Feature
> Components: io-java-gcp
> Reporter: Stefan Djelekar
> Priority: P3
> Time Spent: 23h 40m
> Remaining Estimate: 0h
>
> Motivation:
> There is no Firestore connector for Java SDK at the moment.
> Having it will enhance the integrations with database options on the Google
> Cloud Platform.