Abacn commented on issue #33531:
URL: https://github.com/apache/beam/issues/33531#issuecomment-3367314239

   For future reference, here is an example of a workaround:
   
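   ```java
   import com.google.api.services.bigquery.model.TableCell;
   import com.google.api.services.bigquery.model.TableFieldSchema;
   import com.google.api.services.bigquery.model.TableRow;
   import com.google.api.services.bigquery.model.TableSchema;
   // Plain Guava is assumed here; any ImmutableList on the classpath works.
   import com.google.common.collect.ImmutableList;
   import java.io.Serializable;
   import java.util.ArrayList;
   import java.util.List;
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
   import org.apache.beam.sdk.options.PipelineOptions;
   import org.apache.beam.sdk.options.PipelineOptionsFactory;
   import org.apache.beam.sdk.transforms.Create;
   import org.apache.beam.sdk.transforms.SerializableFunction;
   import org.joda.time.Instant;

   public class WriteTest {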
     public static class DataType implements Serializable {
       public Integer key;
       public Instant timestamp;
       public DataType(Integer key) {
         this.key = key;
         this.timestamp = Instant.now();
       }
     }
   
     static SerializableFunction<DataType, TableRow> getTableRowFnc() {
       return (DataType data) -> {
         TableRow tableRow = new TableRow();
         tableRow.setF(
             ImmutableList.of(
                 new TableCell().setV(data.key),
                 new TableCell().setV(data.timestamp.getMillis() * 1000)));
         return tableRow;
       };
     }
   
     static SerializableFunction<DataType, String> getTagFnc() {
       return (DataType data) -> data.key.toString();
     }
   
     static TableSchema getTableSchema() {
       List<TableFieldSchema> fields = new ArrayList<>();
       fields.add(new TableFieldSchema().setName("f").setType("INT64").setMode("REQUIRED"));
       fields.add(
           new TableFieldSchema().setName("timestamp").setType("TIMESTAMP").setMode("NULLABLE"));
       return new TableSchema().setFields(fields);
     }
   
     public static void main(String argv[]) {
       PipelineOptions options =
           PipelineOptionsFactory.fromArgs(argv).withValidation().as(PipelineOptions.class);
       Pipeline p = Pipeline.create(options);
   
       p.apply(Create.of(new DataType(1), new DataType(2), new DataType(3)))
           .apply(BigQueryIO.<DataType>write()
               .to("<project>:<dataset>.<table>")
               .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
               .withSchema(getTableSchema())
               .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
               .withExtendedErrorInfo()
               .withFormatFunction(getTableRowFnc())
               .withDeterministicRecordIdFn(getTagFnc())
               .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
           );
       p.run().waitUntilFinish();
     }
   }
   ```
   
   Providing a TableRow whose fields are set directly via `setF` ensures that
   the code path of the fix in #16872 is exercised.
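
   The `getMillis() * 1000` in the format function above turns Joda epoch
   milliseconds into microseconds, which appears to be the unit expected for a
   numeric TIMESTAMP cell on this write path. A minimal standalone sketch of
   that conversion follows. It uses `java.time` rather than Joda so it runs
   without dependencies, and the class and method names (`TimestampMicros`,
   `millisToMicros`, `toMicros`) are hypothetical, not part of Beam.

   ```java
   import java.time.Instant;
   import java.time.temporal.ChronoUnit;

   public class TimestampMicros {
     /** Converts epoch milliseconds to epoch microseconds; throws on long overflow. */
     public static long millisToMicros(long epochMillis) {
       return Math.multiplyExact(epochMillis, 1000L);
     }

     /** Converts an Instant to epoch microseconds, keeping sub-millisecond digits. */
     public static long toMicros(Instant instant) {
       return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
     }

     public static void main(String[] args) {
       Instant now = Instant.now();
       // Both values are microseconds since the Unix epoch; the first is
       // truncated to millisecond precision before scaling.
       System.out.println(millisToMicros(now.toEpochMilli()));
       System.out.println(toMicros(now));
     }
   }
   ```

   `Math.multiplyExact` is used instead of a bare `* 1000` so that an
   out-of-range value fails loudly rather than wrapping silently.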

