This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 52c3f07aa56 Merge pull request #17417: [BEAM-14388] Address some performance problems with the storage API
52c3f07aa56 is described below
commit 52c3f07aa56de738db3130283716d38206de42b7
Author: Reuven Lax <[email protected]>
AuthorDate: Wed May 4 13:14:07 2022 -0700
Merge pull request #17417: [BEAM-14388] Address some performance problems with the storage API
---
.../beam/sdk/io/gcp/bigquery/BigQueryOptions.java | 6 ++
.../beam/sdk/io/gcp/bigquery/BigQueryServices.java | 8 +++
.../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 15 ++++
.../StorageApiWriteRecordsInconsistent.java | 7 +-
.../bigquery/StorageApiWriteUnshardedRecords.java | 84 ++++++++++++++++++----
5 files changed, 106 insertions(+), 14 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index 85437f267d0..a9beb5cbd7c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -123,4 +123,10 @@ public interface BigQueryOptions
Integer getSchemaUpdateRetries();
void setSchemaUpdateRetries(Integer value);
+
+ @Description("Maximum (best effort) size of a single append to the storage API.")
+ @Default.Integer(2 * 1024 * 1024)
+ Integer getStorageApiAppendThresholdBytes();
+
+ void setStorageApiAppendThresholdBytes(Integer value);
}
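
For context, the new knob is an ordinary Beam pipeline option. A minimal sketch of overriding it programmatically, assuming standard option parsing (the 1 MB value is illustrative, not a recommendation):

    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class SetAppendThreshold {
      public static void main(String[] args) {
        // View the parsed pipeline options as BigQueryOptions.
        BigQueryOptions options =
            PipelineOptionsFactory.fromArgs(args).as(BigQueryOptions.class);
        // Override the default 2 MB (2 * 1024 * 1024 bytes) append threshold.
        options.setStorageApiAppendThresholdBytes(1024 * 1024);
      }
    }

The same value can be passed on the command line as --storageApiAppendThresholdBytes=1048576.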
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 8a7eff378c6..23fce8ba6ff 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -206,6 +206,14 @@ public interface BigQueryServices extends Serializable {
/** Append rows to a Storage API write stream at the given offset. */
ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows)
throws Exception;
+ /**
+ * If the previous call to appendRows blocked due to flow control, returns how long the call
+ * blocked for.
+ */
+ default long getInflightWaitSeconds() {
+ return 0;
+ }
+
/**
* Pin this object. If close() is called before all pins are removed, the underlying resources
* will not be freed until all pins are removed.
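
To illustrate the new hook: a hedged sketch of how a caller might consume getInflightWaitSeconds() after an append. The helper class and method below are hypothetical; only appendRows and getInflightWaitSeconds come from the interface in this patch.

    import com.google.api.core.ApiFuture;
    import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
    import com.google.cloud.bigquery.storage.v1.ProtoRows;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StreamAppendClient;

    class AppendWithBackpressureReporting {
      // Hypothetical helper: append, then report any flow-control stall.
      static ApiFuture<AppendRowsResponse> appendAndReport(
          StreamAppendClient client, long offset, ProtoRows rows) throws Exception {
        ApiFuture<AppendRowsResponse> future = client.appendRows(offset, rows);
        long waitedSeconds = client.getInflightWaitSeconds();
        if (waitedSeconds > 0) {
          // Zero (the interface default) means the last append was not throttled.
          System.out.println("appendRows blocked " + waitedSeconds + "s on flow control");
        }
        return future;
      }
    }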
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 85a53487f88..2949150c9ee 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -33,6 +33,7 @@ import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.Bigquery.Tables;
@@ -1223,9 +1224,18 @@ class BigQueryServicesImpl implements BigQueryServices {
ProtoSchema protoSchema =
ProtoSchema.newBuilder().setProtoDescriptor(descriptor.toProto()).build();
+ TransportChannelProvider transportChannelProvider =
+ BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
+ .setKeepAliveTime(org.threeten.bp.Duration.ofMinutes(1))
+ .setKeepAliveTimeout(org.threeten.bp.Duration.ofMinutes(1))
+ .setKeepAliveWithoutCalls(true)
+ .setChannelsPerCpu(2)
+ .build();
+
StreamWriter streamWriter =
StreamWriter.newBuilder(streamName)
.setWriterSchema(protoSchema)
+ .setChannelProvider(transportChannelProvider)
.setTraceId(
"Dataflow:"
+ (bqIOMetadata.getBeamJobId() != null
@@ -1275,6 +1285,11 @@ class BigQueryServicesImpl implements BigQueryServices {
throws Exception {
return streamWriter.append(rows, offset);
}
+
+ @Override
+ public long getInflightWaitSeconds() {
+ return streamWriter.getInflightWaitSeconds();
+ }
};
}
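
The keep-alive settings above presumably exist to stop long-lived gRPC connections from idling out between appends. As a rough illustration of what those gax builder calls translate to on a plain gRPC channel (target and values here are illustrative only, not from the patch):

    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;
    import java.util.concurrent.TimeUnit;

    // Ping every minute, give the ping a minute to be acknowledged, and
    // keep pinging even when no RPC is currently active.
    ManagedChannel channel =
        ManagedChannelBuilder.forTarget("bigquerystorage.googleapis.com:443")
            .keepAliveTime(1, TimeUnit.MINUTES)
            .keepAliveTimeout(1, TimeUnit.MINUTES)
            .keepAliveWithoutCalls(true)
            .build();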
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
index b88f4d4b920..e433925e5b3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java
@@ -47,12 +47,17 @@ public class StorageApiWriteRecordsInconsistent<DestinationT, ElementT>
@Override
public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayload>> input) {
String operationName = input.getName() + "/" + getName();
+ BigQueryOptions bigQueryOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
// Append records to the Storage API streams.
input.apply(
"Write Records",
ParDo.of(
new StorageApiWriteUnshardedRecords.WriteRecordsDoFn<>(
- operationName, dynamicDestinations, bqServices, true))
+ operationName,
+ dynamicDestinations,
+ bqServices,
+ true,
+ bigQueryOptions.getStorageApiAppendThresholdBytes()))
.withSideInputs(dynamicDestinations.getSideInputs()));
return input.getPipeline().apply("voids", Create.empty(VoidCoder.of()));
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 1751799dd51..f033f423466 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -19,12 +19,14 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
import com.google.protobuf.ByteString;
import com.google.protobuf.DynamicMessage;
import java.io.IOException;
+import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -40,6 +42,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.DescriptorWrapper;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.MessageConverter;
import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
@@ -76,11 +79,19 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
private final BigQueryServices bqServices;
private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();
+ // The Guava cache object is thread-safe. However, our protocol requires that the client pin
+ // the StreamAppendClient after looking up the cache, and we must ensure that the cache is not
+ // accessed in between the lookup and the pin (any access of the cache could trigger element
+ // expiration). Therefore most uses of APPEND_CLIENTS should synchronize.
private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
CacheBuilder.newBuilder()
- .expireAfterAccess(5, TimeUnit.MINUTES)
+ .expireAfterAccess(15, TimeUnit.MINUTES)
.removalListener(
(RemovalNotification<String, StreamAppendClient> removal) -> {
+ LOG.info("Expiring append client for " + removal.getKey());
@Nullable final StreamAppendClient streamAppendClient = removal.getValue();
// Close the writer in a different thread so as not to block the main one.
runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::close);
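
A hedged sketch of the lookup-then-pin discipline the comment above prescribes; the getOrCreateClient method and the newAppendClient factory are hypothetical, but pin() and the cache come from this patch:

    // Hold the cache lock across lookup and pin so that expiration cannot
    // close the client between the two steps.
    StreamAppendClient getOrCreateClient(String streamName) throws Exception {
      synchronized (APPEND_CLIENTS) {
        StreamAppendClient client =
            APPEND_CLIENTS.get(streamName, () -> newAppendClient(streamName));
        client.pin();
        return client;
      }
    }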
@@ -117,10 +128,17 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
@Override
public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayload>> input) {
String operationName = input.getName() + "/" + getName();
+ BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
return input
.apply(
"Write Records",
- ParDo.of(new WriteRecordsDoFn<>(operationName, dynamicDestinations, bqServices, false))
+ ParDo.of(
+ new WriteRecordsDoFn<>(
+ operationName,
+ dynamicDestinations,
+ bqServices,
+ false,
+ options.getStorageApiAppendThresholdBytes()))
.withSideInputs(dynamicDestinations.getSideInputs()))
.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
// Calling Reshuffle makes the output stable - once this completes, the append operations
@@ -132,6 +150,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
static class WriteRecordsDoFn<DestinationT, ElementT>
extends DoFn<KV<DestinationT, StorageApiWritePayload>, KV<String, String>> {
+ private final Counter forcedFlushes = Metrics.counter(WriteRecordsDoFn.class, "forcedFlushes");
+
class DestinationState {
private final String tableUrn;
private final MessageConverter<ElementT> messageConverter;
@@ -144,8 +164,13 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
Metrics.counter(WriteRecordsDoFn.class, "recordsAppended");
private final Counter appendFailures =
Metrics.counter(WriteRecordsDoFn.class, "appendFailures");
+ private final Counter schemaMismatches =
+ Metrics.counter(WriteRecordsDoFn.class, "schemaMismatches");
+ private final Distribution inflightWaitSecondsDistribution =
+ Metrics.distribution(WriteRecordsDoFn.class, "streamWriterWaitSeconds");
private final boolean useDefaultStream;
private DescriptorWrapper descriptorWrapper;
+ private Instant nextCacheTickle;
public DestinationState(
String tableUrn,
@@ -161,8 +186,10 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
}
void teardown() {
+ maybeTickleCache();
if (streamAppendClient != null) {
runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::unpin);
+ streamAppendClient = null;
}
}
@@ -206,6 +233,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
this.streamAppendClient.pin();
}
this.currentOffset = 0;
+ nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1));
}
return streamAppendClient;
} catch (Exception e) {
@@ -213,21 +241,43 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
}
}
+ void maybeTickleCache() {
+ if (streamAppendClient != null && Instant.now().isAfter(nextCacheTickle)) {
+ synchronized (APPEND_CLIENTS) {
+ APPEND_CLIENTS.getIfPresent(streamName);
+ }
+ nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1));
+ }
+ }
+
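
The tickle works because any read of the key, including getIfPresent, counts as an access for expireAfterAccess and resets the eviction clock. A toy illustration of that Guava behavior (not from the patch; plain Guava imports rather than Beam's vendored ones):

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import java.util.concurrent.TimeUnit;

    Cache<String, String> cache =
        CacheBuilder.newBuilder().expireAfterAccess(15, TimeUnit.MINUTES).build();
    cache.put("stream", "client");
    // Touching the entry resets its 15-minute access timer, so a client that
    // is still appending is not evicted out from under its pin.
    cache.getIfPresent("stream");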
void invalidateWriteStream() {
if (streamAppendClient != null) {
synchronized (APPEND_CLIENTS) {
// Unpin in a different thread, as it may execute a blocking close.
runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::unpin);
+ // The default stream is cached across multiple different DoFns. If they all try to
+ // invalidate, then we can get races between threads invalidating and recreating streams.
+ // For this reason, we check that the cache still contains the object we created before
+ // invalidating (in case another thread has already invalidated and recreated the stream).
+ @Nullable
+ StreamAppendClient cachedAppendClient = APPEND_CLIENTS.getIfPresent(streamName);
+ if (cachedAppendClient != null
+ && System.identityHashCode(cachedAppendClient)
+ == System.identityHashCode(streamAppendClient)) {
+ APPEND_CLIENTS.invalidate(streamName);
+ }
}
streamAppendClient = null;
- APPEND_CLIENTS.invalidate(streamName);
- } else if (useDefaultStream) {
- APPEND_CLIENTS.invalidate(getDefaultStreamName());
}
}
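
Side note on the identity check: comparing System.identityHashCode values approximates a reference comparison (distinct objects can, rarely, share a hash). A direct reference test would express the same intent; this is an editorial sketch, not what the patch does:

    // Invalidate only if the cache still holds the exact instance we created.
    if (cachedAppendClient == streamAppendClient) {
      APPEND_CLIENTS.invalidate(streamName);
    }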
void addMessage(StorageApiWritePayload payload) throws Exception {
+ maybeTickleCache();
if (payload.getSchemaHash() != descriptorWrapper.hash) {
+ schemaMismatches.inc();
// The descriptor on the payload doesn't match the descriptor we know about. This
// means that the table has been updated, but that this transform hasn't found out
// about that yet. Refresh the schema and force a new StreamAppendClient to be
@@ -259,9 +309,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
return;
}
final ProtoRows.Builder inserts = ProtoRows.newBuilder();
- for (ByteString m : pendingMessages) {
- inserts.addSerializedRows(m);
- }
+ inserts.addAllSerializedRows(pendingMessages);
ProtoRows protoRows = inserts.build();
pendingMessages.clear();
@@ -275,7 +323,13 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
offset = this.currentOffset;
this.currentOffset += inserts.getSerializedRowsCount();
}
- return writeStream.appendRows(offset, protoRows);
+ ApiFuture<AppendRowsResponse> response = writeStream.appendRows(offset, protoRows);
+ inflightWaitSecondsDistribution.update(writeStream.getInflightWaitSeconds());
+ if (writeStream.getInflightWaitSeconds() > 5) {
+ LOG.warn("Storage API write delayed by more than " + writeStream.getInflightWaitSeconds() + " seconds.");
+ }
+ }
+ return response;
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -294,6 +348,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
recordsAppended.inc(protoRows.getSerializedRowsCount());
},
new Context<>());
+ maybeTickleCache();
}
}
@@ -302,8 +357,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
private transient @Nullable DatasetService datasetService;
private int numPendingRecords = 0;
private int numPendingRecordBytes = 0;
- private static final int FLUSH_THRESHOLD_RECORDS = 100;
- private static final int FLUSH_THRESHOLD_RECORD_BYTES = 2 * 1024 * 1024;
+ private static final int FLUSH_THRESHOLD_RECORDS = 150000;
+ private final int flushThresholdBytes;
private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
private final BigQueryServices bqServices;
private final boolean useDefaultStream;
@@ -312,20 +367,23 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
String operationName,
StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
BigQueryServices bqServices,
- boolean useDefaultStream) {
+ boolean useDefaultStream,
+ int flushThresholdBytes) {
this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
this.dynamicDestinations = dynamicDestinations;
this.bqServices = bqServices;
this.useDefaultStream = useDefaultStream;
+ this.flushThresholdBytes = flushThresholdBytes;
}
boolean shouldFlush() {
return numPendingRecords > FLUSH_THRESHOLD_RECORDS
- || numPendingRecordBytes > FLUSH_THRESHOLD_RECORD_BYTES;
+ || numPendingRecordBytes > flushThresholdBytes;
}
void flushIfNecessary() throws Exception {
if (shouldFlush()) {
+ forcedFlushes.inc();
// Too much memory being used. Flush the state and wait for it to drain out.
// TODO(reuvenlax): Consider waiting for memory usage to drop instead of waiting for all the
// appends to finish.
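
Taken together, the new flush policy triggers on whichever bound is hit first: 150,000 buffered records, or the configurable byte threshold (2 MB by default via storageApiAppendThresholdBytes). A self-contained sketch of that decision; the PendingBuffer class is hypothetical and only mirrors the counters shown above:

    // Hypothetical mirror of WriteRecordsDoFn's buffering counters.
    class PendingBuffer {
      private static final int FLUSH_THRESHOLD_RECORDS = 150_000;
      private final int flushThresholdBytes; // storageApiAppendThresholdBytes
      private int numPendingRecords;
      private int numPendingRecordBytes;

      PendingBuffer(int flushThresholdBytes) {
        this.flushThresholdBytes = flushThresholdBytes;
      }

      void add(int serializedRowBytes) {
        numPendingRecords++;
        numPendingRecordBytes += serializedRowBytes;
      }

      boolean shouldFlush() {
        // Many small rows hit the record bound first; large rows hit the byte bound.
        return numPendingRecords > FLUSH_THRESHOLD_RECORDS
            || numPendingRecordBytes > flushThresholdBytes;
      }
    }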