To expand on that: the batch runner's work scheduling and ordering do not
depend on windowing *at all*. It would not be much of an exaggeration to
say that it treats windows as just another grouping key. It assumes that
input data is completely out of order, so optimizing for latency by
delivering results for earlier windows earlier is impossible, or at least
not required. Instead, it processes the full data set as a batch.
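
As a rough illustration of "windows as just another grouping key", here is a
minimal plain-Java sketch. It does not use the Beam API and is not how any
runner is actually implemented; the Event class, the dailyWindow helper and
the "window/key" string format are all hypothetical names made up for this
example. It only shows the idea that a batch-style computation can read
out-of-order input in full and group by the pair (window, key).

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowAsGroupingKey {

    /** A timestamped element; a hypothetical stand-in for a pipeline record. */
    static final class Event {
        final String key;
        final Instant timestamp;

        Event(String key, Instant timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    /** Assigns an event to a daily window, identified here by its UTC calendar date. */
    static LocalDate dailyWindow(Event e) {
        return e.timestamp.atZone(ZoneOffset.UTC).toLocalDate();
    }

    /**
     * Batch-style grouping: consumes the whole input, in whatever order it
     * arrives, and counts elements per composite key "window/key".
     * The result is only returned once all input has been read.
     */
    static Map<String, Long> countPerWindowAndKey(List<Event> input) {
        Map<String, Long> counts = new TreeMap<>();
        for (Event e : input) {
            String compositeKey = dailyWindow(e) + "/" + e.key;
            counts.merge(compositeKey, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Input is deliberately out of order with respect to event time.
        List<Event> input = Arrays.asList(
                new Event("a", Instant.parse("2017-09-09T10:00:00Z")),
                new Event("a", Instant.parse("2017-09-08T23:59:59Z")),
                new Event("b", Instant.parse("2017-09-09T00:00:00Z")),
                new Event("a", Instant.parse("2017-09-09T12:00:00Z")));
        // prints {2017-09-08/a=1, 2017-09-09/a=2, 2017-09-09/b=1}
        System.out.println(countPerWindowAndKey(input));
    }
}
```

In this sketch the earlier window (2017-09-08) gets no head start: its count
becomes available at the same moment as every other window's, after the full
input has been consumed.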

On Sun, Sep 10, 2017 at 8:32 AM Reuven Lax <[email protected]> wrote:

> In that case I can say unequivocally that Dataflow (in batch mode) does not
> produce results for a stage until it has processed that entire stage. The
> reason for this is that the batch runner is optimized for throughput, not
> latency; it wants to minimize the time for the entire job to finish, not
> the time till first output. The side input will not be materialized until
> all of the data for all of the windows of the side input have been
> processed. The streaming runner on the other hand will produce windows as
> they finish. So for the batch runner, there is no performance advantage you
> get for windowing the side input.
>
> The fact that BigQueryIO needs the schema side input to be globally
> windowed is a bit confusing and not well documented. We should add better
> javadoc explaining this.
>
> Reuven
>
> On Sun, Sep 10, 2017 at 12:50 AM, Chaim Turkel <[email protected]> wrote:
>
> > batch on dataflow
> >
> > On Sun, Sep 10, 2017 at 8:05 AM, Reuven Lax <[email protected]>
> > wrote:
> > > Which runner are you using? And is this a batch pipeline?
> > >
> > > On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel <[email protected]> wrote:
> > >
> > >> Thanks for the answer, but I don't think that is the case. From
> > >> what I have seen (I have other code that updates status based on
> > >> the window), it does get called before all the windows are calculated.
> > >> There is no logical reason to wait: once a window has finished, the
> > >> rest of the pipeline should run and BigQuery should start to write
> > >> the results.
> > >>
> > >>
> > >>
> > >> On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax <[email protected]> wrote:
> > >> > Logically the BigQuery write does not depend on windows, and writing
> > >> > it windowed would result in incorrect output. For this reason,
> > >> > BigQueryIO rewindows into global windows before actually writing to
> > >> > BigQuery.
> > >> >
> > >> > If you are running in batch mode, there is no performance difference
> > >> > between windowed and unwindowed side inputs. I believe that all of
> > >> > the batch runners wait until all windows are calculated before
> > >> > materializing the output.
> > >> >
> > >> > Reuven
> > >> >
> > >> > On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <[email protected]> wrote:
> > >> >
> > >> >> The schema depends on the data per window.
> > >> >> When I added the global window it works, but then I lose the
> > >> >> performance, since the second stage of writing will begin only
> > >> >> after the side input has read all the data and updated the schema.
> > >> >> The batch mode of BigQueryIO seems to use a global window, and I
> > >> >> don't know why.
> > >> >>
> > >> >> chaim
> > >> >>
> > >> >> On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov
> > >> >> <[email protected]> wrote:
> > >> >> > Are your schemas actually supposed to be different between
> > >> >> > different windows, or do they depend only on data?
> > >> >> > I see you have a commented-out Window.into(new GlobalWindows())
> > >> >> > for your side input - did that work when it wasn't commented out?
> > >> >> >
> > >> >> > On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <[email protected]> wrote:
> > >> >> >
> > >> >> >> my code is:
> > >> >> >>
> > >> >> >> //read docs from mongo
> > >> >> >> final PCollection<Document> docs = pipeline
> > >> >> >>         .apply(table.getTableName(), MongoDbIO.read()
> > >> >> >>                 .withUri("mongodb://" + connectionParams)
> > >> >> >>                 .withFilter(filter)
> > >> >> >>                 .withDatabase(options.getDBName())
> > >> >> >>                 .withCollection(table.getTableName()))
> > >> >> >>         .apply("AddEventTimestamps", WithTimestamps.of(
> > >> >> >>                 (Document doc) -> new Instant(MongodbManagment.docTimeToLong(doc))))
> > >> >> >>         .apply("Window Daily", Window.into(CalendarWindows.days(1)));
> > >> >> >>
> > >> >> >> //update bq schema based on window
> > >> >> >> final PCollectionView<Map<String, String>> tableSchemas = docs
> > >> >> >> //        .apply("Global Window", Window.into(new GlobalWindows()))
> > >> >> >>         .apply("extract schema " + table.getTableName(),
> > >> >> >>                 new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
> > >> >> >>         .apply("getTableSchemaMemory " + table.getTableName(),
> > >> >> >>                 ParDo.of(getTableSchemaMemory(table.getTableName())))
> > >> >> >>         .apply(View.asMap());
> > >> >> >>
> > >> >> >> final PCollection<TableRow> docsRows = docs
> > >> >> >>         .apply("doc to row " + table.getTableName(),
> > >> >> >>                 ParDo.of(docToTableRow(table.getBqTableName(), tableSchemas))
> > >> >> >>                         .withSideInputs(tableSchemas));
> > >> >> >>
> > >> >> >> final WriteResult apply = docsRows
> > >> >> >>         .apply("insert data table - " + table.getTableName(),
> > >> >> >>                 BigQueryIO.writeTableRows()
> > >> >> >>                         .to(TableRefPartition.perDay(options.getBQProject(),
> > >> >> >>                                 options.getDatasetId(), table.getBqTableName()))
> > >> >> >>                         .withSchemaFromView(tableSchemas)
> > >> >> >>                         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
> > >> >> >>                         .withWriteDisposition(WRITE_APPEND));
> > >> >> >>
> > >> >> >>
> > >> >> >> exception is:
> > >> >> >>
> > >> >> >> Sep 08, 2017 12:16:55 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> > >> >> >> INFO: Opening TableRowWriter to
> > >> >> >> gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
> > >> >> >> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> > >> >> >> java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
> > >> >> >> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
> > >> >> >> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
> > >> >> >> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> > >> >> >> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> > >> >> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
> > >> >> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> > >> >> >> at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
> > >> >> >> at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
> > >> >> >> Caused by: java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
> > >> >> >> at org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
> > >> >> >> at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
> > >> >> >> at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
> > >> >> >> Sep 08, 2017 12:16:58 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> > >> >> >>
> > >> >> >> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
> > >> >> >> <[email protected]> wrote:
> > >> >> >> > Please include the full exception and please show the code that
> > >> >> >> > produces it.
> > >> >> >> > See also
> > >> >> >> > https://beam.apache.org/documentation/programming-guide/#transforms-sideio
> > >> >> >> > section "Side inputs and windowing" - that might be sufficient to
> > >> >> >> > resolve your problem.
> > >> >> >> >
> > >> >> >> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <[email protected]> wrote:
> > >> >> >> >
> > >> >> >> >> Hi,
> > >> >> >> >>   I have a pipeline that, based on documents from mongo, updates
> > >> >> >> >> the schema and then adds the records to mongo. Since I want a
> > >> >> >> >> partitioned table, I have a daily window.
> > >> >> >> >> How do I get the schema view to be windowed? I get this
> > >> >> >> >> exception:
> > >> >> >> >>
> > >> >> >> >> "Attempted to get side input window for GlobalWindow from
> > >> >> >> >> non-global WindowFn"
> > >> >> >> >>
> > >> >> >> >> chaim
> > >> >> >> >>
> > >> >> >>
> > >> >>
> > >>
> >
>
