[ 
https://issues.apache.org/jira/browse/BEAM-6602?focusedWorklogId=197936&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-197936
 ]

ASF GitHub Bot logged work on BEAM-6602:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Feb/19 05:51
            Start Date: 13/Feb/19 05:51
    Worklog Time Spent: 10m 
      Work Description: reuvenlax commented on pull request #7743: [BEAM-6602] 
Preparatory PR for integrating schemas into BigQuery: Push TableRow conversion 
to end of pipeline
URL: https://github.com/apache/beam/pull/7743#discussion_r256255819
 
 

 ##########
 File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
 ##########
 @@ -26,52 +26,59 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.joda.time.Instant;
 
 /**
  * Prepare an input {@link PCollection} for writing to BigQuery. Use the table 
function to determine
  * which tables each element is written to, and format the element into a 
{@link TableRow} using the
  * user-supplied format function.
  */
-public class PrepareWrite<T, DestinationT>
-    extends PTransform<PCollection<T>, PCollection<KV<DestinationT, 
TableRow>>> {
-  private DynamicDestinations<T, DestinationT> dynamicDestinations;
-  private SerializableFunction<T, TableRow> formatFunction;
+public class PrepareWrite<InputT, DestinationT, OutputT>
+    extends PTransform<PCollection<InputT>, PCollection<KV<DestinationT, 
OutputT>>> {
+  private DynamicDestinations<InputT, DestinationT> dynamicDestinations;
+  private SerializableFunction<InputT, OutputT> formatFunction;
 
   public PrepareWrite(
-      DynamicDestinations<T, DestinationT> dynamicDestinations,
-      SerializableFunction<T, TableRow> formatFunction) {
+      DynamicDestinations<InputT, DestinationT> dynamicDestinations,
+      SerializableFunction<InputT, OutputT> formatFunction) {
     this.dynamicDestinations = dynamicDestinations;
     this.formatFunction = formatFunction;
   }
 
   @Override
-  public PCollection<KV<DestinationT, TableRow>> expand(PCollection<T> input) {
+  public PCollection<KV<DestinationT, OutputT>> expand(PCollection<InputT> 
input) {
     return input.apply(
         ParDo.of(
-                new DoFn<T, KV<DestinationT, TableRow>>() {
+                new DoFn<InputT, KV<DestinationT, OutputT>>() {
                   @ProcessElement
-                  public void processElement(ProcessContext context, 
BoundedWindow window)
+                  public void processElement(
+                      ProcessContext context,
+                      @Element InputT element,
+                      @Timestamp Instant timestamp,
+                      BoundedWindow window,
+                      PaneInfo pane)
                       throws IOException {
                     
dynamicDestinations.setSideInputAccessorFromProcessContext(context);
-                    ValueInSingleWindow<T> element =
-                        ValueInSingleWindow.of(
-                            context.element(), context.timestamp(), window, 
context.pane());
-                    DestinationT tableDestination = 
dynamicDestinations.getDestination(element);
+                    ValueInSingleWindow<InputT> windowedElement =
+                        ValueInSingleWindow.of(element, timestamp, window, 
pane);
+                    DestinationT tableDestination =
+                        dynamicDestinations.getDestination(windowedElement);
                     checkArgument(
                         tableDestination != null,
                         "DynamicDestinations.getDestination() may not return 
null, "
                             + "but %s returned null on element %s",
                         dynamicDestinations,
                         element);
-                    TableRow tableRow = 
formatFunction.apply(context.element());
+                    OutputT tableRow = formatFunction.apply(element);
 
 Review comment:
   done
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 197936)
    Time Spent: 1h 20m  (was: 1h 10m)

> Support schemas in BigQueryIO.Write
> -----------------------------------
>
>                 Key: BEAM-6602
>                 URL: https://issues.apache.org/jira/browse/BEAM-6602
>             Project: Beam
>          Issue Type: Sub-task
>          Components: io-java-gcp
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>              Labels: triaged
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to