Logically the BigQuery write does not depend on windows, and writing it
windowed would produce incorrect output. For this reason, BigQueryIO
rewindows into the global window before actually writing to BigQuery.
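
So the schema side input also needs to be in the global window. As a minimal
sketch (reusing the docs / table / getTableSchemaMemory names from the code
quoted below - the only change is uncommenting the global-window step), that
would look like:

    // Re-window the schema side input into the global window so it matches
    // the globally-windowed BigQuery write.
    final PCollectionView<Map<String, String>> tableSchemas = docs
            .apply("Global Window", Window.into(new GlobalWindows()))
            .apply("extract schema " + table.getTableName(),
                    new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
            .apply("getTableSchemaMemory " + table.getTableName(),
                    ParDo.of(getTableSchemaMemory(table.getTableName())))
            .apply(View.asMap());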

If you are running in batch mode, there is no performance difference
between windowed and unwindowed side inputs. I believe that all of the
batch runners wait until all windows are calculated before materializing
the output.

Reuven

On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <[email protected]> wrote:

> the schema depends on the data per window.
> when I added the global window it works, but then I lose the
> performance, since the second stage of writing will begin only after
> the side input has read all the data and updated the schema.
> The batch mode of BigQueryIO seems to use a global window, and I
> don't understand why.
>
> chaim
>
> On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov
> <[email protected]> wrote:
> > Are your schemas actually supposed to be different between different
> > windows, or do they depend only on the data?
> > I see you have a commented-out Window.into(new GlobalWindows()) for your
> > side input - did that work when it wasn't commented out?
> >
> > On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <[email protected]> wrote:
> >
> >> my code is:
> >>
> >>     //read docs from mongo
> >>     final PCollection<Document> docs = pipeline
> >>             .apply(table.getTableName(), MongoDbIO.read()
> >>                     .withUri("mongodb://" + connectionParams)
> >>                     .withFilter(filter)
> >>                     .withDatabase(options.getDBName())
> >>                     .withCollection(table.getTableName()))
> >>             .apply("AddEventTimestamps", WithTimestamps.of((Document doc) ->
> >>                     new Instant(MongodbManagment.docTimeToLong(doc))))
> >>             .apply("Window Daily", Window.into(CalendarWindows.days(1)));
> >>
> >>     //update bq schema based on window
> >>     final PCollectionView<Map<String, String>> tableSchemas = docs
> >> //          .apply("Global Window", Window.into(new GlobalWindows()))
> >>             .apply("extract schema " + table.getTableName(),
> >>                     new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
> >>             .apply("getTableSchemaMemory " + table.getTableName(),
> >>                     ParDo.of(getTableSchemaMemory(table.getTableName())))
> >>             .apply(View.asMap());
> >>
> >>     final PCollection<TableRow> docsRows = docs
> >>             .apply("doc to row " + table.getTableName(),
> >>                     ParDo.of(docToTableRow(table.getBqTableName(), tableSchemas))
> >>                             .withSideInputs(tableSchemas));
> >>
> >>     final WriteResult apply = docsRows
> >>             .apply("insert data table - " + table.getTableName(),
> >>                     BigQueryIO.writeTableRows()
> >>                             .to(TableRefPartition.perDay(options.getBQProject(),
> >>                                     options.getDatasetId(), table.getBqTableName()))
> >>                             .withSchemaFromView(tableSchemas)
> >>                             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
> >>                             .withWriteDisposition(WRITE_APPEND));
> >>
> >>
> >> exception is:
> >>
> >> Sep 08, 2017 12:16:55 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> >> INFO: Opening TableRowWriter to
> >> gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
> >> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> >> java.lang.IllegalArgumentException: Attempted to get side input window
> >> for GlobalWindow from non-global WindowFn
> >>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
> >>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
> >>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> >>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> >>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
> >>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> >>     at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
> >>     at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
> >> Caused by: java.lang.IllegalArgumentException: Attempted to get side
> >> input window for GlobalWindow from non-global WindowFn
> >>     at org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
> >>     at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
> >>     at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
> >> Sep 08, 2017 12:16:58 PM
> >> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> >>
> >> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
> >> <[email protected]> wrote:
> >> > Please include the full exception and please show the code that
> >> > produces it.
> >> > See also
> >> > https://beam.apache.org/documentation/programming-guide/#transforms-sideio
> >> > section "Side inputs and windowing" - that might be sufficient to resolve
> >> > your problem.
> >> >
> >> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <[email protected]> wrote:
> >> >
> >> >> Hi,
> >> >>   I have a pipeline that, based on documents from mongo, updates the
> >> >> schema and then adds the records to BigQuery. Since I want a partitioned
> >> >> table, I have a daily window.
> >> >> How do I get the schema view to be windowed? I get the exception:
> >> >>
> >> >> "Attempted to get side input window for GlobalWindow from non-global
> >> >> WindowFn"
> >> >>
> >> >> chaim
> >> >>
> >>
>