ahmedabu98 commented on code in PR #31486:
URL: https://github.com/apache/beam/pull/31486#discussion_r1705770312


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java:
##########
@@ -56,201 +48,84 @@
 @Internal
 @AutoService(SchemaTransformProvider.class)
 public class BigQueryFileLoadsWriteSchemaTransformProvider
-    extends 
TypedSchemaTransformProvider<BigQueryFileLoadsWriteSchemaTransformConfiguration>
 {
+    extends TypedSchemaTransformProvider<BigQueryWriteConfiguration> {
 
   private static final String IDENTIFIER =
-      "beam:schematransform:org.apache.beam:bigquery_fileloads_write:v1";
-  static final String INPUT_TAG = "INPUT";
-
-  /** Returns the expected class of the configuration. */
-  @Override
-  protected Class<BigQueryFileLoadsWriteSchemaTransformConfiguration> 
configurationClass() {
-    return BigQueryFileLoadsWriteSchemaTransformConfiguration.class;
-  }
+      "beam:schematransform:org.apache.beam:bigquery_fileloads:v1";
+  static final String INPUT_TAG = "input";
 
-  /** Returns the expected {@link SchemaTransform} of the configuration. */
   @Override
-  protected SchemaTransform 
from(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) {
+  protected SchemaTransform from(BigQueryWriteConfiguration configuration) {
     return new BigQueryWriteSchemaTransform(configuration);
   }
 
-  /** Implementation of the {@link TypedSchemaTransformProvider} identifier 
method. */
   @Override
   public String identifier() {
     return IDENTIFIER;
   }
 
-  /**
-   * Implementation of the {@link TypedSchemaTransformProvider} 
inputCollectionNames method. Since a
-   * single is expected, this returns a list with a single name.
-   */
   @Override
   public List<String> inputCollectionNames() {
     return Collections.singletonList(INPUT_TAG);
   }
 
-  /**
-   * Implementation of the {@link TypedSchemaTransformProvider} 
outputCollectionNames method. Since
-   * no output is expected, this returns an empty list.
-   */
   @Override
   public List<String> outputCollectionNames() {
     return Collections.emptyList();
   }
 
-  /**
-   * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based 
on a {@link
-   * BigQueryFileLoadsWriteSchemaTransformConfiguration}.
-   */
   protected static class BigQueryWriteSchemaTransform extends SchemaTransform {
     /** An instance of {@link BigQueryServices} used for testing. */
     private BigQueryServices testBigQueryServices = null;
 
-    private final BigQueryFileLoadsWriteSchemaTransformConfiguration 
configuration;
+    private final BigQueryWriteConfiguration configuration;
 
-    
BigQueryWriteSchemaTransform(BigQueryFileLoadsWriteSchemaTransformConfiguration 
configuration) {
+    BigQueryWriteSchemaTransform(BigQueryWriteConfiguration configuration) {
+      configuration.validate();
       this.configuration = configuration;
     }
 
     @Override
-    public void validate(PipelineOptions options) {
-      if 
(!configuration.getCreateDisposition().equals(CreateDisposition.CREATE_NEVER.name()))
 {
-        return;
-      }
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      PCollection<Row> rowPCollection = input.getSinglePCollection();
+      BigQueryIO.Write<Row> write = toWrite();
+      rowPCollection.apply(write);
 
-      BigQueryOptions bigQueryOptions = options.as(BigQueryOptions.class);
+      return PCollectionRowTuple.empty(input.getPipeline());
+    }
 
-      BigQueryServices bigQueryServices = new BigQueryServicesImpl();
-      if (testBigQueryServices != null) {
-        bigQueryServices = testBigQueryServices;
+    BigQueryIO.Write<Row> toWrite() {
+      BigQueryIO.Write<Row> write =
+          BigQueryIO.<Row>write()
+              .to(configuration.getTable())
+              .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
+              .withFormatFunction(BigQueryUtils.toTableRow())
+              .useBeamSchema();
+
+      if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
+        CreateDisposition createDisposition =
+            
CreateDisposition.valueOf(configuration.getCreateDisposition().toUpperCase());

Review Comment:
   Do you mean making this switch in the SDK (i.e., at construction time)? I assumed 
we had settled on making it a runner-side decision
   
   Some decisions are actually dependent on the runner (e.g. the at-least-once 
streaming mode in Dataflow)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to