reuvenlax commented on code in PR #38264:
URL: https://github.com/apache/beam/pull/38264#discussion_r3155988295
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java:
##########
@@ -134,7 +143,18 @@ public AppendClientInfo withAppendClient(
public void close() {
BigQueryServices.StreamAppendClient client = getStreamAppendClient();
if (client != null) {
- getCloseAppendClient().accept(client);
+ try {
+ getCloseAppendClient().accept(client);
+ } catch (Exception e) {
+ // We ignore errors when closing clients.
+ String msg =
+ e
+ + "\n"
+ + Arrays.stream(e.getStackTrace())
+ .map(StackTraceElement::toString)
+ .collect(Collectors.joining("\n"));
+ LOG.warn("Caught exception while trying to close append client.
Ignoring {}", msg);
Review Comment:
done
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -671,475 +880,223 @@ public void process(
AppendClientInfo.of(
Preconditions.checkStateNotNull(tableSchema),
descriptor,
- // Make sure that the client is always closed in a
different thread
- // to
- // avoid blocking.
- client ->
- runAsyncIgnoreFailure(
- closeWriterExecutor,
- () -> {
- // Remove the pin that is "owned" by the
cache.
- client.unpin();
- client.close();
- }))
+ AutoCloseable::close)
.withAppendClient(
writeStreamService,
getOrCreateStream,
false,
defaultMissingValueInterpretation);
- // This pin is "owned" by the cache.
-
Preconditions.checkStateNotNull(info.getStreamAppendClient()).pin();
return info;
};
- AtomicReference<AppendClientInfo> appendClientInfo =
- new AtomicReference<>(
- APPEND_CLIENTS.get(
- messageConverters.getAppendClientKey(element.getKey()),
getAppendClientInfo));
- String currentStream = getOrCreateStream.get();
- if (!currentStream.equals(appendClientInfo.get().getStreamName())) {
- // Cached append client is inconsistent with persisted state. Throw
away cached item and
- // force it to be
- // recreated.
-
APPEND_CLIENTS.invalidate(messageConverters.getAppendClientKey(element.getKey()));
- appendClientInfo.set(
- APPEND_CLIENTS.get(
- messageConverters.getAppendClientKey(element.getKey()),
getAppendClientInfo));
- }
-
- TableSchema updatedSchemaValue = updatedSchema.read();
- if (autoUpdateSchema && updatedSchemaValue != null) {
- if (appendClientInfo.get().hasSchemaChanged(updatedSchemaValue)) {
- appendClientInfo.set(
- AppendClientInfo.of(
- updatedSchemaValue,
appendClientInfo.get().getCloseAppendClient(), false));
-
APPEND_CLIENTS.invalidate(messageConverters.getAppendClientKey(element.getKey()));
- APPEND_CLIENTS.put(
- messageConverters.getAppendClientKey(element.getKey()),
appendClientInfo.get());
+ // The StreamWriter has two pins on it. The static cache holds a pin, as
it continues to cache
+ // values after this
+ // method exits, so must hold the pin. The local AppendClientHolder also
holds a pin, as the
+ // cache could in
+ // theory evict the object during execution and we want a pin held
throughout the execution of
+ // this function.
+ try (AppendClientHolder appendClientHolder =
+ new AppendClientHolder(element.getKey(), getAppendClientInfo)) {
+ String currentStream = getOrCreateStream.get();
+ if (!currentStream.equals(appendClientHolder.get().getStreamName())) {
+ // Cached append client is inconsistent with persisted state. Throw
away cached item and
+ // force it to be recreated.
+ appendClientHolder.invalidateAndReset();
}
- }
- // Initialize stream names and offsets for all contexts. This will be
called initially, but
- // will also be called if we roll over to a new stream on a retry.
- BiConsumer<Iterable<AppendRowsContext<DestinationT>>, Boolean>
initializeContexts =
- (contexts, isFailure) -> {
- try {
- if (isFailure) {
- // Clear the stream name, forcing a new one to be created.
- streamName.write("");
- }
- appendClientInfo.set(
- appendClientInfo
- .get()
- .withAppendClient(
- writeStreamService,
- getOrCreateStream,
- false,
- defaultMissingValueInterpretation));
- StreamAppendClient streamAppendClient =
- Preconditions.checkArgumentNotNull(
- appendClientInfo.get().getStreamAppendClient());
- String streamNameRead =
Preconditions.checkArgumentNotNull(streamName.read());
- long currentOffset =
Preconditions.checkArgumentNotNull(streamOffset.read());
- for (AppendRowsContext<DestinationT> context : contexts) {
- context.streamName = streamNameRead;
- streamAppendClient.pin();
- context.client =
appendClientInfo.get().getStreamAppendClient();
- context.offset = currentOffset;
- ++context.tryIteration;
- currentOffset = context.offset +
context.protoRows.getSerializedRowsCount();
- }
- streamOffset.write(currentOffset);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- };
-
- Consumer<Iterable<AppendRowsContext<DestinationT>>> clearClients =
- (contexts) -> {
-
APPEND_CLIENTS.invalidate(messageConverters.getAppendClientKey(element.getKey()));
- appendClientInfo.set(appendClientInfo.get().withNoAppendClient());
- APPEND_CLIENTS.put(
- messageConverters.getAppendClientKey(element.getKey()),
appendClientInfo.get());
- for (AppendRowsContext<DestinationT> context : contexts) {
- if (context.client != null) {
- // Unpin in a different thread, as it may execute a blocking
close.
- runAsyncIgnoreFailure(closeWriterExecutor,
context.client::unpin);
- context.client = null;
- }
- }
- };
-
- Function<AppendRowsContext<DestinationT>, ApiFuture<AppendRowsResponse>>
runOperation =
- context -> {
- if (context.protoRows.getSerializedRowsCount() == 0) {
- // This might happen if all rows in a batch failed and were sent
to the failed-rows
- // PCollection.
- return
ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build());
- }
- try {
- appendClientInfo.set(
- appendClientInfo
- .get()
- .withAppendClient(
- writeStreamService,
- getOrCreateStream,
- false,
- defaultMissingValueInterpretation));
- return
Preconditions.checkStateNotNull(appendClientInfo.get().getStreamAppendClient())
- .appendRows(context.offset, context.protoRows);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- };
+ TableSchema updatedSchemaValue = autoUpdateSchema ?
updatedSchema.read() : null;
+ if (autoUpdateSchema && updatedSchemaValue != null) {
+ if (appendClientHolder.get().hasSchemaChanged(updatedSchemaValue)) {
+ appendClientHolder.invalidateAndReset();
+ }
+ }
Review Comment:
done
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -487,6 +446,46 @@ public void onTeardown() {
}
}
+ // Holder for an AppendClientInfo. Maintains a pin on the client as long
as it's active.
+ private class AppendClientHolder implements AutoCloseable {
Review Comment:
Correct. Keeping one pin for the duration of processElements.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]