ns-shua commented on issue #32508:
URL: https://github.com/apache/beam/issues/32508#issuecomment-2365436302

   @liferoad The full, current code is a bit complicated, so here is a 
simplified version of the BigQuery write part:
   ```
   // TopicMetadata carries the table destination information.
   PCollection<KV<TopicMetadata, Row>> data = ....
   data.apply(
       BigQueryIO.<KV<TopicMetadata, Row>>write()
           .withFormatFunction(elem -> BigQueryUtils.toTableRow(row(elem.getValue())))
           .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
           .withoutValidation()
           .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
           .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
           .optimizedWrites()
           .withClustering()
           .withAutoSharding()
           .withFailedInsertRetryPolicy(retryTransientErrors())
           // All the tables have an identical schema, so I chose the
           // to(TableFunction) method; see TenantProjectDestinations below.
           .to(tenantProjectDestinations)
   );
   
   public class TenantProjectDestinations<T> implements
       SerializableFunction<ValueInSingleWindow<KV<TopicMetadata, T>>, TableDestination> {
   
       private static final Map<TopicMetadata, TableDestination> destinationCache =
           Maps.newHashMap();
   
       private final ValueProvider<String> tableId;
   
       public static <TableRowType> TenantProjectDestinations<TableRowType> of(
               ValueProvider<String> tableId) {
           return new TenantProjectDestinations<>(tableId);
       }
   
       private TenantProjectDestinations(
               ValueProvider<String> tableId) {
           this.tableId = tableId;
       }
   
       @Override
       public TableDestination apply(ValueInSingleWindow<KV<TopicMetadata, T>> input) {
           assert input != null;
           TopicMetadata k = Objects.requireNonNull(input.getValue()).getKey();
           destinationCache.computeIfAbsent(k, topicMetadata -> new TableDestination(
               new TableReference()
                   .setProjectId(k.getProjectId())
                   .setDatasetId(k.getDatasetId())
                   .setTableId(tableId.get()),
               "TenantTable"));
           return destinationCache.get(k);
       }
   }
   ```


