ns-shua commented on issue #32508:
URL: https://github.com/apache/beam/issues/32508#issuecomment-2365436302
@liferoad The complete current code is fairly complicated, so here is a
simplified version of the BigQuery write part:
```
PCollection<KV<TenantMetadata, Row>> data = .... // here the TenantMetadata
has table destination information
BigQueryIO.<KV<TopicMetadata, Row>>write()
.withFormatFunction(
elem -> BigQueryUtils.toTableRow(row(elem.getValue()))
.withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE).
.withoutValidation()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.optimizedWrites()
.withClustering()
.withAutoSharding()
.withFailedInsertRetryPolicy(retryTransientErrors()
.to(tenantProjectDestinations) // refer to the
TenantProjectDestination function, because all the tables have identical schema
so I choose the to(TableFunction) method
);
public class TenantProjectDestinations<T> implements
SerializableFunction<ValueInSingleWindow<KV<TopicMetadata, T>>,
TableDestination> {
private static final Map<TopicMetadata, TableDestination>
destinationCache = Maps.newHashMap();
private final ValueProvider<String> tableId;
public static <TableRowType> TenantProjectDestinations<TableRowType> of(
ValueProvider<String> tableId) {
return new TenantProjectDestinations<>(tableId);
}
private TenantProjectDestinations(
ValueProvider<String> tableId) {
this.tableId = tableId;
}
@Override
public TableDestination apply(ValueInSingleWindow<KV<TopicMetadata, T>>
input) {
assert input != null;
TopicMetadata k = Objects.requireNonNull(input.getValue()).getKey();
destinationCache.computeIfAbsent(k, topicMetadata -> new
TableDestination(
new TableReference()
.setProjectId(k.getProjectId())
.setDatasetId(k.getDatasetId())
.setTableId(tableId.get()),
"TenantTable"));
return destinationCache.get(k);
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]