This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch release-2.35.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.35.0 by this push:
     new 343e140  [BEAM-13352] Cherrypick #16146 onto 2.35.0 release branch. (#16166)
343e140 is described below

commit 343e1404ac09f7f659043e3aa73d67acb03370af
Author: tvalentyn <[email protected]>
AuthorDate: Wed Dec 8 07:36:30 2021 -0800

    [BEAM-13352] Cherrypick #16146 onto 2.35.0 release branch. (#16166)
    
    Co-authored-by: Reuven Lax <[email protected]>
---
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       |  5 +--
 .../io/gcp/bigquery/StorageApiConvertMessages.java | 39 ++++++++++++++++++++--
 .../bigquery/StorageApiDynamicDestinations.java    |  5 +--
 .../StorageApiDynamicDestinationsBeamRow.java      |  4 ++-
 .../StorageApiDynamicDestinationsTableRow.java     |  6 ++--
 .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java  |  6 ++--
 .../bigquery/StorageApiWriteUnshardedRecords.java  |  8 +++--
 .../bigquery/StorageApiWritesShardedRecords.java   |  2 +-
 .../bigquery/TwoLevelMessageConverterCache.java    |  6 ++--
 9 files changed, 58 insertions(+), 23 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 1702e32..2e47741 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2908,10 +2908,7 @@ public class BigQueryIO {
           // Fallback behavior: convert to JSON TableRows and convert those 
into Beam TableRows.
           storageApiDynamicDestinations =
               new StorageApiDynamicDestinationsTableRow<>(
-                  dynamicDestinations,
-                  tableRowWriterFactory.getToRowFn(),
-                  getBigQueryServices().getDatasetService(bqOptions),
-                  getCreateDisposition());
+                  dynamicDestinations, tableRowWriterFactory.getToRowFn(), 
getCreateDisposition());
         }
 
         StorageApiLoads<DestinationT, T> storageApiLoads =
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java
index d58c7af..ebae60e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java
@@ -17,7 +17,11 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import 
org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.MessageConverter;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -31,10 +35,13 @@ public class StorageApiConvertMessages<DestinationT, 
ElementT>
     extends PTransform<
         PCollection<KV<DestinationT, ElementT>>, PCollection<KV<DestinationT, 
byte[]>>> {
   private final StorageApiDynamicDestinations<ElementT, DestinationT> 
dynamicDestinations;
+  private final BigQueryServices bqServices;
 
   public StorageApiConvertMessages(
-      StorageApiDynamicDestinations<ElementT, DestinationT> 
dynamicDestinations) {
+      StorageApiDynamicDestinations<ElementT, DestinationT> 
dynamicDestinations,
+      BigQueryServices bqServices) {
     this.dynamicDestinations = dynamicDestinations;
+    this.bqServices = bqServices;
   }
 
   @Override
@@ -44,7 +51,7 @@ public class StorageApiConvertMessages<DestinationT, ElementT>
 
     return input.apply(
         "Convert to message",
-        ParDo.of(new ConvertMessagesDoFn<>(dynamicDestinations, operationName))
+        ParDo.of(new ConvertMessagesDoFn<>(dynamicDestinations, bqServices, 
operationName))
             .withSideInputs(dynamicDestinations.getSideInputs()));
   }
 
@@ -52,23 +59,49 @@ public class StorageApiConvertMessages<DestinationT, 
ElementT>
       extends DoFn<KV<DestinationT, ElementT>, KV<DestinationT, byte[]>> {
     private final StorageApiDynamicDestinations<ElementT, DestinationT> 
dynamicDestinations;
     private TwoLevelMessageConverterCache<DestinationT, ElementT> 
messageConverters;
+    private final BigQueryServices bqServices;
+    private transient @Nullable DatasetService datasetServiceInternal = null;
 
     ConvertMessagesDoFn(
         StorageApiDynamicDestinations<ElementT, DestinationT> 
dynamicDestinations,
+        BigQueryServices bqServices,
         String operationName) {
       this.dynamicDestinations = dynamicDestinations;
       this.messageConverters = new 
TwoLevelMessageConverterCache<>(operationName);
+      this.bqServices = bqServices;
+    }
+
+    private DatasetService getDatasetService(PipelineOptions pipelineOptions) 
throws IOException {
+      if (datasetServiceInternal == null) {
+        datasetServiceInternal =
+            
bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
+      }
+      return datasetServiceInternal;
+    }
+
+    @Teardown
+    public void onTeardown() {
+      try {
+        if (datasetServiceInternal != null) {
+          datasetServiceInternal.close();
+          datasetServiceInternal = null;
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
     }
 
     @ProcessElement
     public void processElement(
         ProcessContext c,
+        PipelineOptions pipelineOptions,
         @Element KV<DestinationT, ElementT> element,
         OutputReceiver<KV<DestinationT, byte[]>> o)
         throws Exception {
       dynamicDestinations.setSideInputAccessorFromProcessContext(c);
       MessageConverter<ElementT> messageConverter =
-          messageConverters.get(element.getKey(), dynamicDestinations);
+          messageConverters.get(
+              element.getKey(), dynamicDestinations, 
getDatasetService(pipelineOptions));
       o.output(
           KV.of(element.getKey(), 
messageConverter.toMessage(element.getValue()).toByteArray()));
     }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
index 65e14c6..1a63aff 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
@@ -22,6 +22,7 @@ import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.Message;
 import java.util.List;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
@@ -42,8 +43,8 @@ abstract class StorageApiDynamicDestinations<T, DestinationT>
     this.inner = inner;
   }
 
-  public abstract MessageConverter<T> getMessageConverter(DestinationT 
destination)
-      throws Exception;
+  public abstract MessageConverter<T> getMessageConverter(
+      DestinationT destination, DatasetService datasetService) throws 
Exception;
 
   @Override
   public DestinationT getDestination(ValueInSingleWindow<T> element) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
index 40814a0..531ce8e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.Message;
 import java.time.Duration;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.Row;
@@ -45,7 +46,8 @@ class StorageApiDynamicDestinationsBeamRow<T, DestinationT>
   }
 
   @Override
-  public MessageConverter<T> getMessageConverter(DestinationT destination) 
throws Exception {
+  public MessageConverter<T> getMessageConverter(
+      DestinationT destination, DatasetService datasetService) throws 
Exception {
     return new MessageConverter<T>() {
       Descriptor descriptor =
           destinationDescriptorCache.get(
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
index de4817b..e208556 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
@@ -35,7 +35,6 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuild
 public class StorageApiDynamicDestinationsTableRow<T, DestinationT>
     extends StorageApiDynamicDestinations<T, DestinationT> {
   private final SerializableFunction<T, TableRow> formatFunction;
-  private final DatasetService datasetService;
   private final CreateDisposition createDisposition;
 
   // TODO: Is this cache needed? All callers of getMessageConverter are 
already caching the resullt.
@@ -45,16 +44,15 @@ public class StorageApiDynamicDestinationsTableRow<T, 
DestinationT>
   StorageApiDynamicDestinationsTableRow(
       DynamicDestinations<T, DestinationT> inner,
       SerializableFunction<T, TableRow> formatFunction,
-      DatasetService datasetService,
       CreateDisposition createDisposition) {
     super(inner);
     this.formatFunction = formatFunction;
-    this.datasetService = datasetService;
     this.createDisposition = createDisposition;
   }
 
   @Override
-  public MessageConverter<T> getMessageConverter(DestinationT destination) 
throws Exception {
+  public MessageConverter<T> getMessageConverter(
+      DestinationT destination, DatasetService datasetService) throws 
Exception {
     return new MessageConverter<T>() {
       Descriptor descriptor =
           destinationDescriptorCache.get(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
index 2d079e3..de5e888 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
@@ -89,7 +89,7 @@ public class StorageApiLoads<DestinationT, ElementT>
         input.apply("rewindowIntoGlobal", Window.into(new GlobalWindows()));
     PCollection<KV<DestinationT, byte[]>> convertedRecords =
         inputInGlobalWindow
-            .apply("Convert", new 
StorageApiConvertMessages<>(dynamicDestinations))
+            .apply("Convert", new 
StorageApiConvertMessages<>(dynamicDestinations, bqServices))
             .setCoder(KvCoder.of(destinationCoder, ByteArrayCoder.of()));
     convertedRecords.apply(
         "StorageApiWriteInconsistent",
@@ -108,7 +108,7 @@ public class StorageApiLoads<DestinationT, ElementT>
     // TODO(reuvenlax): Add autosharding support so that users don't have to 
pick a shard count.
     PCollection<KV<ShardedKey<DestinationT>, byte[]>> shardedRecords =
         inputInGlobalWindow
-            .apply("Convert", new 
StorageApiConvertMessages<>(dynamicDestinations))
+            .apply("Convert", new 
StorageApiConvertMessages<>(dynamicDestinations, bqServices))
             .apply(
                 "AddShard",
                 ParDo.of(
@@ -154,7 +154,7 @@ public class StorageApiLoads<DestinationT, ElementT>
             "rewindowIntoGlobal", Window.<KV<DestinationT, ElementT>>into(new 
GlobalWindows()));
     PCollection<KV<DestinationT, byte[]>> convertedRecords =
         inputInGlobalWindow
-            .apply("Convert", new 
StorageApiConvertMessages<>(dynamicDestinations))
+            .apply("Convert", new 
StorageApiConvertMessages<>(dynamicDestinations, bqServices))
             .setCoder(KvCoder.of(destinationCoder, ByteArrayCoder.of()));
     convertedRecords.apply(
         "StorageApiWriteUnsharded",
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 2217a51..21c7a46 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -341,7 +341,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, 
ElementT>
       numPendingRecordBytes = 0;
     }
 
-    DestinationState createDestinationState(ProcessContext c, DestinationT 
destination) {
+    DestinationState createDestinationState(
+        ProcessContext c, DestinationT destination, DatasetService 
datasetService) {
       TableDestination tableDestination1 = 
dynamicDestinations.getTable(destination);
       checkArgument(
           tableDestination1 != null,
@@ -362,7 +363,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, 
ElementT>
 
       MessageConverter<ElementT> messageConverter;
       try {
-        messageConverter = messageConverters.get(destination, 
dynamicDestinations);
+        messageConverter = messageConverters.get(destination, 
dynamicDestinations, datasetService);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -379,7 +380,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, 
ElementT>
       initializeDatasetService(pipelineOptions);
       dynamicDestinations.setSideInputAccessorFromProcessContext(c);
       DestinationState state =
-          destinations.computeIfAbsent(element.getKey(), k -> 
createDestinationState(c, k));
+          destinations.computeIfAbsent(
+              element.getKey(), k -> createDestinationState(c, k, 
datasetService));
       flushIfNecessary();
       state.addMessage(element.getValue());
       ++numPendingRecords;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index aacab97..dc55476 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -294,7 +294,7 @@ public class StorageApiWritesShardedRecords<DestinationT, 
ElementT>
       final String tableId = tableDestination.getTableUrn();
       final DatasetService datasetService = getDatasetService(pipelineOptions);
       MessageConverter<ElementT> messageConverter =
-          messageConverters.get(element.getKey().getKey(), 
dynamicDestinations);
+          messageConverters.get(element.getKey().getKey(), 
dynamicDestinations, datasetService);
       Descriptor descriptor = messageConverter.getSchemaDescriptor();
 
       // Each ProtoRows object contains at most 1MB of rows.
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TwoLevelMessageConverterCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TwoLevelMessageConverterCache.java
index 3e928ff..57431d0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TwoLevelMessageConverterCache.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TwoLevelMessageConverterCache.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import java.io.Serializable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import 
org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.MessageConverter;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
@@ -54,7 +55,8 @@ class TwoLevelMessageConverterCache<DestinationT, ElementT> 
implements Serializa
 
   public MessageConverter<ElementT> get(
       DestinationT destination,
-      StorageApiDynamicDestinations<ElementT, DestinationT> 
dynamicDestinations)
+      StorageApiDynamicDestinations<ElementT, DestinationT> 
dynamicDestinations,
+      DatasetService datasetService)
       throws Exception {
     // Lookup first in the local cache, and fall back to the static cache if 
necessary.
     return localMessageConverters.get(
@@ -63,6 +65,6 @@ class TwoLevelMessageConverterCache<DestinationT, ElementT> 
implements Serializa
             (MessageConverter<ElementT>)
                 CACHED_MESSAGE_CONVERTERS.get(
                     KV.of(operationName, destination),
-                    () -> 
dynamicDestinations.getMessageConverter(destination)));
+                    () -> dynamicDestinations.getMessageConverter(destination, 
datasetService)));
   }
 }

Reply via email to