Ah, so you are loading each window into a separate BigQuery table? That
might be the reason things are slow. Remember that a batch job doesn't
return until everything finishes, and if you are loading that many tables
it's entirely possible that BigQuery will throttle you, causing the slowdown.

A couple of options:

1. Instead of loading into separate BigQuery tables, you could load into
separate partitions of the same table. See this page for more info:
https://cloud.google.com/bigquery/docs/partitioned-tables

2. If you have a streaming unbounded source for your data, you can run
using a streaming runner. That will load each window as it becomes
available instead of waiting for everything to load.
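
For option 1, here is a minimal sketch of how a per-day partition of a
single table can be addressed, assuming day-partitioned tables and BigQuery's
"table$YYYYMMDD" partition decorator. The class name, the method name, the
project/dataset/table values and the choice of UTC day boundaries are all
made up for illustration; they are not taken from your pipeline.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class PartitionDecorator {
    // Day formatter for the partition suffix. UTC is an assumption here;
    // use whatever day boundary your windows actually use.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    // Builds "project:dataset.table$YYYYMMDD", so that every day maps to a
    // partition of the same table rather than to a separate table.
    static String partitionSpec(String project, String dataset, String table, Instant eventTime) {
        return project + ":" + dataset + "." + table + "$" + DAY.format(eventTime);
    }

    public static void main(String[] args) {
        // Hypothetical values, for illustration only.
        Instant eventTime = Instant.parse("2017-09-12T23:48:00Z");
        System.out.println(partitionSpec("my-project", "my_dataset", "events", eventTime));
    }
}
```

A string like this could then be returned from whatever function you pass to
BigQueryIO's to(...), but treat that wiring as something to verify against
your Beam version rather than as a given.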

Reuven

On Tue, Sep 12, 2017 at 11:48 PM, Chaim Turkel <[email protected]> wrote:

> from what i found, if I have the windowing with bigquery partitions (per
> day - 1545 partitions) the insert can take 5 hours, whereas if there are
> no partitions it takes about 12 minutes
>
> I have 13,843,080 records (6.76 GB).
> Any ideas how to get the partitioned write to work faster?
>
> Is there a way to get the BigQueryIO to use streaming and not jobs?
>
> chaim
>
> On Tue, Sep 12, 2017 at 11:32 PM, Chaim Turkel <[email protected]> wrote:
> > i am using windowing for the partition of the table, maybe that has to do
> > with it?
> >
> > On Tue, Sep 12, 2017 at 11:25 PM, Reuven Lax <[email protected]>
> wrote:
> >> Ok, something is going wrong then. It appears that your job created over
> >> 14,000 BigQuery load jobs, which is not expected (and probably why
> things
> >> were so slow).
> >>
> >> On Tue, Sep 12, 2017 at 8:50 AM, Chaim Turkel <[email protected]> wrote:
> >>
> >>> no that specific job created only 2 tables
> >>>
> >>> On Tue, Sep 12, 2017 at 4:36 PM, Reuven Lax <[email protected]>
> >>> wrote:
> >>> > It looks like your job is creating about 14,45 distinct BigQuery
> tables.
> >>> > Does that sound correct to you?
> >>> >
> >>> > Reuven
> >>> >
> >>> > On Tue, Sep 12, 2017 at 6:22 AM, Chaim Turkel <[email protected]>
> wrote:
> >>> >
> >>> >> the job id is 2017-09-12_02_57_55-5233544151932101752
> >>> >> as you can see the majority of the time is inserting into bigquery.
> >>> >> is there any way to parallel this?
> >>> >>
> >>> >> My feeling for the windowing is that writing should be done per
> window
> >>> >> (my window is daily) or at least to be able to configure it
> >>> >>
> >>> >> chaim
> >>> >>
> >>> >> On Tue, Sep 12, 2017 at 4:10 PM, Reuven Lax
> <[email protected]>
> >>> >> wrote:
> >>> >> > So the problem is you are running on Dataflow, and it's taking longer than
> >>> >> > you think it should? If you provide the Dataflow job id we can help you
> >>> >> > debug why it's taking 30 minutes. (and as an aside, if this turns into a
> >>> >> > Dataflow debugging session we should move it off of the Beam list and onto
> >>> >> > a Dataflow-specific thread)
> >>> >> >
> >>> >> > Reuven
> >>> >> >
> >>> >> > On Tue, Sep 12, 2017 at 3:28 AM, Chaim Turkel <[email protected]>
> >>> wrote:
> >>> >> >
> >>> >> >> is there a way around this, my time for 13gb is not close to 30
> >>> >> >> minutes, while it should be around 15 minutes.
> >>> >> >> Do i need to chunk the code myself to windows, and run in
> parallel?
> >>> >> >> chaim
> >>> >> >>
> >>> >> >> On Sun, Sep 10, 2017 at 6:32 PM, Reuven Lax
> <[email protected]
> >>> >
> >>> >> >> wrote:
> >>> >> >> > In that case I can say unequivocally that Dataflow (in batch mode) does not
> >>> >> >> > produce results for a stage until it has processed that entire stage. The
> >>> >> >> > reason for this is that the batch runner is optimized for throughput, not
> >>> >> >> > latency; it wants to minimize the time for the entire job to finish, not
> >>> >> >> > the time till first output. The side input will not be materialized until
> >>> >> >> > all of the data for all of the windows of the side input have been
> >>> >> >> > processed. The streaming runner on the other hand will produce windows as
> >>> >> >> > they finish. So for the batch runner, there is no performance advantage you
> >>> >> >> > get for windowing the side input.
> >>> >> >> >
> >>> >> >> > The fact that BigQueryIO needs the schema side input to be globally
> >>> >> >> > windowed is a bit confusing and not well documented. We should add better
> >>> >> >> > javadoc explaining this.
> >>> >> >> >
> >>> >> >> > Reuven
> >>> >> >> >
> >>> >> >> > On Sun, Sep 10, 2017 at 12:50 AM, Chaim Turkel <
> [email protected]>
> >>> >> wrote:
> >>> >> >> >
> >>> >> >> >> batch on dataflow
> >>> >> >> >>
> >>> >> >> >> On Sun, Sep 10, 2017 at 8:05 AM, Reuven Lax
> >>> <[email protected]
> >>> >> >
> >>> >> >> >> wrote:
> >>> >> >> >> > Which runner are you using? And is this a batch pipeline?
> >>> >> >> >> >
> >>> >> >> >> > On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel <
> [email protected]
> >>> >
> >>> >> >> wrote:
> >>> >> >> >> >
> >>> >> >> >> >> Thanks for the answer, but i don't think that is the case. From
> >>> >> >> >> >> what i have seen, since i have other code to update status based on
> >>> >> >> >> >> the window, it does get called before all the windows are calculated.
> >>> >> >> >> >> There is no logical reason to wait, once the window has finished, the
> >>> >> >> >> >> rest of the pipeline should run and the BigQuery should start to write
> >>> >> >> >> >> the results.
> >>> >> >> >> >>
> >>> >> >> >> >>
> >>> >> >> >> >>
> >>> >> >> >> >> On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax
> >>> >> <[email protected]
> >>> >> >> >
> >>> >> >> >> >> wrote:
> >>> >> >> >> >> > Logically the BigQuery write does not depend on windows, and writing it
> >>> >> >> >> >> > windowed would result in incorrect output. For this reason, BigQueryIO
> >>> >> >> >> >> > rewindows into global windows before actually writing to BigQuery.
> >>> >> >> >> >> >
> >>> >> >> >> >> > If you are running in batch mode, there is no performance difference
> >>> >> >> >> >> > between windowed and unwindowed side inputs. I believe that all of the
> >>> >> >> >> >> > batch runners wait until all windows are calculated before materializing
> >>> >> >> >> >> > the output.
> >>> >> >> >> >> >
> >>> >> >> >> >> > Reuven
> >>> >> >> >> >> >
> >>> >> >> >> >> > On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <
> >>> [email protected]
> >>> >> >
> >>> >> >> >> wrote:
> >>> >> >> >> >> >
> >>> >> >> >> >> >> the schema depends on the data per window.
> >>> >> >> >> >> >> when i added the global window it works, but then i lose the
> >>> >> >> >> >> >> performance, since the second stage of writing will begin only after
> >>> >> >> >> >> >> the side input has read all the data and updated the schema.
> >>> >> >> >> >> >> The batch mode of BigQueryIO seems to use a global window, and i
> >>> >> >> >> >> >> don't know why.
> >>> >> >> >> >> >>
> >>> >> >> >> >> >> chaim
> >>> >> >> >> >> >>
> >>> >> >> >> >> >> On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov
> >>> >> >> >> >> >> <[email protected]> wrote:
> >>> >> >> >> >> >> > Are your schemas actually supposed to be different
> between
> >>> >> >> >> different
> >>> >> >> >> >> >> > windows, or do they depend only on data?
> >>> >> >> >> >> >> > I see you have a commented-out Window.into(new
> >>> >> GlobalWindows())
> >>> >> >> for
> >>> >> >> >> >> your
> >>> >> >> >> >> >> > side input - did that work when it wasn't commented
> out?
> >>> >> >> >> >> >> >
> >>> >> >> >> >> >> > On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <
> >>> >> [email protected]>
> >>> >> >> >> wrote:
> >>> >> >> >> >> >> >
> >>> >> >> >> >> >> >> my code is:
> >>> >> >> >> >> >> >>
> >>> >> >> >> >> >> >> //read docs from mongo
> >>> >> >> >> >> >> >> final PCollection<Document> docs = pipeline
> >>> >> >> >> >> >> >>         .apply(table.getTableName(), MongoDbIO.read()
> >>> >> >> >> >> >> >>                 .withUri("mongodb://" + connectionParams)
> >>> >> >> >> >> >> >>                 .withFilter(filter)
> >>> >> >> >> >> >> >>                 .withDatabase(options.getDBName())
> >>> >> >> >> >> >> >>                 .withCollection(table.getTableName()))
> >>> >> >> >> >> >> >>         .apply("AddEventTimestamps", WithTimestamps.of((Document doc) -> new Instant(MongodbManagment.docTimeToLong(doc))))
> >>> >> >> >> >> >> >>         .apply("Window Daily", Window.into(CalendarWindows.days(1)));
> >>> >> >> >> >> >> >>
> >>> >> >> >> >> >> >> //update bq schema based on window
> >>> >> >> >> >> >> >> final PCollectionView<Map<String, String>> tableSchemas = docs
> >>> >> >> >> >> >> >> //        .apply("Global Window",Window.into(new GlobalWindows()))
> >>> >> >> >> >> >> >>         .apply("extract schema " + table.getTableName(), new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
> >>> >> >> >> >> >> >>         .apply("getTableSchemaMemory " + table.getTableName(), ParDo.of(getTableSchemaMemory(table.getTableName())))
> >>> >> >> >> >> >> >>         .apply(View.asMap());
> >>> >> >> >> >> >> >>
> >>> >> >> >> >> >> >> final PCollection<TableRow> docsRows = docs
> >>> >> >> >> >> >> >>         .apply("doc to row " + table.getTableName(), ParDo.of(docToTableRow(table.getBqTableName(), tableSchemas))
> >>> >> >> >> >> >> >>                 .withSideInputs(tableSchemas));
> >>> >> >> >> >> >> >>
> >>> >> >> >> >> >> >> final WriteResult apply = docsRows
> >>> >> >> >> >> >> >>         .apply("insert data table - " + table.getTableName(),
> >>> >> >> >> >> >> >>                 BigQueryIO.writeTableRows()
> >>> >> >> >> >> >> >>                         .to(TableRefPartition.perDay(options.getBQProject(), options.getDatasetId(), table.getBqTableName()))
> >>> >> >> >> >> >> >>                         .withSchemaFromView(tableSchemas)
> >>> >> >> >> >> >> >>                         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
> >>> >> >> >> >> >> >>                         .withWriteDisposition(WRITE_APPEND));
> >>> >> >> >> >> >> >>
> >>> >> >> >> >> >> >>
> >>> >> >> >> >> >> >> exception is:
> >>> >> >> >> >> >> >>
> >>> >> >> >> >> >> >> Sep 08, 2017 12:16:55 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> >>> >> >> >> >> >> >> INFO: Opening TableRowWriter to
> >>> >> >> >> >> >> >> gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
> >>> >> >> >> >> >> >> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
> >>> >> >> >> >> >> >> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
> >>> >> >> >> >> >> >> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
> >>> >> >> >> >> >> >> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> >>> >> >> >> >> >> >> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> >>> >> >> >> >> >> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
> >>> >> >> >> >> >> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> >>> >> >> >> >> >> >> at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
> >>> >> >> >> >> >> >> at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
> >>> >> >> >> >> >> >> Caused by: java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
> >>> >> >> >> >> >> >> at org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
> >>> >> >> >> >> >> >> at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
> >>> >> >> >> >> >> >> at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
> >>> >> >> >> >> >> >> Sep 08, 2017 12:16:58 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> >>> >> >> >> >> >> >>
> >>> >> >> >> >> >> >> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
> >>> >> >> >> >> >> >> <[email protected]> wrote:
> >>> >> >> >> >> >> >> > Please include the full exception and please show the code that
> >>> >> >> >> >> >> >> > produces it.
> >>> >> >> >> >> >> >> > See also
> >>> >> >> >> >> >> >> > https://beam.apache.org/documentation/programming-guide/#transforms-sideio
> >>> >> >> >> >> >> >> > section "Side inputs and windowing" - that might be sufficient to
> >>> >> >> >> >> >> >> > resolve your problem.
> >>> >> >> >> >> >> >> >
> >>> >> >> >> >> >> >> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <
> >>> >> >> [email protected]>
> >>> >> >> >> >> wrote:
> >>> >> >> >> >> >> >> >
> >>> >> >> >> >> >> >> >> Hi,
> >>> >> >> >> >> >> >> >>   I have a pipeline that, based on documents from mongo, updates the
> >>> >> >> >> >> >> >> >> schema and then adds the records to mongo. Since i want a partitioned
> >>> >> >> >> >> >> >> >> table, i have a daily window.
> >>> >> >> >> >> >> >> >> How do i get the schema view to be windowed? i get the exception:
> >>> >> >> >> >> >> >> >>
> >>> >> >> >> >> >> >> >> "Attempted to get side input window for GlobalWindow from non-global
> >>> >> >> >> >> >> >> >> WindowFn"
> >>> >> >> >> >> >> >> >>
> >>> >> >> >> >> >> >> >> chaim
> >>> >> >> >> >> >> >> >>
> >>> >> >> >> >> >> >>
> >>> >> >> >> >> >>
> >>> >> >> >> >>
> >>> >> >> >>
> >>> >> >>
> >>> >>
> >>>
>
