prodriguezdefino commented on code in PR #17417:
URL: https://github.com/apache/beam/pull/17417#discussion_r863969447
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java:
##########
@@ -206,6 +206,10 @@ interface StreamAppendClient extends AutoCloseable {
/** Append rows to a Storage API write stream at the given offset. */
ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows)
throws Exception;
+ default long getInflightWaitSeconds() {
Review Comment:
nit: it's pretty self-explanatory, but maybe adding a quick method doc would be
good.
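
For reference, a minimal sketch of what such a doc could look like (the wording and the `return 0` body are placeholders, not taken from the PR):

    /**
     * Returns how long (in seconds) the most recent appendRows call had to wait
     * because the client was at its in-flight request limit. Intended for
     * monitoring backpressure on the Storage API write stream.
     */
    default long getInflightWaitSeconds() {
      return 0; // placeholder body; the actual default is not shown in this hunk
    }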
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -275,7 +321,13 @@ void flush(RetryManager<AppendRowsResponse, Context<AppendRowsResponse>> retryMa
offset = this.currentOffset;
this.currentOffset += inserts.getSerializedRowsCount();
}
- return writeStream.appendRows(offset, protoRows);
+ ApiFuture<AppendRowsResponse> response = writeStream.appendRows(offset, protoRows);
+ inflightWaitSecondsDistribution.update(writeStream.getInflightWaitSeconds());
+ if (writeStream.getInflightWaitSeconds() > 5) {
+   LOG.warn(
+       "Storage Api write delay more than " + writeStream.getInflightWaitSeconds());
+ }
+ return response;
Review Comment:
Would it make sense to add a Distribution here to measure the execution time of
this lambda?
We lock all the finished bundles on the getStreamAppendClient call, so maybe we
could corroborate or rule out any locking delays there under high volume.
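
As a rough illustration of the suggestion, a timing helper built on Beam's `Metrics.distribution` might look like this (class name, metric name, and placement are hypothetical):

    import java.util.function.Supplier;
    import org.apache.beam.sdk.metrics.Distribution;
    import org.apache.beam.sdk.metrics.Metrics;

    class AppendTimingSketch {
      // Hypothetical metric; the name and owning class are placeholders.
      private static final Distribution APPEND_LAMBDA_LATENCY_MS =
          Metrics.distribution(AppendTimingSketch.class, "appendLambdaLatencyMs");

      // Runs the given operation and records its wall-clock duration in milliseconds,
      // which could help corroborate or rule out locking delays in getStreamAppendClient.
      static <T> T timed(Supplier<T> operation) {
        long startMs = System.currentTimeMillis();
        T result = operation.get();
        APPEND_LAMBDA_LATENCY_MS.update(System.currentTimeMillis() - startMs);
        return result;
      }
    }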
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -19,12 +19,14 @@
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
import com.google.protobuf.ByteString;
import com.google.protobuf.DynamicMessage;
import java.io.IOException;
+import java.time.Instant;
Review Comment:
any reason not to use Joda-Time here? Totally fine with it, just wondering.
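
For comparison, the two APIs are nearly interchangeable if the Instant is only used to measure elapsed time (an assumption about this PR); both snippets below are just illustrative:

    // java.time, as imported in the PR
    java.time.Instant start = java.time.Instant.now();
    long elapsedMs = java.time.Duration.between(start, java.time.Instant.now()).toMillis();

    // Joda-Time equivalent
    org.joda.time.Instant jodaStart = org.joda.time.Instant.now();
    long jodaElapsedMs =
        new org.joda.time.Duration(jodaStart, org.joda.time.Instant.now()).getMillis();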
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java:
##########
@@ -123,4 +123,10 @@
Integer getSchemaUpdateRetries();
void setSchemaUpdateRetries(Integer value);
+
+ @Description("Maximum (best effort) size of a single append to the storage
API.")
+ @Default.Integer(2 * 1024 * 1024)
+ Integer getStorageApiAppendThresholdBytes();
+
+ void setStorageApiAppendThresholdBytes(Integer value);
Review Comment:
maybe we could also add a knob for the record-count threshold, but that can
be added later as well
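
For illustration, such a knob could mirror the byte-size option above; the option name and default value below are hypothetical, not part of the PR:

    // Hypothetical companion option; name and default are made up for illustration.
    @Description("Maximum (best effort) record count of a single append to the storage API.")
    @Default.Integer(500)
    Integer getStorageApiAppendThresholdRecordCount();

    void setStorageApiAppendThresholdRecordCount(Integer value);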
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -76,11 +79,17 @@
private final BigQueryServices bqServices;
private static final ExecutorService closeWriterExecutor =
Executors.newCachedThreadPool();
+ // The Guava cache object is threadsafe. However our protocol requires that client pin the
+ // StreamAppendClient
+ // after looking up the cache, and we must ensure that the cache is not accessed in between the
+ // lookup and the pin
+ // (any access of the cache could trigger element expiration). Therefore most uses of the
Review Comment:
this comment looks incomplete.