prodriguezdefino commented on code in PR #32529:
URL: https://github.com/apache/beam/pull/32529#discussion_r1776009919


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java:
##########
@@ -344,14 +358,39 @@ public void process(ProcessContext c) {}
 
     private static class RowDynamicDestinations extends DynamicDestinations<Row, String> {
       Schema schema;
+      String fixedDestination = null;
+      List<String> primaryKey = null;
 
       RowDynamicDestinations(Schema schema) {
         this.schema = schema;
       }
 
+      RowDynamicDestinations withFixedDestination(String destination) {
+        this.fixedDestination = destination;
+        return this;
+      }
+
+      RowDynamicDestinations withPrimaryKey(List<String> primaryKey) {
+        this.primaryKey = primaryKey;
+        return this;
+      }
+
       @Override
       public String getDestination(ValueInSingleWindow<Row> element) {
-        return element.getValue().getString("destination");
+        return fixedDestination != null
+            ? fixedDestination
+            : element.getValue().getString("destination");
+      }
+
+      @Override
+      public TableConstraints getTableConstraints(String destination) {
+        return Optional.ofNullable(this.primaryKey)
+            .filter(pk -> !pk.isEmpty())
+            .map(
+                pk ->
+                    new TableConstraints()
+                        .setPrimaryKey(new TableConstraints.PrimaryKey().setColumns(pk)))
+            .orElse(null);

Review Comment:
   The PTransform instantiated by the provider does not execute this code,
which is why I modified the row dynamic destinations class. I also considered
using a delegating dynamic destinations wrapper, but that type is
package-private. That left three options: make a larger change in the provider
so that it calls the BigQueryIO apply directly, widen the visibility of the
delegating dynamic destinations type, or extend or modify the row dynamic
destinations class. I think the option implemented here is the least
disruptive from the framework's perspective.
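
For comparison, the delegating alternative mentioned above might look roughly like the sketch below. It uses simplified stand-in types, not Beam's real classes: `Destinations`, `FromElement`, `Delegating`, and `WithPrimaryKey` are hypothetical names, and the primary key is modeled as a plain list of column names instead of `TableConstraints`. It only illustrates the shape of the wrapper that would have been possible if the delegating type were accessible from the provider.

```java
import java.util.List;

// Illustrative only: every type here is a hypothetical stand-in, not Beam's API.
class DelegatingSketch {

  // Simplified stand-in for a dynamic destinations type.
  abstract static class Destinations<T> {
    abstract String getDestination(T element);

    // Default: no primary key declared for the destination table.
    List<String> getPrimaryKey(String destination) {
      return null;
    }
  }

  // Stand-in for the row-based destinations: the element carries its destination.
  static class FromElement extends Destinations<String> {
    @Override
    String getDestination(String element) {
      return element;
    }
  }

  // Stand-in for a delegating wrapper: forwards every call to the wrapped instance.
  static class Delegating<T> extends Destinations<T> {
    private final Destinations<T> inner;

    Delegating(Destinations<T> inner) {
      this.inner = inner;
    }

    @Override
    String getDestination(T element) {
      return inner.getDestination(element);
    }

    @Override
    List<String> getPrimaryKey(String destination) {
      return inner.getPrimaryKey(destination);
    }
  }

  // Overrides only the primary key; routing is left to the wrapped instance.
  static class WithPrimaryKey<T> extends Delegating<T> {
    private final List<String> primaryKey;

    WithPrimaryKey(Destinations<T> inner, List<String> primaryKey) {
      super(inner);
      this.primaryKey = primaryKey;
    }

    @Override
    List<String> getPrimaryKey(String destination) {
      // A null or empty key falls back to whatever the wrapped instance reports.
      return (primaryKey == null || primaryKey.isEmpty())
          ? super.getPrimaryKey(destination)
          : primaryKey;
    }
  }
}
```

With a wrapper like this the row-based class would stay untouched, at the cost of widening the visibility of the delegating type, which is the trade-off weighed above.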


