lukas-mi commented on PR #32153:
URL: https://github.com/apache/beam/pull/32153#issuecomment-2379188460
I've updated the previous example:
- it previously called `setBigQueryProject` twice and never called
`setBigQueryEndpoint`
- added dataset creation
- added the use of `withCustomGcsTempLocation`; otherwise an exception is
thrown
```
PipelineOptionsFactory.register(BigQueryOptions.class);
BigQueryOptions options =
    PipelineOptionsFactory.create().as(BigQueryOptions.class);
options.setBigQueryEndpoint(bqEmulator.getEmulatorHttpEndpoint());
options.setBigQueryProject(bqEmulator.getProjectId());

var pipeline = Pipeline.create(options);
var sinkTable = "%s.%s.%s".formatted(
    bqEmulator.getProjectId(), "test_dataset", "test_table");

var datasetId = DatasetId.of(bqEmulator.getProjectId(), "test_dataset");
var datasetInfo = DatasetInfo.newBuilder(datasetId).build();
bqClient.create(datasetInfo);

var schema = new TableSchema()
    .setFields(List.of(new TableFieldSchema()
        .setName("testColumn")
        .setType("STRING")
        .setMode("REQUIRED")));

var row = new TableRow();
row.set("testColumn", "testValue");
var rows = Create.of(List.of(row));

pipeline
    .apply(rows)
    .apply(BigQueryIO
        .<TableRow>write()
        .to(sinkTable)
        .withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
            @Override
            public TableRow apply(TableRow input) {
                return input;
            }
        })
        .withSchema(schema)
        .withCustomGcsTempLocation(new ValueProvider<String>() {
            @Override
            public String get() {
                return "/tmp";
            }

            @Override
            public @UnknownKeyFor @NonNull @Initialized boolean isAccessible() {
                return true;
            }
        })
    );

pipeline.run().waitUntilFinish();
```
Now I'm getting an exception related to serialization:
```
unable to serialize DoFnWithExecutionInformation{doFn=org.apache.beam.sdk.io.gcp.bigquery.BatchLoads$3@5b76b891, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[], fieldAccessDescriptor=*}}
java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=org.apache.beam.sdk.io.gcp.bigquery.BatchLoads$3@5b76b891, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[], fieldAccessDescriptor=*}}
```
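One possible cause, not confirmed in this thread: an anonymous class created inside an instance method can hold a hidden reference to its enclosing object, and if that object (for example a test class) is not `Serializable`, Java serialization of the anonymous instance fails. The sketch below reproduces that effect with plain JDK classes only. `CaptureDemo`, `Provider`, `Harness`, and `StaticProvider` are hypothetical stand-ins for illustration, not Beam types.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {
    // Hypothetical stand-in for a serializable provider interface.
    interface Provider extends Serializable {
        String get();
    }

    // Hypothetical stand-in for a test class; note it is NOT Serializable.
    static class Harness {
        private final String tempDir = "/tmp";

        // The anonymous class reads an instance field, so it holds a
        // hidden reference to the enclosing Harness instance.
        Provider anonymous() {
            return new Provider() {
                @Override
                public String get() {
                    return tempDir;
                }
            };
        }

        // A standalone class carries only its own state.
        static Provider detached() {
            return new StaticProvider("/tmp");
        }
    }

    static final class StaticProvider implements Provider {
        private final String value;

        StaticProvider(String value) {
            this.value = value;
        }

        @Override
        public String get() {
            return value;
        }
    }

    // Returns true if Java serialization of the object succeeds.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out =
                new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new Harness().anonymous())); // false
        System.out.println(canSerialize(Harness.detached()));        // true
    }
}
```

If this is what happens in the example above, passing `ValueProvider.StaticValueProvider.of("/tmp")` to `withCustomGcsTempLocation` instead of an anonymous `ValueProvider` might avoid the capture. That is an untested assumption for this emulator setup.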