In that case I can say unequivocally that Dataflow (in batch mode) does not
produce results for a stage until it has processed that entire stage. The
reason for this is that the batch runner is optimized for throughput, not
latency; it wants to minimize the time for the entire job to finish, not
the time till first output. The side input will not be materialized until
all of the data for all of the windows of the side input have been
processed. The streaming runner, on the other hand, will produce windows as
they finish. So with the batch runner, there is no performance advantage to
windowing the side input.

The fact that BigQueryIO needs the schema side input to be globally
windowed is a bit confusing and not well documented. We should add better
javadoc explaining this.
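
A rough sketch of that fix against Chaim's code further down this thread
(docs, table, getTableSchemaMemory, and DocsToSchemaTransform are his
helpers, not Beam API; only the rewindow line changes):

    // Put the schema side input in the global window: BigQueryIO rewindows
    // its main input into GlobalWindows before writing, and a GlobalWindow
    // cannot be mapped back onto a daily WindowFn - hence the
    // IllegalArgumentException quoted below. In batch mode the rewindow
    // costs nothing, since the side input is only materialized once the
    // whole stage has finished anyway.
    final PCollectionView<Map<String, String>> tableSchemas = docs
            .apply("Global Window", Window.<Document>into(new GlobalWindows()))
            .apply("extract schema", new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
            .apply("getTableSchemaMemory", ParDo.of(getTableSchemaMemory(table.getTableName())))
            .apply(View.asMap());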

Reuven

On Sun, Sep 10, 2017 at 12:50 AM, Chaim Turkel <[email protected]> wrote:

> batch on dataflow
>
> On Sun, Sep 10, 2017 at 8:05 AM, Reuven Lax <[email protected]> wrote:
> > Which runner are you using? And is this a batch pipeline?
> >
> > On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel <[email protected]> wrote:
> >
> >> Thanks for the answer, but I don't think that is the case. From
> >> what I have seen, since I have other code that updates status based on
> >> the window, it does get called before all the windows are calculated.
> >> There is no logical reason to wait: once a window has finished, the
> >> rest of the pipeline should run and BigQuery should start to write
> >> the results.
> >>
> >>
> >>
> >> On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax <[email protected]>
> >> wrote:
> >> > Logically the BigQuery write does not depend on windows, and writing
> >> > it windowed would result in incorrect output. For this reason, BigQueryIO
> >> > rewindows into global windows before actually writing to BigQuery.
> >> >
> >> > If you are running in batch mode, there is no performance difference
> >> > between windowed and unwindowed side inputs. I believe that all of the
> >> > batch runners wait until all windows are calculated before materializing
> >> > the output.
> >> >
> >> > Reuven
> >> >
> >> > On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <[email protected]> wrote:
> >> >
> >> >> the schema depends on the data per window.
> >> >> when I added the global window it works, but then I lose the
> >> >> performance, since the second stage of writing will begin only after
> >> >> the side input has read all the data and updated the schema.
> >> >> The batch mode of BigQueryIO seems to use a global window, and I
> >> >> don't know why.
> >> >>
> >> >> chaim
> >> >>
> >> >> On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov
> >> >> <[email protected]> wrote:
> >> >> > Are your schemas actually supposed to be different between different
> >> >> > windows, or do they depend only on data?
> >> >> > I see you have a commented-out Window.into(new GlobalWindows()) for
> >> >> > your side input - did that work when it wasn't commented out?
> >> >> >
> >> >> > On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <[email protected]> wrote:
> >> >> >
> >> >> >> my code is:
> >> >> >>
> >> >> >>     //read docs from mongo
> >> >> >>     final PCollection<Document> docs = pipeline
> >> >> >>             .apply(table.getTableName(), MongoDbIO.read()
> >> >> >>                     .withUri("mongodb://" + connectionParams)
> >> >> >>                     .withFilter(filter)
> >> >> >>                     .withDatabase(options.getDBName())
> >> >> >>                     .withCollection(table.getTableName()))
> >> >> >>             .apply("AddEventTimestamps", WithTimestamps.of(
> >> >> >>                     (Document doc) -> new Instant(MongodbManagment.docTimeToLong(doc))))
> >> >> >>             .apply("Window Daily", Window.into(CalendarWindows.days(1)));
> >> >> >>
> >> >> >>     //update bq schema based on window
> >> >> >>     final PCollectionView<Map<String, String>> tableSchemas = docs
> >> >> >> //            .apply("Global Window", Window.into(new GlobalWindows()))
> >> >> >>             .apply("extract schema " + table.getTableName(),
> >> >> >>                     new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
> >> >> >>             .apply("getTableSchemaMemory " + table.getTableName(),
> >> >> >>                     ParDo.of(getTableSchemaMemory(table.getTableName())))
> >> >> >>             .apply(View.asMap());
> >> >> >>
> >> >> >>     //convert docs to rows, reading the schema as a side input
> >> >> >>     final PCollection<TableRow> docsRows = docs
> >> >> >>             .apply("doc to row " + table.getTableName(),
> >> >> >>                     ParDo.of(docToTableRow(table.getBqTableName(), tableSchemas))
> >> >> >>                             .withSideInputs(tableSchemas));
> >> >> >>
> >> >> >>     //write the rows to a day-partitioned BigQuery table
> >> >> >>     final WriteResult apply = docsRows
> >> >> >>             .apply("insert data table - " + table.getTableName(),
> >> >> >>                     BigQueryIO.writeTableRows()
> >> >> >>                             .to(TableRefPartition.perDay(options.getBQProject(),
> >> >> >>                                     options.getDatasetId(), table.getBqTableName()))
> >> >> >>                             .withSchemaFromView(tableSchemas)
> >> >> >>                             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
> >> >> >>                             .withWriteDisposition(WRITE_APPEND));
> >> >> >>
> >> >> >> exception is:
> >> >> >>
> >> >> >> Sep 08, 2017 12:16:55 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> >> >> >> INFO: Opening TableRowWriter to
> >> >> >> gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
> >> >> >> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> >> >> >> java.lang.IllegalArgumentException: Attempted to get side input window
> >> >> >> for GlobalWindow from non-global WindowFn
> >> >> >> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
> >> >> >> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
> >> >> >> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> >> >> >> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> >> >> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
> >> >> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> >> >> >> at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
> >> >> >> at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
> >> >> >> Caused by: java.lang.IllegalArgumentException: Attempted to get side
> >> >> >> input window for GlobalWindow from non-global WindowFn
> >> >> >> at org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
> >> >> >> at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
> >> >> >> at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
> >> >> >> Sep 08, 2017 12:16:58 PM
> >> >> >> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> >> >> >>
> >> >> >> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
> >> >> >> <[email protected]> wrote:
> >> >> >> > Please include the full exception and please show the code that
> >> >> >> > produces it.
> >> >> >> > See also
> >> >> >> > https://beam.apache.org/documentation/programming-guide/#transforms-sideio
> >> >> >> > section "Side inputs and windowing" - that might be sufficient to
> >> >> >> > resolve your problem.
> >> >> >> >
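
As a sketch of the pattern that guide section describes (the names below are
invented for illustration, not taken from Chaim's pipeline): a windowed main
input may read a globally windowed side input, because Beam maps each
main-input window onto the side input's windowing.

    // Assumed inputs: docs is a PCollection<Document>, and schemas is a
    // PCollection<KV<String, String>> mapping table name -> JSON schema.
    final PCollectionView<Map<String, String>> schemaView = schemas
            .apply("GlobalSchemaWindow", Window.<KV<String, String>>into(new GlobalWindows()))
            .apply(View.asMap());

    docs.apply("Daily", Window.<Document>into(CalendarWindows.days(1)))
        .apply(ParDo.of(new DoFn<Document, TableRow>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                // Every daily window sees the same globally windowed map.
                Map<String, String> schemaByTable = c.sideInput(schemaView);
                // ... convert c.element() to a TableRow using the schema ...
            }
        }).withSideInputs(schemaView));
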
> >> >> >> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <[email protected]> wrote:
> >> >> >> >
> >> >> >> >> Hi,
> >> >> >> >>   I have a pipeline that, based on documents from mongo, updates
> >> >> >> >> the schema and then adds the records to BigQuery. Since I want a
> >> >> >> >> partitioned table, I have a daily window.
> >> >> >> >> How do I get the schema view to be windowed? I get the exception:
> >> >> >> >>
> >> >> >> >> "Attempted to get side input window for GlobalWindow from
> >> >> >> >> non-global WindowFn"
> >> >> >> >>
> >> >> >> >> chaim
> >> >> >> >>
> >> >> >>
> >> >>
> >>
>
