Thanks for the answer, but I don't think that is the case. From what I
have seen, since I have other code that updates status based on the
window, it does get called before all the windows are calculated.
There is no logical reason to wait: once a window has finished, the
rest of the pipeline should run and BigQuery should start writing the
results.
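For reference, the workaround discussed below is to rewindow only the side-input branch into the global window before building the view (the line that is commented out in my code further down). A rough sketch, assuming the same `docs` collection and schema transforms from that code; this fragment needs the Beam SDK on the classpath and has not been tested against this pipeline:

```java
// Sketch: rewindow ONLY the side-input branch into the global window,
// so the schema view is compatible with BigQueryIO's internal global
// rewindowing. The main daily-windowed branch stays as-is.
// Assumes `docs`, `table`, and DocsToSchemaTransform from the quoted code.
final PCollectionView<Map<String, String>> tableSchemas = docs
        .apply("Global Window", Window.<Document>into(new GlobalWindows()))
        .apply("extract schema " + table.getTableName(),
                new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
        .apply(View.asMap());
```

The downside, as noted in this thread, is that in batch mode the view is only materialized once all input has been read, which is the performance cost I am trying to avoid.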



On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax <[email protected]> wrote:
> Logically the BigQuery write does not depend on windows, and writing it
> windowed would result in incorrect output. For this reason, BigQueryIO
> rewindows into global windows before actually writing to BigQuery.
>
> If you are running in batch mode, there is no performance difference
> between windowed and unwindowed side inputs. I believe that all of the
> batch runners wait until all windows are calculated before materializing
> the output.
>
> Reuven
>
> On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <[email protected]> wrote:
>
>> the schema depends on the data per window.
>> when I added the global window it works, but then I lose the
>> performance, since the second stage of writing will begin only after
>> the side input has read all the data and updated the schema.
>> The batch mode of BigQueryIO seems to use a global window, and I
>> don't know why.
>>
>> chaim
>>
>> On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov
>> <[email protected]> wrote:
>> > Are your schemas actually supposed to be different between different
>> > windows, or do they depend only on data?
>> > I see you have a commented-out Window.into(new GlobalWindows()) for your
>> > side input - did that work when it wasn't commented out?
>> >
>> > On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <[email protected]> wrote:
>> >
>> >> my code is:
>> >>
>> >>     //read docs from mongo
>> >>     final PCollection<Document> docs = pipeline
>> >>             .apply(table.getTableName(), MongoDbIO.read()
>> >>                     .withUri("mongodb://" + connectionParams)
>> >>                     .withFilter(filter)
>> >>                     .withDatabase(options.getDBName())
>> >>                     .withCollection(table.getTableName()))
>> >>             .apply("AddEventTimestamps", WithTimestamps.of((Document doc) ->
>> >>                     new Instant(MongodbManagment.docTimeToLong(doc))))
>> >>             .apply("Window Daily", Window.into(CalendarWindows.days(1)));
>> >>
>> >>     //update bq schema based on window
>> >>     final PCollectionView<Map<String, String>> tableSchemas = docs
>> >> //            .apply("Global Window", Window.into(new GlobalWindows()))
>> >>             .apply("extract schema " + table.getTableName(),
>> >>                     new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
>> >>             .apply("getTableSchemaMemory " + table.getTableName(),
>> >>                     ParDo.of(getTableSchemaMemory(table.getTableName())))
>> >>             .apply(View.asMap());
>> >>
>> >>     final PCollection<TableRow> docsRows = docs
>> >>             .apply("doc to row " + table.getTableName(),
>> >>                     ParDo.of(docToTableRow(table.getBqTableName(), tableSchemas))
>> >>                             .withSideInputs(tableSchemas));
>> >>
>> >>     final WriteResult apply = docsRows
>> >>             .apply("insert data table - " + table.getTableName(),
>> >>                     BigQueryIO.writeTableRows()
>> >>                             .to(TableRefPartition.perDay(options.getBQProject(),
>> >>                                     options.getDatasetId(), table.getBqTableName()))
>> >>                             .withSchemaFromView(tableSchemas)
>> >>                             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>> >>                             .withWriteDisposition(WRITE_APPEND));
>> >>
>> >> exception is:
>> >>
>> >> Sep 08, 2017 12:16:55 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
>> >> INFO: Opening TableRowWriter to gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
>> >> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> >> java.lang.IllegalArgumentException: Attempted to get side input window
>> >> for GlobalWindow from non-global WindowFn
>> >> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
>> >> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
>> >> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
>> >> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
>> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
>> >> at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
>> >> at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
>> >> Caused by: java.lang.IllegalArgumentException: Attempted to get side
>> >> input window for GlobalWindow from non-global WindowFn
>> >> at org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
>> >> at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
>> >> at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
>> >> Sep 08, 2017 12:16:58 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
>> >>
>> >> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
>> >> <[email protected]> wrote:
>> >> > Please include the full exception and please show the code that
>> >> > produces it.
>> >> > See also
>> >> > https://beam.apache.org/documentation/programming-guide/#transforms-sideio
>> >> > section "Side inputs and windowing" - that might be sufficient to
>> >> > resolve your problem.
>> >> >
>> >> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <[email protected]> wrote:
>> >> >
>> >> >> Hi,
>> >> >>   I have a pipeline that, based on documents from mongo, updates the
>> >> >> schema and then adds the records to BigQuery. Since I want a
>> >> >> partitioned table, I have a daily window.
>> >> >> How do I get the schema view to be windowed? I get the exception:
>> >> >>
>> >> >> "Attempted to get side input window for GlobalWindow from non-global
>> >> >> WindowFn"
>> >> >>
>> >> >> chaim
>> >> >>
>> >>
>>