Abacn commented on code in PR #31924:
URL: https://github.com/apache/beam/pull/31924#discussion_r1690044463
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -1318,24 +1361,61 @@ public void startBundle(StartBundleContext c) throws IOException {
badRecords = new HashSet<>();
}
+ private final Counter throttlingMsecs =
+ Metrics.counter(Metrics.THROTTLE_TIME_NAMESPACE, Metrics.THROTTLE_TIME_COUNTER_NAME);
+
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
checkForFailures();
KV<ByteString, Iterable<Mutation>> record = c.element();
- bigtableWriter.writeRecord(record).whenComplete(handleMutationException(record, window));
+ Instant writeStart = Instant.now();
+ pendingThrottlingMsecs = 0;
+ bigtableWriter
+ .writeRecord(record)
+ .whenComplete(handleMutationException(record, window, writeStart));
+ if (pendingThrottlingMsecs > 0) {
+ throttlingMsecs.inc(pendingThrottlingMsecs);
+ }
++recordsWritten;
seenWindows.compute(window, (key, count) -> (count != null ? count : 0) + 1);
}
private BiConsumer<MutateRowResponse, Throwable> handleMutationException(
- KV<ByteString, Iterable<Mutation>> record, BoundedWindow window) {
+ KV<ByteString, Iterable<Mutation>> record, BoundedWindow window, Instant writeStart) {
return (MutateRowResponse result, Throwable exception) -> {
if (exception != null) {
if (isDataException(exception)) {
retryIndividualRecord(record, window);
} else {
+ // Exception due to resource unavailable or rate limited,
+ // including DEADLINE_EXCEEDED and RESOURCE_EXHAUSTED.
+ boolean isResourceException = false;
+ if (exception instanceof StatusRuntimeException) {
+ StatusRuntimeException se = (StatusRuntimeException) exception;
+ if (io.grpc.Status.DEADLINE_EXCEEDED.equals(se.getStatus())
+ || io.grpc.Status.RESOURCE_EXHAUSTED.equals(se.getStatus())) {
+ isResourceException = true;
+ }
+ } else if (exception instanceof DeadlineExceededException
+ || exception instanceof ResourceExhaustedException) {
+ isResourceException = true;
+ }
+ if (isResourceException) {
+ pendingThrottlingMsecs = new Duration(writeStart, Instant.now()).getMillis();
+ }
failures.add(new BigtableWriteException(record, exception));
}
+ } else {
+ // add the excessive amount to throttling metrics if elapsed time > target latency
+ if (writeOptions.getThrottlingReportTargetMs() != null
+ && writeOptions.getThrottlingReportTargetMs() > 0) {
Review Comment:
I was unsure whether to set a default or to enable this only when Bigtable
client-side rate limiting is enabled (currently the latter, as here):
https://github.com/apache/beam/blob/83bbe21390ba4c78722cb45b27565fe412127894/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L1123
The challenge is that we have no information on how much of a completed
batch write request's time was spent on throttling. Depending on the cell size,
a request can take several seconds or more even when it is not throttled.
We should probably start with a higher initial value (like 3 minutes).
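
To make the trade-off concrete, here is a minimal sketch of the "report only the excess over a target latency" calculation. It is an illustration under assumptions, not the code in this PR: `ThrottlingEstimate` and `excessMillis` are hypothetical names, and it uses `java.time` rather than the Joda-Time types in the diff so that it stands alone.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper for illustration only; not part of BigtableIO. */
final class ThrottlingEstimate {
  private ThrottlingEstimate() {}

  /**
   * Returns the number of milliseconds by which the elapsed write time exceeds
   * targetMs. Returns 0 when reporting is disabled (targetMs is null or not
   * positive) or when the write finished within the target.
   */
  static long excessMillis(Instant writeStart, Instant writeEnd, Long targetMs) {
    if (targetMs == null || targetMs <= 0) {
      return 0L;
    }
    long elapsedMs = Duration.between(writeStart, writeEnd).toMillis();
    // Only the portion above the target is attributed to throttling.
    return Math.max(0L, elapsedMs - targetMs);
  }
}
```

With a target of 3 minutes (180,000 ms), a write that takes 200 seconds would report 20,000 ms of throttling, while a write of a few seconds would report nothing. A low target would count ordinary slow writes of large cells as throttling, which is the reason for starting with a high value.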