This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch release-2.35.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.35.0 by this push:
new 343e140 [BEAM-13352] Cherrypick #16146 onto 2.35.0 release branch. (#16166)
343e140 is described below
commit 343e1404ac09f7f659043e3aa73d67acb03370af
Author: tvalentyn <[email protected]>
AuthorDate: Wed Dec 8 07:36:30 2021 -0800
[BEAM-13352] Cherrypick #16146 onto 2.35.0 release branch. (#16166)
Co-authored-by: Reuven Lax <[email protected]>
---
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 5 +--
.../io/gcp/bigquery/StorageApiConvertMessages.java | 39 ++++++++++++++++++++--
.../bigquery/StorageApiDynamicDestinations.java | 5 +--
.../StorageApiDynamicDestinationsBeamRow.java | 4 ++-
.../StorageApiDynamicDestinationsTableRow.java | 6 ++--
.../beam/sdk/io/gcp/bigquery/StorageApiLoads.java | 6 ++--
.../bigquery/StorageApiWriteUnshardedRecords.java | 8 +++--
.../bigquery/StorageApiWritesShardedRecords.java | 2 +-
.../bigquery/TwoLevelMessageConverterCache.java | 6 ++--
9 files changed, 58 insertions(+), 23 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 1702e32..2e47741 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2908,10 +2908,7 @@ public class BigQueryIO {
// Fallback behavior: convert to JSON TableRows and convert those
into Beam TableRows.
storageApiDynamicDestinations =
new StorageApiDynamicDestinationsTableRow<>(
- dynamicDestinations,
- tableRowWriterFactory.getToRowFn(),
- getBigQueryServices().getDatasetService(bqOptions),
- getCreateDisposition());
+ dynamicDestinations, tableRowWriterFactory.getToRowFn(),
getCreateDisposition());
}
StorageApiLoads<DestinationT, T> storageApiLoads =
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java
index d58c7af..ebae60e 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java
@@ -17,7 +17,11 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import
org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.MessageConverter;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -31,10 +35,13 @@ public class StorageApiConvertMessages<DestinationT,
ElementT>
extends PTransform<
PCollection<KV<DestinationT, ElementT>>, PCollection<KV<DestinationT,
byte[]>>> {
private final StorageApiDynamicDestinations<ElementT, DestinationT>
dynamicDestinations;
+ private final BigQueryServices bqServices;
public StorageApiConvertMessages(
- StorageApiDynamicDestinations<ElementT, DestinationT>
dynamicDestinations) {
+ StorageApiDynamicDestinations<ElementT, DestinationT>
dynamicDestinations,
+ BigQueryServices bqServices) {
this.dynamicDestinations = dynamicDestinations;
+ this.bqServices = bqServices;
}
@Override
@@ -44,7 +51,7 @@ public class StorageApiConvertMessages<DestinationT, ElementT>
return input.apply(
"Convert to message",
- ParDo.of(new ConvertMessagesDoFn<>(dynamicDestinations, operationName))
+ ParDo.of(new ConvertMessagesDoFn<>(dynamicDestinations, bqServices,
operationName))
.withSideInputs(dynamicDestinations.getSideInputs()));
}
@@ -52,23 +59,49 @@ public class StorageApiConvertMessages<DestinationT,
ElementT>
extends DoFn<KV<DestinationT, ElementT>, KV<DestinationT, byte[]>> {
private final StorageApiDynamicDestinations<ElementT, DestinationT>
dynamicDestinations;
private TwoLevelMessageConverterCache<DestinationT, ElementT>
messageConverters;
+ private final BigQueryServices bqServices;
+ private transient @Nullable DatasetService datasetServiceInternal = null;
ConvertMessagesDoFn(
StorageApiDynamicDestinations<ElementT, DestinationT>
dynamicDestinations,
+ BigQueryServices bqServices,
String operationName) {
this.dynamicDestinations = dynamicDestinations;
this.messageConverters = new
TwoLevelMessageConverterCache<>(operationName);
+ this.bqServices = bqServices;
+ }
+
+ private DatasetService getDatasetService(PipelineOptions pipelineOptions)
throws IOException {
+ if (datasetServiceInternal == null) {
+ datasetServiceInternal =
+
bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
+ }
+ return datasetServiceInternal;
+ }
+
+ @Teardown
+ public void onTeardown() {
+ try {
+ if (datasetServiceInternal != null) {
+ datasetServiceInternal.close();
+ datasetServiceInternal = null;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
@ProcessElement
public void processElement(
ProcessContext c,
+ PipelineOptions pipelineOptions,
@Element KV<DestinationT, ElementT> element,
OutputReceiver<KV<DestinationT, byte[]>> o)
throws Exception {
dynamicDestinations.setSideInputAccessorFromProcessContext(c);
MessageConverter<ElementT> messageConverter =
- messageConverters.get(element.getKey(), dynamicDestinations);
+ messageConverters.get(
+ element.getKey(), dynamicDestinations,
getDatasetService(pipelineOptions));
o.output(
KV.of(element.getKey(),
messageConverter.toMessage(element.getValue()).toByteArray()));
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
index 65e14c6..1a63aff 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
@@ -22,6 +22,7 @@ import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Message;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.ValueInSingleWindow;
@@ -42,8 +43,8 @@ abstract class StorageApiDynamicDestinations<T, DestinationT>
this.inner = inner;
}
- public abstract MessageConverter<T> getMessageConverter(DestinationT
destination)
- throws Exception;
+ public abstract MessageConverter<T> getMessageConverter(
+ DestinationT destination, DatasetService datasetService) throws
Exception;
@Override
public DestinationT getDestination(ValueInSingleWindow<T> element) {
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
index 40814a0..531ce8e 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Message;
import java.time.Duration;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
@@ -45,7 +46,8 @@ class StorageApiDynamicDestinationsBeamRow<T, DestinationT>
}
@Override
- public MessageConverter<T> getMessageConverter(DestinationT destination)
throws Exception {
+ public MessageConverter<T> getMessageConverter(
+ DestinationT destination, DatasetService datasetService) throws
Exception {
return new MessageConverter<T>() {
Descriptor descriptor =
destinationDescriptorCache.get(
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
index de4817b..e208556 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
@@ -35,7 +35,6 @@ import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuild
public class StorageApiDynamicDestinationsTableRow<T, DestinationT>
extends StorageApiDynamicDestinations<T, DestinationT> {
private final SerializableFunction<T, TableRow> formatFunction;
- private final DatasetService datasetService;
private final CreateDisposition createDisposition;
// TODO: Is this cache needed? All callers of getMessageConverter are
already caching the resullt.
@@ -45,16 +44,15 @@ public class StorageApiDynamicDestinationsTableRow<T,
DestinationT>
StorageApiDynamicDestinationsTableRow(
DynamicDestinations<T, DestinationT> inner,
SerializableFunction<T, TableRow> formatFunction,
- DatasetService datasetService,
CreateDisposition createDisposition) {
super(inner);
this.formatFunction = formatFunction;
- this.datasetService = datasetService;
this.createDisposition = createDisposition;
}
@Override
- public MessageConverter<T> getMessageConverter(DestinationT destination)
throws Exception {
+ public MessageConverter<T> getMessageConverter(
+ DestinationT destination, DatasetService datasetService) throws
Exception {
return new MessageConverter<T>() {
Descriptor descriptor =
destinationDescriptorCache.get(
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
index 2d079e3..de5e888 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
@@ -89,7 +89,7 @@ public class StorageApiLoads<DestinationT, ElementT>
input.apply("rewindowIntoGlobal", Window.into(new GlobalWindows()));
PCollection<KV<DestinationT, byte[]>> convertedRecords =
inputInGlobalWindow
- .apply("Convert", new
StorageApiConvertMessages<>(dynamicDestinations))
+ .apply("Convert", new
StorageApiConvertMessages<>(dynamicDestinations, bqServices))
.setCoder(KvCoder.of(destinationCoder, ByteArrayCoder.of()));
convertedRecords.apply(
"StorageApiWriteInconsistent",
@@ -108,7 +108,7 @@ public class StorageApiLoads<DestinationT, ElementT>
// TODO(reuvenlax): Add autosharding support so that users don't have to
pick a shard count.
PCollection<KV<ShardedKey<DestinationT>, byte[]>> shardedRecords =
inputInGlobalWindow
- .apply("Convert", new
StorageApiConvertMessages<>(dynamicDestinations))
+ .apply("Convert", new
StorageApiConvertMessages<>(dynamicDestinations, bqServices))
.apply(
"AddShard",
ParDo.of(
@@ -154,7 +154,7 @@ public class StorageApiLoads<DestinationT, ElementT>
"rewindowIntoGlobal", Window.<KV<DestinationT, ElementT>>into(new
GlobalWindows()));
PCollection<KV<DestinationT, byte[]>> convertedRecords =
inputInGlobalWindow
- .apply("Convert", new
StorageApiConvertMessages<>(dynamicDestinations))
+ .apply("Convert", new
StorageApiConvertMessages<>(dynamicDestinations, bqServices))
.setCoder(KvCoder.of(destinationCoder, ByteArrayCoder.of()));
convertedRecords.apply(
"StorageApiWriteUnsharded",
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 2217a51..21c7a46 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -341,7 +341,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT,
ElementT>
numPendingRecordBytes = 0;
}
- DestinationState createDestinationState(ProcessContext c, DestinationT
destination) {
+ DestinationState createDestinationState(
+ ProcessContext c, DestinationT destination, DatasetService
datasetService) {
TableDestination tableDestination1 =
dynamicDestinations.getTable(destination);
checkArgument(
tableDestination1 != null,
@@ -362,7 +363,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT,
ElementT>
MessageConverter<ElementT> messageConverter;
try {
- messageConverter = messageConverters.get(destination,
dynamicDestinations);
+ messageConverter = messageConverters.get(destination,
dynamicDestinations, datasetService);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -379,7 +380,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT,
ElementT>
initializeDatasetService(pipelineOptions);
dynamicDestinations.setSideInputAccessorFromProcessContext(c);
DestinationState state =
- destinations.computeIfAbsent(element.getKey(), k ->
createDestinationState(c, k));
+ destinations.computeIfAbsent(
+ element.getKey(), k -> createDestinationState(c, k,
datasetService));
flushIfNecessary();
state.addMessage(element.getValue());
++numPendingRecords;
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index aacab97..dc55476 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -294,7 +294,7 @@ public class StorageApiWritesShardedRecords<DestinationT,
ElementT>
final String tableId = tableDestination.getTableUrn();
final DatasetService datasetService = getDatasetService(pipelineOptions);
MessageConverter<ElementT> messageConverter =
- messageConverters.get(element.getKey().getKey(),
dynamicDestinations);
+ messageConverters.get(element.getKey().getKey(),
dynamicDestinations, datasetService);
Descriptor descriptor = messageConverter.getSchemaDescriptor();
// Each ProtoRows object contains at most 1MB of rows.
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TwoLevelMessageConverterCache.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TwoLevelMessageConverterCache.java
index 3e928ff..57431d0 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TwoLevelMessageConverterCache.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TwoLevelMessageConverterCache.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.gcp.bigquery;
import java.io.Serializable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import
org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.MessageConverter;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
@@ -54,7 +55,8 @@ class TwoLevelMessageConverterCache<DestinationT, ElementT>
implements Serializa
public MessageConverter<ElementT> get(
DestinationT destination,
- StorageApiDynamicDestinations<ElementT, DestinationT>
dynamicDestinations)
+ StorageApiDynamicDestinations<ElementT, DestinationT>
dynamicDestinations,
+ DatasetService datasetService)
throws Exception {
// Lookup first in the local cache, and fall back to the static cache if
necessary.
return localMessageConverters.get(
@@ -63,6 +65,6 @@ class TwoLevelMessageConverterCache<DestinationT, ElementT>
implements Serializa
(MessageConverter<ElementT>)
CACHED_MESSAGE_CONVERTERS.get(
KV.of(operationName, destination),
- () ->
dynamicDestinations.getMessageConverter(destination)));
+ () -> dynamicDestinations.getMessageConverter(destination,
datasetService)));
}
}