dartiga opened a new issue, #22543:
URL: https://github.com/apache/beam/issues/22543
### What happened?
**Beam: 2.40**
While using a custom `DynamicDestinations` in `BigQueryIO.Write`, I got the
following exception:
```
org.apache.beam.sdk.io.gcp.bigquery.UpdateSchemaDestination.processElement(UpdateSchemaDestination.java:161)
Caused by: java.lang.ClassCastException: org.apache.beam.sdk.io.gcp.bigquery.TableDestination cannot be cast to java.lang.String
com.king.da.destinations.KingAppDestinations.getSchema(KingAppDestinations.java:17)
org.apache.beam.sdk.io.gcp.bigquery.UpdateSchemaDestination.processElement(UpdateSchemaDestination.java:134)
org.apache.beam.sdk.io.gcp.bigquery.UpdateSchemaDestination$DoFnInvoker.invokeProcessElement(Unknown Source)
```
The following test case reproduces the issue:
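```java
// Imports are not part of the original report; these are the packages assumed for Beam 2.40.
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.TestBigQueryOptions;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class UpdateSchemaDestinationTest {

  static final BigqueryClient BQ_CLIENT =
      new BigqueryClient(UpdateSchemaDestinationTest.class.getName());

  // Unique dataset name per run so that concurrent runs do not collide.
  static final String DATASET_ID =
      "schema_update_options_class_cast_excepption"
          + System.currentTimeMillis()
          + "_"
          + new SecureRandom().nextInt(32);

  static TestBigQueryOptions options =
      TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class);

  static Pipeline pipeline;

  @BeforeClass
  public static void setUpAll() throws IOException, InterruptedException {
    options.setTempLocation(options.getTempRoot() + "/bq_it_temp");
    pipeline = Pipeline.create(options);
    BQ_CLIENT.createNewDataset(options.getProject(), DATASET_ID);
  }

  @AfterClass
  public static void tearDownAll() {
    BQ_CLIENT.deleteDataset(options.getProject(), DATASET_ID);
  }

  @Test
  public void classCastExceptionRegression() {
    DynamicDestinations<KV<String, String>, String> destinations =
        new SomeDynamicDestinations();

    PCollection<KV<String, String>> rows =
        pipeline.apply(Create.of(KV.of("table", "foo"), KV.of("table", "bar")));

    rows.apply(
        BigQueryIO.<KV<String, String>>write()
            .withFormatFunction(kv -> new TableRow().set("name", kv.getValue()))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            // Caps each load partition at 1 byte.
            .withMaxBytesPerPartition(1)
            .to(destinations));

    pipeline.run().waitUntilFinish();
  }

  /** Routes each element to the table named by its key; the destination type is String. */
  private static final class SomeDynamicDestinations
      extends DynamicDestinations<KV<String, String>, String> {

    private static final long serialVersionUID = 1L;

    @Override
    public String getDestination(@Nullable ValueInSingleWindow<KV<String, String>> element) {
      return element.getValue().getKey();
    }

    @Override
    public TableDestination getTable(String destination) {
      return new TableDestination(DATASET_ID + "." + destination, "a table");
    }

    @Override
    public @Nullable TableSchema getSchema(String destination) {
      return new TableSchema()
          .setFields(Arrays.asList(new TableFieldSchema().setName("name").setType("STRING")));
    }
  }
}
```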
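For readers unfamiliar with how a `ClassCastException` can surface inside an overridden `getSchema(String)`, here is a minimal, Beam-free sketch of the general Java mechanism. It is an illustration under stated assumptions, not a diagnosis of the Beam code: `Destinations`, `StringDestinations`, `FakeTableDestination`, `callErased`, and `failsWithClassCast` are all hypothetical names. The sketch shows that when a caller has lost the destination type parameter and passes an object of another type, the failure appears in the subclass's `getSchema` frame, as in the trace above.

```java
// Hypothetical, Beam-free illustration of generic type erasure. None of these
// classes exist in Apache Beam; they only mimic the shape of the reported trace.
final class ErasureCastSketch {

  /** Stand-in for a class parameterized by a destination type. */
  abstract static class Destinations<DestT> {
    abstract String getSchema(DestT destination);
  }

  /** Stand-in for an unrelated object that is not the declared destination type. */
  static final class FakeTableDestination {
    final String spec;

    FakeTableDestination(String spec) {
      this.spec = spec;
    }
  }

  /** User implementation that declares String as its destination type. */
  static final class StringDestinations extends Destinations<String> {
    @Override
    String getSchema(String destination) {
      return "schema-for-" + destination;
    }
  }

  /**
   * A caller that works with the raw type. The compiler only warns here, and the
   * call is routed through the synthetic bridge method getSchema(Object), which
   * casts its argument to String before delegating to the user's override.
   */
  @SuppressWarnings({"unchecked", "rawtypes"})
  static String callErased(Destinations destinations, Object key) {
    return destinations.getSchema(key);
  }

  /** Returns true when the erased call fails with a ClassCastException. */
  static boolean failsWithClassCast(Destinations<?> destinations, Object key) {
    try {
      callErased(destinations, key);
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    Destinations<String> destinations = new StringDestinations();
    // Passing the declared destination type works.
    System.out.println(failsWithClassCast(destinations, "table"));
    // Passing any other type fails inside the subclass's getSchema frame.
    System.out.println(
        failsWithClassCast(destinations, new FakeTableDestination("dataset.table")));
  }
}
```

If something similar is happening here, the fix would belong in whichever caller passes a `TableDestination` where the user-declared destination type is expected, rather than in the user's `DynamicDestinations` subclass.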
### Issue Priority
Priority: 1
### Issue Component
Component: io-java-gcp