chamikaramj commented on code in PR #31486:
URL: https://github.com/apache/beam/pull/31486#discussion_r1704423913
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java:
##########
@@ -56,201 +48,84 @@
@Internal
@AutoService(SchemaTransformProvider.class)
public class BigQueryFileLoadsWriteSchemaTransformProvider
- extends TypedSchemaTransformProvider<BigQueryFileLoadsWriteSchemaTransformConfiguration> {
+ extends TypedSchemaTransformProvider<BigQueryWriteConfiguration> {
private static final String IDENTIFIER =
- "beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1";
- static final String INPUT_TAG = "INPUT";
-
- /** Returns the expected class of the configuration. */
- @Override
- protected Class<BigQueryFileLoadsWriteSchemaTransformConfiguration> configurationClass() {
- return BigQueryFileLoadsWriteSchemaTransformConfiguration.class;
- }
+ "beam:schematransform:org.apache.beam:bigquery_fileloads:v1";
+ static final String INPUT_TAG = "input";
- /** Returns the expected {@link SchemaTransform} of the configuration. */
@Override
- protected SchemaTransform from(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) {
+ protected SchemaTransform from(BigQueryWriteConfiguration configuration) {
return new BigQueryWriteSchemaTransform(configuration);
}
- /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
@Override
public String identifier() {
return IDENTIFIER;
}
- /**
- * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a
- * single is expected, this returns a list with a single name.
- */
@Override
public List<String> inputCollectionNames() {
return Collections.singletonList(INPUT_TAG);
}
- /**
- * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
- * no output is expected, this returns an empty list.
- */
@Override
public List<String> outputCollectionNames() {
return Collections.emptyList();
}
- /**
- * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based on a {@link
- * BigQueryFileLoadsWriteSchemaTransformConfiguration}.
- */
protected static class BigQueryWriteSchemaTransform extends SchemaTransform {
/** An instance of {@link BigQueryServices} used for testing. */
private BigQueryServices testBigQueryServices = null;
- private final BigQueryFileLoadsWriteSchemaTransformConfiguration configuration;
+ private final BigQueryWriteConfiguration configuration;
Review Comment:
It seems confusing to use the configuration from a different transform here.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java:
##########
@@ -56,201 +48,84 @@
@Internal
@AutoService(SchemaTransformProvider.class)
public class BigQueryFileLoadsWriteSchemaTransformProvider
- extends TypedSchemaTransformProvider<BigQueryFileLoadsWriteSchemaTransformConfiguration> {
+ extends TypedSchemaTransformProvider<BigQueryWriteConfiguration> {
private static final String IDENTIFIER =
- "beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1";
- static final String INPUT_TAG = "INPUT";
-
- /** Returns the expected class of the configuration. */
- @Override
- protected Class<BigQueryFileLoadsWriteSchemaTransformConfiguration> configurationClass() {
- return BigQueryFileLoadsWriteSchemaTransformConfiguration.class;
- }
+ "beam:schematransform:org.apache.beam:bigquery_fileloads:v1";
Review Comment:
We should move schema-transform IDs to a proto file (same as the other PR).
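
For illustration, a minimal sketch of what the Java side could look like once the URN comes from a proto definition. The enum entry BIGQUERY_FILE_LOADS, the ManagedTransforms.Urns location, and the BeamUrns package are assumptions used to show the idea, not existing Beam definitions.

import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.util.construction.BeamUrns;

/** Hedged sketch: resolve the provider's URN from a proto-defined enum entry. */
final class BigQueryFileLoadsUrns {
  // The URN string would live next to the (assumed) enum entry in the proto file via the
  // beam_urn option, so Java, Python, and the expansion service share one definition.
  static final String IDENTIFIER =
      BeamUrns.getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_FILE_LOADS);

  private BigQueryFileLoadsUrns() {}
}

The provider's identifier() would then return this constant instead of a hard-coded string literal.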
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java:
##########
@@ -17,34 +17,26 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
+import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryWriteConfiguration;
Review Comment:
Are we looking to use the same configuration in multiple places? If so, it
should go to a separate class.
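
As a sketch of the "separate class" option (the field names, the validate() body, and the builder are illustrative assumptions, not the PR's actual code), a transform-neutral configuration could live in its own file under the providers package:

package org.apache.beam.sdk.io.gcp.bigquery.providers;

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Hedged sketch of a standalone, transform-neutral BigQuery write configuration. */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class BigQueryWriteConfiguration {

  @SchemaFieldDescription("The BigQuery table to write to, e.g. \"[project:]dataset.table\".")
  public abstract String getTable();

  @SchemaFieldDescription("Optional CreateDisposition, e.g. CREATE_IF_NEEDED or CREATE_NEVER.")
  public abstract @Nullable String getCreateDisposition();

  /** Illustrative validation only; a real class would also cover dispositions, schema options, etc. */
  public void validate() {
    if (getTable() == null || getTable().isEmpty()) {
      throw new IllegalArgumentException("A BigQuery table spec must be provided.");
    }
  }

  public static Builder builder() {
    return new AutoValue_BigQueryWriteConfiguration.Builder();
  }

  @AutoValue.Builder
  public abstract static class Builder {
    public abstract Builder setTable(String table);

    public abstract Builder setCreateDisposition(@Nullable String createDisposition);

    public abstract BigQueryWriteConfiguration build();
  }
}

Both the file-loads and Storage Write API providers could then import this class directly instead of reaching into each other's nested types.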
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java:
##########
@@ -56,201 +48,84 @@
@Internal
@AutoService(SchemaTransformProvider.class)
public class BigQueryFileLoadsWriteSchemaTransformProvider
- extends TypedSchemaTransformProvider<BigQueryFileLoadsWriteSchemaTransformConfiguration> {
+ extends TypedSchemaTransformProvider<BigQueryWriteConfiguration> {
private static final String IDENTIFIER =
- "beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1";
- static final String INPUT_TAG = "INPUT";
-
- /** Returns the expected class of the configuration. */
- @Override
- protected Class<BigQueryFileLoadsWriteSchemaTransformConfiguration> configurationClass() {
- return BigQueryFileLoadsWriteSchemaTransformConfiguration.class;
- }
+ "beam:schematransform:org.apache.beam:bigquery_fileloads:v1";
+ static final String INPUT_TAG = "input";
- /** Returns the expected {@link SchemaTransform} of the configuration. */
@Override
- protected SchemaTransform from(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) {
+ protected SchemaTransform from(BigQueryWriteConfiguration configuration) {
return new BigQueryWriteSchemaTransform(configuration);
}
- /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
@Override
public String identifier() {
return IDENTIFIER;
}
- /**
- * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a
- * single is expected, this returns a list with a single name.
- */
@Override
public List<String> inputCollectionNames() {
return Collections.singletonList(INPUT_TAG);
}
- /**
- * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
- * no output is expected, this returns an empty list.
- */
@Override
public List<String> outputCollectionNames() {
return Collections.emptyList();
}
- /**
- * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based on a {@link
- * BigQueryFileLoadsWriteSchemaTransformConfiguration}.
- */
protected static class BigQueryWriteSchemaTransform extends SchemaTransform {
/** An instance of {@link BigQueryServices} used for testing. */
private BigQueryServices testBigQueryServices = null;
- private final BigQueryFileLoadsWriteSchemaTransformConfiguration configuration;
+ private final BigQueryWriteConfiguration configuration;
- BigQueryWriteSchemaTransform(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) {
+ BigQueryWriteSchemaTransform(BigQueryWriteConfiguration configuration) {
+ configuration.validate();
this.configuration = configuration;
}
@Override
- public void validate(PipelineOptions options) {
- if (!configuration.getCreateDisposition().equals(CreateDisposition.CREATE_NEVER.name())) {
- return;
- }
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ PCollection<Row> rowPCollection = input.getSinglePCollection();
+ BigQueryIO.Write<Row> write = toWrite();
+ rowPCollection.apply(write);
- BigQueryOptions bigQueryOptions = options.as(BigQueryOptions.class);
+ return PCollectionRowTuple.empty(input.getPipeline());
+ }
- BigQueryServices bigQueryServices = new BigQueryServicesImpl();
- if (testBigQueryServices != null) {
- bigQueryServices = testBigQueryServices;
+ BigQueryIO.Write<Row> toWrite() {
+ BigQueryIO.Write<Row> write =
+ BigQueryIO.<Row>write()
+ .to(configuration.getTable())
+ .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
+ .withFormatFunction(BigQueryUtils.toTableRow())
+ .useBeamSchema();
+
+ if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
+ CreateDisposition createDisposition =
+ CreateDisposition.valueOf(configuration.getCreateDisposition().toUpperCase());
Review Comment:
As a larger point, I think we should do any transform overriding at job
submission (BQ modes for batch/streaming, etc.) so that we can just upgrade in
the backend (at least in the first version).
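
One way to read that suggestion in code (a rough sketch; the helper class, forConfiguration, and the isStreaming flag are assumptions, not an existing API): the submission layer would choose the write method from the configuration plus the job's mode, so the backend can swap methods later without a new transform.

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryWriteConfiguration;
import org.apache.beam.sdk.values.Row;

/** Hedged sketch of choosing the BigQuery write method at job-submission time. */
final class BigQueryWriteMethodChooser {

  static BigQueryIO.Write<Row> forConfiguration(
      BigQueryWriteConfiguration configuration, boolean isStreaming) {
    // Illustrative policy: FILE_LOADS for batch, STORAGE_WRITE_API for streaming; the real
    // decision would live wherever the job-submission override is applied.
    BigQueryIO.Write.Method method =
        isStreaming
            ? BigQueryIO.Write.Method.STORAGE_WRITE_API
            : BigQueryIO.Write.Method.FILE_LOADS;
    return BigQueryIO.<Row>write()
        .to(configuration.getTable())
        .withMethod(method)
        .withFormatFunction(BigQueryUtils.toTableRow())
        .useBeamSchema();
  }

  private BigQueryWriteMethodChooser() {}
}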
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java:
##########
@@ -56,201 +48,84 @@
@Internal
@AutoService(SchemaTransformProvider.class)
public class BigQueryFileLoadsWriteSchemaTransformProvider
- extends TypedSchemaTransformProvider<BigQueryFileLoadsWriteSchemaTransformConfiguration> {
+ extends TypedSchemaTransformProvider<BigQueryWriteConfiguration> {
private static final String IDENTIFIER =
- "beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1";
- static final String INPUT_TAG = "INPUT";
-
- /** Returns the expected class of the configuration. */
- @Override
- protected Class<BigQueryFileLoadsWriteSchemaTransformConfiguration> configurationClass() {
- return BigQueryFileLoadsWriteSchemaTransformConfiguration.class;
- }
+ "beam:schematransform:org.apache.beam:bigquery_fileloads:v1";
+ static final String INPUT_TAG = "input";
- /** Returns the expected {@link SchemaTransform} of the configuration. */
@Override
- protected SchemaTransform from(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) {
+ protected SchemaTransform from(BigQueryWriteConfiguration configuration) {
return new BigQueryWriteSchemaTransform(configuration);
}
- /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
@Override
public String identifier() {
return IDENTIFIER;
}
- /**
- * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a
- * single is expected, this returns a list with a single name.
- */
@Override
public List<String> inputCollectionNames() {
return Collections.singletonList(INPUT_TAG);
}
- /**
- * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
- * no output is expected, this returns an empty list.
- */
@Override
public List<String> outputCollectionNames() {
return Collections.emptyList();
}
- /**
- * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based on a {@link
- * BigQueryFileLoadsWriteSchemaTransformConfiguration}.
- */
protected static class BigQueryWriteSchemaTransform extends SchemaTransform {
/** An instance of {@link BigQueryServices} used for testing. */
private BigQueryServices testBigQueryServices = null;
- private final BigQueryFileLoadsWriteSchemaTransformConfiguration configuration;
+ private final BigQueryWriteConfiguration configuration;
Review Comment:
Probably it will help to describe the larger intention of the refactoring
in the CL description and/or a GitHub issue.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]