reuvenlax commented on code in PR #17550:
URL: https://github.com/apache/beam/pull/17550#discussion_r876229787
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -455,21 +506,26 @@ public void process(
@FinishBundle
public void finishBundle(FinishBundleContext context) throws Exception {
flushAll();
- for (DestinationState state : destinations.values()) {
- if (!useDefaultStream) {
+ if (!useDefaultStream) {
Review Comment:
Why this change? Won't this prevent us from unpinning the client?
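Background on the concern: in the pre-change `finishBundle`, `state.teardown()` ran for every destination whether or not the default stream was in use. Only `context.output(...)` was guarded by `!useDefaultStream`. The new `if` wraps the whole loop, so the default-stream path no longer reaches `teardown()` at bundle end. Below is a plain-Java model of the pre-change structure (no Beam dependency). It assumes that `DestinationState.teardown()` is what unpins the append client. `FinishBundleModel`, `pinned`, and the string output are hypothetical stand-ins, not the real classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the pre-change finishBundle; not Beam code.
class FinishBundleModel {
  // Stand-in for DestinationState; assumes teardown() unpins the client.
  static class DestinationState {
    final String tableUrn;
    final String streamName;
    boolean pinned = true;

    DestinationState(String tableUrn, String streamName) {
      this.tableUrn = tableUrn;
      this.streamName = streamName;
    }

    void teardown() {
      pinned = false; // models releasing (unpinning) the append client
    }
  }

  // Emits (tableUrn, streamName) only when not using the default stream,
  // but tears down (unpins) every destination in both modes.
  static List<String> finishBundle(
      Map<String, DestinationState> destinations, boolean useDefaultStream) {
    List<String> emitted = new ArrayList<>();
    for (DestinationState state : destinations.values()) {
      if (!useDefaultStream) {
        emitted.add(state.tableUrn + "=" + state.streamName);
      }
      state.teardown();
    }
    destinations.clear();
    return emitted;
  }

  public static void main(String[] args) {
    Map<String, DestinationState> destinations = new HashMap<>();
    DestinationState a = new DestinationState("tableA", "_default");
    destinations.put("tableA", a);
    List<String> emitted = finishBundle(destinations, true);
    System.out.println(emitted.isEmpty() + " " + a.pinned); // prints "true false"
  }
}
```

In this model, the default-stream path emits nothing but still unpins. Moving the `if` outside the loop would leave `pinned` set to `true` after the bundle.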
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -409,9 +447,18 @@ private void initializeDatasetService(PipelineOptions pipelineOptions) {
}
}
+ @Setup
+ public void setup() {
+ if (useDefaultStream) {
+ destinations = Maps.newHashMap();
+ }
+ }
+
@StartBundle
public void startBundle() throws IOException {
- destinations = Maps.newHashMap();
+ if (!useDefaultStream) {
Review Comment:
Why add this if?
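For context on what the `if` changes: when `destinations` is replaced in `@StartBundle`, its contents (and any client they hold) live for one bundle. When the map is instead created once in `@Setup` and left alone in `@StartBundle`, it persists across every bundle that DoFn instance processes. Below is a plain-Java model of the two lifetimes. `LifetimeModel`, `processElement`, and the per-table counter are hypothetical stand-ins, not the real DoFn.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of per-bundle vs per-instance state; not Beam code.
class LifetimeModel {
  final boolean useDefaultStream;
  Map<String, Integer> destinations; // value: elements seen for that table

  LifetimeModel(boolean useDefaultStream) {
    this.useDefaultStream = useDefaultStream;
  }

  // Models @Setup: runs once per DoFn instance.
  void setup() {
    if (useDefaultStream) {
      destinations = new HashMap<>();
    }
  }

  // Models @StartBundle: runs once per bundle.
  void startBundle() {
    if (!useDefaultStream) {
      destinations = new HashMap<>();
    }
  }

  void processElement(String table) {
    destinations.merge(table, 1, Integer::sum);
  }

  // Runs two one-element bundles for the same table and returns the count
  // observed after the second bundle.
  static int countAfterTwoBundles(boolean useDefaultStream) {
    LifetimeModel fn = new LifetimeModel(useDefaultStream);
    fn.setup();
    for (int bundle = 0; bundle < 2; bundle++) {
      fn.startBundle();
      fn.processElement("tableA");
    }
    return fn.destinations.get("tableA");
  }

  public static void main(String[] args) {
    System.out.println(countAfterTwoBundles(false)); // 1: state reset each bundle
    System.out.println(countAfterTwoBundles(true));  // 2: state kept across bundles
  }
}
```

The model only shows the lifetime difference. Whether keeping default-stream state across bundles is desirable depends on the unpinning and teardown questions raised in the other comments.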
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -455,21 +506,26 @@ public void process(
@FinishBundle
public void finishBundle(FinishBundleContext context) throws Exception {
flushAll();
- for (DestinationState state : destinations.values()) {
- if (!useDefaultStream) {
+ if (!useDefaultStream) {
+ for (DestinationState state : destinations.values()) {
context.output(
KV.of(state.tableUrn, state.streamName),
BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1)),
GlobalWindow.INSTANCE);
+ state.teardown();
}
- state.teardown();
+ destinations.clear();
+ destinations = null;
}
- destinations.clear();
- destinations = null;
}
@Teardown
public void teardown() {
+ if (destinations != null) {
+ for (DestinationState state : destinations.values()) {
+ state.teardown();
+ }
Review Comment:
We shouldn't be doing this in teardown
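Background for this point: Beam documents `@Teardown` as best effort. A runner may discard a DoFn instance without ever calling it, so cleanup that must happen is usually done in `@FinishBundle`. Below is a plain-Java model with hypothetical names (`TeardownModel`, `released`, `held`). It shows the consequence: if an instance is dropped without `teardown()`, only a release performed in `finishBundle()` takes effect.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a runner that skips @Teardown; not Beam code.
class TeardownModel {
  final List<String> released = new ArrayList<>();
  final List<String> held = new ArrayList<>();
  final boolean releaseInFinishBundle;

  TeardownModel(boolean releaseInFinishBundle) {
    this.releaseInFinishBundle = releaseInFinishBundle;
  }

  void processElement(String client) {
    held.add(client);
  }

  void finishBundle() {
    if (releaseInFinishBundle) {
      released.addAll(held);
      held.clear();
    }
  }

  // Best effort: a runner is allowed never to call this.
  void teardown() {
    released.addAll(held);
    held.clear();
  }

  // Simulates one bundle on an instance that is discarded without teardown().
  static List<String> releasedWithoutTeardown(boolean releaseInFinishBundle) {
    TeardownModel fn = new TeardownModel(releaseInFinishBundle);
    fn.processElement("client-1");
    fn.finishBundle();
    // The instance is dropped here; teardown() is never invoked.
    return fn.released;
  }

  public static void main(String[] args) {
    System.out.println(releasedWithoutTeardown(true));  // [client-1]
    System.out.println(releasedWithoutTeardown(false)); // []
  }
}
```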
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java:
##########
@@ -93,12 +93,22 @@
void setUseStorageWriteApiAtLeastOnce(Boolean value);
@Description(
- "If set, then BigQueryIO.Write will default to using this number of Storage Write API streams.")
+ "If set, then BigQueryIO.Write will default to using this number of Storage Write API streams. ")
@Default.Integer(0)
Integer getNumStorageWriteApiStreams();
void setNumStorageWriteApiStreams(Integer value);
+ @Description(
+ "When using the \"_default\" table stream, this option sets the number of stream append clients that will be allocated "
Review Comment:
Reference at-least-once writes instead. Users don't know about default streams, since that's an implementation detail.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -409,9 +447,18 @@ private void initializeDatasetService(PipelineOptions pipelineOptions) {
}
}
+ @Setup
+ public void setup() {
Review Comment:
I'm not sure what this is adding.
--
This is an automated message from the Apache Git Service.