lukas-mi commented on PR #32153:
URL: https://github.com/apache/beam/pull/32153#issuecomment-2379188460

   I've updated the previous example:
   - it previously called `setBigQueryProject` twice and never called `setBigQueryEndpoint`
   - added dataset creation
   - added `withCustomGcsTempLocation`, since without it an exception is thrown
   
   ```java
   // bqEmulator and bqClient are set up elsewhere in the test (not shown).
   PipelineOptionsFactory.register(BigQueryOptions.class);
   BigQueryOptions options = PipelineOptionsFactory.create().as(BigQueryOptions.class);

   // Point BigQueryIO at the emulator instead of the real BigQuery service.
   options.setBigQueryEndpoint(bqEmulator.getEmulatorHttpEndpoint());
   options.setBigQueryProject(bqEmulator.getProjectId());

   var pipeline = Pipeline.create(options);

   var sinkTable = "%s.%s.%s".formatted(bqEmulator.getProjectId(), "test_dataset", "test_table");

   // Create the target dataset before the pipeline writes to it.
   var datasetId = DatasetId.of(bqEmulator.getProjectId(), "test_dataset");
   var datasetInfo = DatasetInfo.newBuilder(datasetId).build();
   bqClient.create(datasetInfo);

   var schema = new TableSchema()
           .setFields(List.of(new TableFieldSchema()
                   .setName("testColumn")
                   .setType("STRING")
                   .setMode("REQUIRED")));

   var row = new TableRow();
   row.set("testColumn", "testValue");
   var rows = Create.of(List.of(row));

   pipeline
           .apply(rows)
           .apply(BigQueryIO
                   .<TableRow>write()
                   .to(sinkTable)
                   // Identity format function: the elements are already TableRows.
                   .withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
                       @Override
                       public TableRow apply(TableRow input) {
                           return input;
                       }
                   })
                   .withSchema(schema)
                   // Without a custom temp location, an exception is thrown.
                   .withCustomGcsTempLocation(new ValueProvider<String>() {
                       @Override
                       public String get() {
                           return "/tmp";
                       }

                       @Override
                       public boolean isAccessible() {
                           return true;
                       }
                   })
           );

   pipeline.run().waitUntilFinish();
   ```
   Now I'm getting an exception related to serialization:
   ```
   unable to serialize DoFnWithExecutionInformation{doFn=org.apache.beam.sdk.io.gcp.bigquery.BatchLoads$3@5b76b891, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[], fieldAccessDescriptor=*}}
   java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=org.apache.beam.sdk.io.gcp.bigquery.BatchLoads$3@5b76b891, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[], fieldAccessDescriptor=*}}
   ```
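A hypothesis worth checking, not a confirmed diagnosis: both `withFormatFunction` and `withCustomGcsTempLocation` above receive anonymous inner classes, and an anonymous class created inside an instance method can hold a hidden reference to the enclosing object. If that object (for example the test class) is not `Serializable`, Java serialization of anything holding the anonymous object fails, which would surface as an "unable to serialize" error on whichever DoFn keeps it. The self-contained sketch below uses plain `java.io` rather than Beam; `SerializableSupplier` (standing in for `ValueProvider`) and `FakeTest` (standing in for the test class) are hypothetical names. The anonymous class reads an outer field so that the capture is guaranteed; whether javac keeps the hidden reference when the outer instance is unused can depend on the JDK version.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

public class SerializationCheck {

    // Hypothetical stand-in for Beam's ValueProvider: a Supplier that is also Serializable.
    interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

    // Hypothetical stand-in for a JUnit test class; it does NOT implement Serializable.
    static class FakeTest {
        String tempDir = "/tmp";

        // Anonymous class that reads an instance field, so it must keep a
        // hidden reference (this$0) to the enclosing FakeTest instance.
        SerializableSupplier<String> anonymous() {
            return new SerializableSupplier<String>() {
                @Override
                public String get() {
                    return tempDir;
                }
            };
        }
    }

    // Lambda created in a static context: it captures nothing.
    static SerializableSupplier<String> staticLambda() {
        return () -> "/tmp";
    }

    // Returns true if Java serialization of the object succeeds.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException and lands here.
            return false;
        }
    }

    public static void main(String[] args) {
        // Prints "anonymous inner class: false" because FakeTest is not Serializable.
        System.out.println("anonymous inner class: " + isSerializable(new FakeTest().anonymous()));
        // Prints "static lambda: true".
        System.out.println("static lambda: " + isSerializable(staticLambda()));
    }
}
```

If this turns out to be the cause, the Beam-side equivalent would be to pass `ValueProvider.StaticValueProvider.of("/tmp")` to `withCustomGcsTempLocation` and a lambda such as `row -> row` to `withFormatFunction`, since neither references the test instance.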
   
   
   

