To expand on that: the batch runner's work scheduling and ordering does not depend on windowing *at all*. It would not be much of an exaggeration to say that it treats windows simply as another grouping key: because it assumes that input data is completely out of order, optimizing for latency by delivering results for earlier windows earlier is impossible, or at least not required, so it processes the full data as a single batch.
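To make the fix discussed below concrete, here is a minimal, self-contained sketch (assuming the Beam Java SDK and the direct runner; the input strings, table names, and "schema" values are made up for illustration). The main input stays daily-windowed, while the collection feeding the side input is re-windowed into the global window before View.asMap() - the same move as the commented-out line in Chaim's pipeline, and what BigQueryIO's withSchemaFromView expects. In batch mode this loses nothing, since the runner waits for all input before materializing the view anyway:

import java.util.Map;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.transforms.windowing.CalendarWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Instant;

public class GlobalWindowSideInputSketch {

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // Stand-in for the Mongo documents: "table:payload" strings.
    PCollection<String> docs = pipeline
        .apply(Create.of("users:alice", "orders:1234"))
        .apply(WithTimestamps.of((String doc) -> new Instant(0)))
        .apply("Window Daily", Window.<String>into(CalendarWindows.days(1)));

    // The side input is re-windowed into the global window, which is what a
    // consumer like BigQueryIO's withSchemaFromView requires.
    final PCollectionView<Map<String, String>> schemas = docs
        .apply("Global Window", Window.<String>into(new GlobalWindows()))
        .apply("Extract schema", ParDo.of(new DoFn<String, KV<String, String>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            String table = c.element().split(":")[0];
            c.output(KV.of(table, "schema-for-" + table)); // fake "schema"
          }
        }))
        .apply(View.asMap());

    // The main path stays daily-windowed; when it reads the side input, the
    // runner maps every daily window onto the single global window.
    docs.apply("Use schema", ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Map<String, String> schemaByTable = c.sideInput(schemas);
        String table = c.element().split(":")[0];
        c.output(c.element() + " -> " + schemaByTable.get(table));
      }
    }).withSideInputs(schemas));

    pipeline.run().waitUntilFinish();
  }
}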
On Sun, Sep 10, 2017 at 8:32 AM Reuven Lax <[email protected]> wrote:

> In that case I can say unequivocally that Dataflow (in batch mode) does not
> produce results for a stage until it has processed that entire stage. The
> reason for this is that the batch runner is optimized for throughput, not
> latency; it wants to minimize the time for the entire job to finish, not
> the time till first output. The side input will not be materialized until
> all of the data for all of the windows of the side input has been
> processed. The streaming runner, on the other hand, will produce windows as
> they finish. So for the batch runner, there is no performance advantage to
> be gained from windowing the side input.
>
> The fact that BigQueryIO needs the schema side input to be globally
> windowed is a bit confusing and not well documented. We should add better
> javadoc explaining this.
>
> Reuven
>
> On Sun, Sep 10, 2017 at 12:50 AM, Chaim Turkel <[email protected]> wrote:
>
> > batch on dataflow
> >
> > On Sun, Sep 10, 2017 at 8:05 AM, Reuven Lax <[email protected]> wrote:
> > > Which runner are you using? And is this a batch pipeline?
> > >
> > > On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel <[email protected]> wrote:
> > >
> > >> Thanks for the answer, but I don't think that is the case. From what
> > >> I have seen, since I have other code that updates a status based on
> > >> the window, it does get called before all the windows are calculated.
> > >> There is no logical reason to wait: once a window has finished, the
> > >> rest of the pipeline should run and BigQuery should start to write
> > >> the results.
> > >>
> > >> On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax <[email protected]> wrote:
> > >> > Logically the BigQuery write does not depend on windows, and
> > >> > writing it windowed would result in incorrect output. For this
> > >> > reason, BigQueryIO rewindows into global windows before actually
> > >> > writing to BigQuery.
> > >> >
> > >> > If you are running in batch mode, there is no performance
> > >> > difference between windowed and unwindowed side inputs. I believe
> > >> > that all of the batch runners wait until all windows are calculated
> > >> > before materializing the output.
> > >> >
> > >> > Reuven
> > >> >
> > >> > On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <[email protected]> wrote:
> > >> >
> > >> >> the schema depends on the data per window.
> > >> >> when I added the global window it works, but then I lose the
> > >> >> performance, since the second stage of writing will begin only
> > >> >> after the side input has read all the data and updated the schema.
> > >> >> The batch mode of BigQueryIO seems to use a global window, and I
> > >> >> don't know why.
> > >> >>
> > >> >> chaim
> > >> >>
> > >> >> On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov
> > >> >> <[email protected]> wrote:
> > >> >> > Are your schemas actually supposed to be different between
> > >> >> > different windows, or do they depend only on the data?
> > >> >> > I see you have a commented-out Window.into(new GlobalWindows())
> > >> >> > for your side input - did that work when it wasn't commented out?
> > >> >> >
> > >> >> > On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <[email protected]> wrote:
> > >> >> >
> > >> >> >> my code is:
> > >> >> >>
> > >> >> >> // read docs from mongo
> > >> >> >> final PCollection<Document> docs = pipeline
> > >> >> >>     .apply(table.getTableName(),
> > >> >> >>         MongoDbIO.read()
> > >> >> >>             .withUri("mongodb://" + connectionParams)
> > >> >> >>             .withFilter(filter)
> > >> >> >>             .withDatabase(options.getDBName())
> > >> >> >>             .withCollection(table.getTableName()))
> > >> >> >>     .apply("AddEventTimestamps",
> > >> >> >>         WithTimestamps.of((Document doc) ->
> > >> >> >>             new Instant(MongodbManagment.docTimeToLong(doc))))
> > >> >> >>     .apply("Window Daily", Window.into(CalendarWindows.days(1)));
> > >> >> >>
> > >> >> >> // update bq schema based on window
> > >> >> >> final PCollectionView<Map<String, String>> tableSchemas = docs
> > >> >> >> //    .apply("Global Window", Window.into(new GlobalWindows()))
> > >> >> >>     .apply("extract schema " + table.getTableName(),
> > >> >> >>         new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
> > >> >> >>     .apply("getTableSchemaMemory " + table.getTableName(),
> > >> >> >>         ParDo.of(getTableSchemaMemory(table.getTableName())))
> > >> >> >>     .apply(View.asMap());
> > >> >> >>
> > >> >> >> final PCollection<TableRow> docsRows = docs
> > >> >> >>     .apply("doc to row " + table.getTableName(),
> > >> >> >>         ParDo.of(docToTableRow(table.getBqTableName(), tableSchemas))
> > >> >> >>             .withSideInputs(tableSchemas));
> > >> >> >>
> > >> >> >> final WriteResult apply = docsRows
> > >> >> >>     .apply("insert data table - " + table.getTableName(),
> > >> >> >>         BigQueryIO.writeTableRows()
> > >> >> >>             .to(TableRefPartition.perDay(options.getBQProject(),
> > >> >> >>                 options.getDatasetId(), table.getBqTableName()))
> > >> >> >>             .withSchemaFromView(tableSchemas)
> > >> >> >>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
> > >> >> >>             .withWriteDisposition(WRITE_APPEND));
> > >> >> >>
> > >> >> >> exception is:
> > >> >> >>
> > >> >> >> Sep 08, 2017 12:16:55 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> > >> >> >> INFO: Opening TableRowWriter to
> > >> >> >> gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
> > >> >> >> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> > >> >> >> java.lang.IllegalArgumentException: Attempted to get side input
> > >> >> >> window for GlobalWindow from non-global WindowFn
> > >> >> >>   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
> > >> >> >>   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
> > >> >> >>   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> > >> >> >>   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> > >> >> >>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
> > >> >> >>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> > >> >> >>   at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
> > >> >> >>   at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
> > >> >> >> Caused by: java.lang.IllegalArgumentException: Attempted to get
> > >> >> >> side input window for GlobalWindow from non-global WindowFn
> > >> >> >>   at org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
> > >> >> >>   at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
> > >> >> >>   at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
> > >> >> >> Sep 08, 2017 12:16:58 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> > >> >> >>
> > >> >> >> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
> > >> >> >> <[email protected]> wrote:
> > >> >> >> > Please include the full exception, and please show the code
> > >> >> >> > that produces it.
> > >> >> >> > See also
> > >> >> >> > https://beam.apache.org/documentation/programming-guide/#transforms-sideio
> > >> >> >> > section "Side inputs and windowing" - that might be sufficient
> > >> >> >> > to resolve your problem.
> > >> >> >> >
> > >> >> >> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <[email protected]> wrote:
> > >> >> >> >
> > >> >> >> >> Hi,
> > >> >> >> >> I have a pipeline that, based on documents from mongo, updates
> > >> >> >> >> the schema and then adds the records to BigQuery. Since I want
> > >> >> >> >> a partitioned table, I have a daily window.
> > >> >> >> >> How do I get the schema view to be a window? I get the
> > >> >> >> >> exception:
> > >> >> >> >>
> > >> >> >> >> Attempted to get side input window for GlobalWindow from
> > >> >> >> >> non-global WindowFn
> > >> >> >> >>
> > >> >> >> >> chaim
