chamikaramj commented on code in PR #31486:
URL: https://github.com/apache/beam/pull/31486#discussion_r1705954792
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java:
##########
@@ -56,201 +48,84 @@
@Internal
@AutoService(SchemaTransformProvider.class)
public class BigQueryFileLoadsWriteSchemaTransformProvider
- extends
TypedSchemaTransformProvider<BigQueryFileLoadsWriteSchemaTransformConfiguration>
{
+ extends TypedSchemaTransformProvider<BigQueryWriteConfiguration> {
private static final String IDENTIFIER =
- "beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1";
- static final String INPUT_TAG = "INPUT";
-
- /** Returns the expected class of the configuration. */
- @Override
- protected Class<BigQueryFileLoadsWriteSchemaTransformConfiguration>
configurationClass() {
- return BigQueryFileLoadsWriteSchemaTransformConfiguration.class;
- }
+ "beam:schematransform:org.apache.beam:bigquery_fileloads:v1";
+ static final String INPUT_TAG = "input";
- /** Returns the expected {@link SchemaTransform} of the configuration. */
@Override
- protected SchemaTransform
from(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) {
+ protected SchemaTransform from(BigQueryWriteConfiguration configuration) {
return new BigQueryWriteSchemaTransform(configuration);
}
- /** Implementation of the {@link TypedSchemaTransformProvider} identifier
method. */
@Override
public String identifier() {
return IDENTIFIER;
}
- /**
- * Implementation of the {@link TypedSchemaTransformProvider}
inputCollectionNames method. Since a
- * single is expected, this returns a list with a single name.
- */
@Override
public List<String> inputCollectionNames() {
return Collections.singletonList(INPUT_TAG);
}
- /**
- * Implementation of the {@link TypedSchemaTransformProvider}
outputCollectionNames method. Since
- * no output is expected, this returns an empty list.
- */
@Override
public List<String> outputCollectionNames() {
return Collections.emptyList();
}
- /**
- * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based
on a {@link
- * BigQueryFileLoadsWriteSchemaTransformConfiguration}.
- */
protected static class BigQueryWriteSchemaTransform extends SchemaTransform {
/** An instance of {@link BigQueryServices} used for testing. */
private BigQueryServices testBigQueryServices = null;
- private final BigQueryFileLoadsWriteSchemaTransformConfiguration
configuration;
+ private final BigQueryWriteConfiguration configuration;
-
BigQueryWriteSchemaTransform(BigQueryFileLoadsWriteSchemaTransformConfiguration
configuration) {
+ BigQueryWriteSchemaTransform(BigQueryWriteConfiguration configuration) {
+ configuration.validate();
this.configuration = configuration;
}
@Override
- public void validate(PipelineOptions options) {
- if
(!configuration.getCreateDisposition().equals(CreateDisposition.CREATE_NEVER.name()))
{
- return;
- }
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ PCollection<Row> rowPCollection = input.getSinglePCollection();
+ BigQueryIO.Write<Row> write = toWrite();
+ rowPCollection.apply(write);
- BigQueryOptions bigQueryOptions = options.as(BigQueryOptions.class);
+ return PCollectionRowTuple.empty(input.getPipeline());
+ }
- BigQueryServices bigQueryServices = new BigQueryServicesImpl();
- if (testBigQueryServices != null) {
- bigQueryServices = testBigQueryServices;
+ BigQueryIO.Write<Row> toWrite() {
+ BigQueryIO.Write<Row> write =
+ BigQueryIO.<Row>write()
+ .to(configuration.getTable())
+ .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
+ .withFormatFunction(BigQueryUtils.toTableRow())
+ .useBeamSchema();
+
+ if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
+ CreateDisposition createDisposition =
+
CreateDisposition.valueOf(configuration.getCreateDisposition().toUpperCase());
Review Comment:
> Do you mean making this switch in the SDK (i.e. construction time)? I
> assumed we had settled on making it a runner-side decision.

Yeah. Added some comments to the relevant doc.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]