chamikaramj commented on code in PR #31486:
URL: https://github.com/apache/beam/pull/31486#discussion_r1704423913


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java:
##########
@@ -56,201 +48,84 @@
 @Internal
 @AutoService(SchemaTransformProvider.class)
 public class BigQueryFileLoadsWriteSchemaTransformProvider
-    extends 
TypedSchemaTransformProvider<BigQueryFileLoadsWriteSchemaTransformConfiguration>
 {
+    extends TypedSchemaTransformProvider<BigQueryWriteConfiguration> {
 
   private static final String IDENTIFIER =
-      "beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1";
-  static final String INPUT_TAG = "INPUT";
-
-  /** Returns the expected class of the configuration. */
-  @Override
-  protected Class<BigQueryFileLoadsWriteSchemaTransformConfiguration> 
configurationClass() {
-    return BigQueryFileLoadsWriteSchemaTransformConfiguration.class;
-  }
+      "beam:schematransform:org.apache.beam:bigquery_fileloads:v1";
+  static final String INPUT_TAG = "input";
 
-  /** Returns the expected {@link SchemaTransform} of the configuration. */
   @Override
-  protected SchemaTransform 
from(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) {
+  protected SchemaTransform from(BigQueryWriteConfiguration configuration) {
     return new BigQueryWriteSchemaTransform(configuration);
   }
 
-  /** Implementation of the {@link TypedSchemaTransformProvider} identifier 
method. */
   @Override
   public String identifier() {
     return IDENTIFIER;
   }
 
-  /**
-   * Implementation of the {@link TypedSchemaTransformProvider} 
inputCollectionNames method. Since a
-   * single is expected, this returns a list with a single name.
-   */
   @Override
   public List<String> inputCollectionNames() {
     return Collections.singletonList(INPUT_TAG);
   }
 
-  /**
-   * Implementation of the {@link TypedSchemaTransformProvider} 
outputCollectionNames method. Since
-   * no output is expected, this returns an empty list.
-   */
   @Override
   public List<String> outputCollectionNames() {
     return Collections.emptyList();
   }
 
-  /**
-   * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based 
on a {@link
-   * BigQueryFileLoadsWriteSchemaTransformConfiguration}.
-   */
   protected static class BigQueryWriteSchemaTransform extends SchemaTransform {
     /** An instance of {@link BigQueryServices} used for testing. */
     private BigQueryServices testBigQueryServices = null;
 
-    private final BigQueryFileLoadsWriteSchemaTransformConfiguration 
configuration;
+    private final BigQueryWriteConfiguration configuration;

Review Comment:
   It seems confusing to use the configuration from a different transform here.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java:
##########
@@ -56,201 +48,84 @@
 @Internal
 @AutoService(SchemaTransformProvider.class)
 public class BigQueryFileLoadsWriteSchemaTransformProvider
-    extends 
TypedSchemaTransformProvider<BigQueryFileLoadsWriteSchemaTransformConfiguration>
 {
+    extends TypedSchemaTransformProvider<BigQueryWriteConfiguration> {
 
   private static final String IDENTIFIER =
-      "beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1";
-  static final String INPUT_TAG = "INPUT";
-
-  /** Returns the expected class of the configuration. */
-  @Override
-  protected Class<BigQueryFileLoadsWriteSchemaTransformConfiguration> 
configurationClass() {
-    return BigQueryFileLoadsWriteSchemaTransformConfiguration.class;
-  }
+      "beam:schematransform:org.apache.beam:bigquery_fileloads:v1";

Review Comment:
   We should move schema-transform IDs to a proto file (same as the other PR).



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java:
##########
@@ -17,34 +17,26 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
+import static 
org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryWriteConfiguration;

Review Comment:
   Are we looking to use the same configuration in multiple places? If so, it 
should go to a separate class.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java:
##########
@@ -56,201 +48,84 @@
 @Internal
 @AutoService(SchemaTransformProvider.class)
 public class BigQueryFileLoadsWriteSchemaTransformProvider
-    extends 
TypedSchemaTransformProvider<BigQueryFileLoadsWriteSchemaTransformConfiguration>
 {
+    extends TypedSchemaTransformProvider<BigQueryWriteConfiguration> {
 
   private static final String IDENTIFIER =
-      "beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1";
-  static final String INPUT_TAG = "INPUT";
-
-  /** Returns the expected class of the configuration. */
-  @Override
-  protected Class<BigQueryFileLoadsWriteSchemaTransformConfiguration> 
configurationClass() {
-    return BigQueryFileLoadsWriteSchemaTransformConfiguration.class;
-  }
+      "beam:schematransform:org.apache.beam:bigquery_fileloads:v1";
+  static final String INPUT_TAG = "input";
 
-  /** Returns the expected {@link SchemaTransform} of the configuration. */
   @Override
-  protected SchemaTransform 
from(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) {
+  protected SchemaTransform from(BigQueryWriteConfiguration configuration) {
     return new BigQueryWriteSchemaTransform(configuration);
   }
 
-  /** Implementation of the {@link TypedSchemaTransformProvider} identifier 
method. */
   @Override
   public String identifier() {
     return IDENTIFIER;
   }
 
-  /**
-   * Implementation of the {@link TypedSchemaTransformProvider} 
inputCollectionNames method. Since a
-   * single is expected, this returns a list with a single name.
-   */
   @Override
   public List<String> inputCollectionNames() {
     return Collections.singletonList(INPUT_TAG);
   }
 
-  /**
-   * Implementation of the {@link TypedSchemaTransformProvider} 
outputCollectionNames method. Since
-   * no output is expected, this returns an empty list.
-   */
   @Override
   public List<String> outputCollectionNames() {
     return Collections.emptyList();
   }
 
-  /**
-   * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based 
on a {@link
-   * BigQueryFileLoadsWriteSchemaTransformConfiguration}.
-   */
   protected static class BigQueryWriteSchemaTransform extends SchemaTransform {
     /** An instance of {@link BigQueryServices} used for testing. */
     private BigQueryServices testBigQueryServices = null;
 
-    private final BigQueryFileLoadsWriteSchemaTransformConfiguration 
configuration;
+    private final BigQueryWriteConfiguration configuration;
 
-    
BigQueryWriteSchemaTransform(BigQueryFileLoadsWriteSchemaTransformConfiguration 
configuration) {
+    BigQueryWriteSchemaTransform(BigQueryWriteConfiguration configuration) {
+      configuration.validate();
       this.configuration = configuration;
     }
 
     @Override
-    public void validate(PipelineOptions options) {
-      if 
(!configuration.getCreateDisposition().equals(CreateDisposition.CREATE_NEVER.name()))
 {
-        return;
-      }
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      PCollection<Row> rowPCollection = input.getSinglePCollection();
+      BigQueryIO.Write<Row> write = toWrite();
+      rowPCollection.apply(write);
 
-      BigQueryOptions bigQueryOptions = options.as(BigQueryOptions.class);
+      return PCollectionRowTuple.empty(input.getPipeline());
+    }
 
-      BigQueryServices bigQueryServices = new BigQueryServicesImpl();
-      if (testBigQueryServices != null) {
-        bigQueryServices = testBigQueryServices;
+    BigQueryIO.Write<Row> toWrite() {
+      BigQueryIO.Write<Row> write =
+          BigQueryIO.<Row>write()
+              .to(configuration.getTable())
+              .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
+              .withFormatFunction(BigQueryUtils.toTableRow())
+              .useBeamSchema();
+
+      if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
+        CreateDisposition createDisposition =
+            
CreateDisposition.valueOf(configuration.getCreateDisposition().toUpperCase());

Review Comment:
   As a larger point, I think we should do any transform overriding in job 
submission (BQ modes for batch/streaming etc.) so that we can just upgrade in 
the backend (at least in the first version). 



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java:
##########
@@ -56,201 +48,84 @@
 @Internal
 @AutoService(SchemaTransformProvider.class)
 public class BigQueryFileLoadsWriteSchemaTransformProvider
-    extends 
TypedSchemaTransformProvider<BigQueryFileLoadsWriteSchemaTransformConfiguration>
 {
+    extends TypedSchemaTransformProvider<BigQueryWriteConfiguration> {
 
   private static final String IDENTIFIER =
-      "beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1";
-  static final String INPUT_TAG = "INPUT";
-
-  /** Returns the expected class of the configuration. */
-  @Override
-  protected Class<BigQueryFileLoadsWriteSchemaTransformConfiguration> 
configurationClass() {
-    return BigQueryFileLoadsWriteSchemaTransformConfiguration.class;
-  }
+      "beam:schematransform:org.apache.beam:bigquery_fileloads:v1";
+  static final String INPUT_TAG = "input";
 
-  /** Returns the expected {@link SchemaTransform} of the configuration. */
   @Override
-  protected SchemaTransform 
from(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) {
+  protected SchemaTransform from(BigQueryWriteConfiguration configuration) {
     return new BigQueryWriteSchemaTransform(configuration);
   }
 
-  /** Implementation of the {@link TypedSchemaTransformProvider} identifier 
method. */
   @Override
   public String identifier() {
     return IDENTIFIER;
   }
 
-  /**
-   * Implementation of the {@link TypedSchemaTransformProvider} 
inputCollectionNames method. Since a
-   * single is expected, this returns a list with a single name.
-   */
   @Override
   public List<String> inputCollectionNames() {
     return Collections.singletonList(INPUT_TAG);
   }
 
-  /**
-   * Implementation of the {@link TypedSchemaTransformProvider} 
outputCollectionNames method. Since
-   * no output is expected, this returns an empty list.
-   */
   @Override
   public List<String> outputCollectionNames() {
     return Collections.emptyList();
   }
 
-  /**
-   * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based 
on a {@link
-   * BigQueryFileLoadsWriteSchemaTransformConfiguration}.
-   */
   protected static class BigQueryWriteSchemaTransform extends SchemaTransform {
     /** An instance of {@link BigQueryServices} used for testing. */
     private BigQueryServices testBigQueryServices = null;
 
-    private final BigQueryFileLoadsWriteSchemaTransformConfiguration 
configuration;
+    private final BigQueryWriteConfiguration configuration;

Review Comment:
   Probably it will help to describe the larger intention of the refactoring 
in the CL description and/or a GitHub issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to