from what i found it I have the windowing with bigquery partition (per
day - 1545 partitions) the insert can take 5 hours, where if there is
no partitions then it takes about 12 minutes

I have 13,843,080 recrods 6.76 GB.
Any ideas how to get the partition to work faster.

Is there a way to get the BigQueryIO to use streaming and not jobs?

chaim

On Tue, Sep 12, 2017 at 11:32 PM, Chaim Turkel <[email protected]> wrote:
> i am using windowing for the partion of the table, maybe that has to do with 
> it?
>
> On Tue, Sep 12, 2017 at 11:25 PM, Reuven Lax <[email protected]> wrote:
>> Ok, something is going wrong then. It appears that your job created over
>> 14,000 BigQuery load jobs, which is not expected (and probably why things
>> were so slow).
>>
>> On Tue, Sep 12, 2017 at 8:50 AM, Chaim Turkel <[email protected]> wrote:
>>
>>> no that specific job created only 2 tables
>>>
>>> On Tue, Sep 12, 2017 at 4:36 PM, Reuven Lax <[email protected]>
>>> wrote:
>>> > It looks like your job is creating about 14,45 distinct BigQuery tables.
>>> > Does that sound correct to you?
>>> >
>>> > Reuven
>>> >
>>> > On Tue, Sep 12, 2017 at 6:22 AM, Chaim Turkel <[email protected]> wrote:
>>> >
>>> >> the job id is 2017-09-12_02_57_55-5233544151932101752
>>> >> as you can see the majority of the time is inserting into bigquery.
>>> >> is there any way to parallel this?
>>> >>
>>> >> My feeling for the windowing is that writing should be done per window
>>> >> (my window is daily) or at least to be able to configure it
>>> >>
>>> >> chaim
>>> >>
>>> >> On Tue, Sep 12, 2017 at 4:10 PM, Reuven Lax <[email protected]>
>>> >> wrote:
>>> >> > So the problem is you are running on Dataflow, and it's taking longer
>>> >> than
>>> >> > you think it should? If you provide the Dataflow job id we can help
>>> you
>>> >> > debug why it's taking 30 minutes. (and as an aside, if this turns
>>> into a
>>> >> > Dataflow debugging session we should move it off of the Beam list and
>>> >> onto
>>> >> > a Dataflow-specific tread)
>>> >> >
>>> >> > Reuven
>>> >> >
>>> >> > On Tue, Sep 12, 2017 at 3:28 AM, Chaim Turkel <[email protected]>
>>> wrote:
>>> >> >
>>> >> >> is there a way around this, my time for 13gb is not close to 30
>>> >> >> minutes, while it should be around 15 minutes.
>>> >> >> Do i need to chunk the code myself to windows, and run in parallel?
>>> >> >> chaim
>>> >> >>
>>> >> >> On Sun, Sep 10, 2017 at 6:32 PM, Reuven Lax <[email protected]
>>> >
>>> >> >> wrote:
>>> >> >> > In that case I can say unequivocally that Dataflow (in batch mode)
>>> >> does
>>> >> >> not
>>> >> >> > produce results for a stage until it has processed that entire
>>> stage.
>>> >> The
>>> >> >> > reason for this is that the batch runner is optimized for
>>> throughput,
>>> >> not
>>> >> >> > latency; it wants to minimize the time for the entire job to
>>> finish,
>>> >> not
>>> >> >> > the time till first output. The side input will not be materialized
>>> >> until
>>> >> >> > all of the data for all of the windows of the side input have been
>>> >> >> > processed. The streaming runner on the other hand will produce
>>> >> windows as
>>> >> >> > they finish. So for the batch runner, there is no performance
>>> >> advantage
>>> >> >> you
>>> >> >> > get for windowing the side input.
>>> >> >> >
>>> >> >> > The fact that BigQueryIO needs the schema side input to be globally
>>> >> >> > windowed is a bit confusing and not well documented. We should add
>>> >> better
>>> >> >> > javadoc explaining this.
>>> >> >> >
>>> >> >> > Reuven
>>> >> >> >
>>> >> >> > On Sun, Sep 10, 2017 at 12:50 AM, Chaim Turkel <[email protected]>
>>> >> wrote:
>>> >> >> >
>>> >> >> >> batch on dataflow
>>> >> >> >>
>>> >> >> >> On Sun, Sep 10, 2017 at 8:05 AM, Reuven Lax
>>> <[email protected]
>>> >> >
>>> >> >> >> wrote:
>>> >> >> >> > Which runner are you using? And is this a batch pipeline?
>>> >> >> >> >
>>> >> >> >> > On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel <[email protected]
>>> >
>>> >> >> wrote:
>>> >> >> >> >
>>> >> >> >> >> Thank for the answer, but i don't think that that is the case.
>>> >> From
>>> >> >> >> >> what i have seen, since i have other code to update status
>>> based
>>> >> on
>>> >> >> >> >> the window, it does get called before all the windows are
>>> >> calculated.
>>> >> >> >> >> There is no logical reason to wait, once the window has
>>> finished,
>>> >> the
>>> >> >> >> >> rest of the pipeline should run and the BigQuery should start
>>> to
>>> >> >> write
>>> >> >> >> >> the results.
>>> >> >> >> >>
>>> >> >> >> >>
>>> >> >> >> >>
>>> >> >> >> >> On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax
>>> >> <[email protected]
>>> >> >> >
>>> >> >> >> >> wrote:
>>> >> >> >> >> > Logically the BigQuery write does not depend on windows, and
>>> >> >> writing
>>> >> >> >> it
>>> >> >> >> >> > windowed would result in incorrect output. For this reason,
>>> >> >> BigQueryIO
>>> >> >> >> >> > rewindows int global windows before actually writing to
>>> >> BigQuery.
>>> >> >> >> >> >
>>> >> >> >> >> > If you are running in batch mode, there is no performance
>>> >> >> difference
>>> >> >> >> >> > between windowed and unwindowed side inputs. I believe that
>>> all
>>> >> of
>>> >> >> the
>>> >> >> >> >> > batch runners wait until all windows are calculated before
>>> >> >> >> materializing
>>> >> >> >> >> > the output.
>>> >> >> >> >> >
>>> >> >> >> >> > Reuven
>>> >> >> >> >> >
>>> >> >> >> >> > On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <
>>> [email protected]
>>> >> >
>>> >> >> >> wrote:
>>> >> >> >> >> >
>>> >> >> >> >> >> the schema depends on the data per window.
>>> >> >> >> >> >> when i added the global window it works, but then i loose
>>> the
>>> >> >> >> >> >> performance, since the secound stage of writing will begin
>>> only
>>> >> >> after
>>> >> >> >> >> >> the side input has read all the data and updated the schema
>>> >> >> >> >> >> The batchmode of the BigqueryIO seems to use a global window
>>> >> that
>>> >> >> i
>>> >> >> >> >> >> don't know why?
>>> >> >> >> >> >>
>>> >> >> >> >> >> chaim
>>> >> >> >> >> >>
>>> >> >> >> >> >> On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov
>>> >> >> >> >> >> <[email protected]> wrote:
>>> >> >> >> >> >> > Are your schemas actually supposed to be different between
>>> >> >> >> different
>>> >> >> >> >> >> > windows, or do they depend only on data?
>>> >> >> >> >> >> > I see you have a commented-out Window.into(new
>>> >> GlobalWindows())
>>> >> >> for
>>> >> >> >> >> your
>>> >> >> >> >> >> > side input - did that work when it wasn't commented out?
>>> >> >> >> >> >> >
>>> >> >> >> >> >> > On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <
>>> >> [email protected]>
>>> >> >> >> wrote:
>>> >> >> >> >> >> >
>>> >> >> >> >> >> >> my code is:
>>> >> >> >> >> >> >>
>>> >> >> >> >> >> >>                     //read docs from mongo
>>> >> >> >> >> >> >>                     final PCollection<Document> docs =
>>> >> pipeline
>>> >> >> >> >> >> >>                             .apply(table.getTableName(),
>>> >> >> >> >> >> MongoDbIO.read()
>>> >> >> >> >> >> >>
>>>  .withUri("mongodb://" +
>>> >> >> >> >> >> >> connectionParams)
>>> >> >> >> >> >> >>                                     .withFilter(filter)
>>> >> >> >> >> >> >>
>>>  .withDatabase(options.
>>> >> >> >> >> getDBName())
>>> >> >> >> >> >> >>
>>>  .withCollection(table.
>>> >> >> >> >> >> getTableName()))
>>> >> >> >> >> >> >>                             .apply("AddEventTimestamps",
>>> >> >> >> >> >> >> WithTimestamps.of((Document doc) -> new
>>> >> >> >> >> >> >> Instant(MongodbManagment.docTimeToLong(doc))))
>>> >> >> >> >> >> >>                             .apply("Window Daily",
>>> >> >> >> >> >> >> Window.into(CalendarWindows.days(1)));
>>> >> >> >> >> >> >>
>>> >> >> >> >> >> >>                     //update bq schema based on window
>>> >> >> >> >> >> >>                     final PCollectionView<Map<String,
>>> >> String>>
>>> >> >> >> >> >> >> tableSchemas = docs
>>> >> >> >> >> >> >> //                            .apply("Global
>>> >> >> >> Window",Window.into(new
>>> >> >> >> >> >> >> GlobalWindows()))
>>> >> >> >> >> >> >>                             .apply("extract schema " +
>>> >> >> >> >> >> >> table.getTableName(), new
>>> >> >> >> >> >> >> LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
>>> >> >> >> >> >> >>                             .apply("getTableSchemaMemory
>>> " +
>>> >> >> >> >> >> >> table.getTableName(),
>>> >> >> >> >> >> >> ParDo.of(getTableSchemaMemory(table.getTableName())))
>>> >> >> >> >> >> >>                             .apply(View.asMap());
>>> >> >> >> >> >> >>
>>> >> >> >> >> >> >>                     final PCollection<TableRow> docsRows
>>> =
>>> >> docs
>>> >> >> >> >> >> >>                             .apply("doc to row " +
>>> >> >> >> >> >> >> table.getTableName(), ParDo.of(docToTableRow(table.
>>> >> >> >> getBqTableName(),
>>> >> >> >> >> >> >> tableSchemas))
>>> >> >> >> >> >> >>
>>> >> >>  .withSideInputs(tableSchemas))
>>> >> >> >> ;
>>> >> >> >> >> >> >>
>>> >> >> >> >> >> >>                     final WriteResult apply = docsRows
>>> >> >> >> >> >> >>                             .apply("insert data table -
>>> " +
>>> >> >> >> >> >> >> table.getTableName(),
>>> >> >> >> >> >> >>
>>> >>  BigQueryIO.writeTableRows()
>>> >> >> >> >> >> >>
>>> >> >> >> >> >> >> .to(TableRefPartition.perDay(options.getBQProject(),
>>> >> >> >> >> >> >> options.getDatasetId(), table.getBqTableName()))
>>> >> >> >> >> >> >>
>>> >> >> >> >> >> >> .withSchemaFromView(tableSchemas)
>>> >> >> >> >> >> >>
>>> >> >> >> >> >> >> .withCreateDisposition(BigQueryIO.Write.
>>> >> >> >> CreateDisposition.CREATE_IF_
>>> >> >> >> >> >> NEEDED)
>>> >> >> >> >> >> >>
>>> >> >> >> >> >> >> .withWriteDisposition(WRITE_APPEND));
>>> >> >> >> >> >> >>
>>> >> >> >> >> >> >>
>>> >> >> >> >> >> >> exception is:
>>> >> >> >> >> >> >>
>>> >> >> >> >> >> >> Sep 08, 2017 12:16:55 PM
>>> >> >> >> >> >> >> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter
>>> <init>
>>> >> >> >> >> >> >> INFO: Opening TableRowWriter to
>>> >> >> >> >> >> >>
>>> >> >> >> >> >> >> gs://bq-migration/tempMongo/BigQueryWriteTemp/
>>> >> >> >> >> >> 7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-
>>> >> >> >> d9a12e4fdcfb.
>>> >> >> >> >> >> >> Exception in thread "main"
>>> >> >> >> >> >> >> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>>> >> >> >> >> >> >> java.lang.IllegalArgumentException: Attempted to get
>>> side
>>> >> >> input
>>> >> >> >> >> window
>>> >> >> >> >> >> >> for GlobalWindow from non-global WindowFn
>>> >> >> >> >> >> >> at
>>> >> >> >> >> >> >> org.apache.beam.runners.direct.DirectRunner$
>>> >> >> DirectPipelineResult.
>>> >> >> >> >> >> waitUntilFinish(DirectRunner.java:331)
>>> >> >> >> >> >> >> at
>>> >> >> >> >> >> >> org.apache.beam.runners.direct.DirectRunner$
>>> >> >> DirectPipelineResult.
>>> >> >> >> >> >> waitUntilFinish(DirectRunner.java:301)
>>> >> >> >> >> >> >> at org.apache.beam.runners.direct.DirectRunner.run(
>>> >> >> >> >> >> DirectRunner.java:200)
>>> >> >> >> >> >> >> at org.apache.beam.runners.direct.DirectRunner.run(
>>> >> >> >> >> >> DirectRunner.java:63)
>>> >> >> >> >> >> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>>> >> >> >> >> >> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
>>> >> >> >> >> >> >> at
>>> >> >> >> >> >> >> com.behalf.migration.dataflow.mongodb.
>>> >> LoadMongodbDataPipeline.
>>> >> >> >> >> >> runPipeline(LoadMongodbDataPipeline.java:347)
>>> >> >> >> >> >> >> at
>>> >> >> >> >> >> >> com.behalf.migration.dataflow.mongodb.
>>> >> >> >> LoadMongodbDataPipeline.main(
>>> >> >> >> >> >> LoadMongodbDataPipeline.java:372)
>>> >> >> >> >> >> >> Caused by: java.lang.IllegalArgumentException:
>>> Attempted to
>>> >> >> get
>>> >> >> >> side
>>> >> >> >> >> >> >> input window for GlobalWindow from non-global WindowFn
>>> >> >> >> >> >> >> at
>>> >> >> >> >> >> >> org.apache.beam.sdk.transforms.windowing.
>>> >> >> PartitioningWindowFn$1.
>>> >> >> >> >> >> getSideInputWindow(PartitioningWindowFn.java:49)
>>> >> >> >> >> >> >> at
>>> >> >> >> >> >> >> org.apache.beam.runners.direct.repackaged.runners.core.
>>> >> >> >> >> >> SimplePushbackSideInputDoFnRunner.isReady(
>>> >> >> >> >> SimplePushbackSideInputDoFnRun
>>> >> >> >> >> >> ner.java:94)
>>> >> >> >> >> >> >> at
>>> >> >> >> >> >> >> org.apache.beam.runners.direct.repackaged.runners.core.
>>> >> >> >> >> >> SimplePushbackSideInputDoFnRunner.
>>> >> processElementInReadyWindows(
>>> >> >> >> >> >> SimplePushbackSideInputDoFnRunner.java:76)
>>> >> >> >> >> >> >> Sep 08, 2017 12:16:58 PM
>>> >> >> >> >> >> >> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter
>>> <init>
>>> >> >> >> >> >> >>
>>> >> >> >> >> >> >> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
>>> >> >> >> >> >> >> <[email protected]> wrote:
>>> >> >> >> >> >> >> > Please include the full exception and please show the
>>> code
>>> >> >> that
>>> >> >> >> >> >> produces
>>> >> >> >> >> >> >> it.
>>> >> >> >> >> >> >> > See also
>>> >> >> >> >> >> >> >
>>> >> >> >> >> >> >> https://beam.apache.org/documentation/programming-
>>> >> >> >> >> >> guide/#transforms-sideio
>>> >> >> >> >> >> >> > section
>>> >> >> >> >> >> >> > "Side inputs and windowing" - that might be sufficient
>>> to
>>> >> >> >> resolve
>>> >> >> >> >> your
>>> >> >> >> >> >> >> > problem.
>>> >> >> >> >> >> >> >
>>> >> >> >> >> >> >> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <
>>> >> >> [email protected]>
>>> >> >> >> >> wrote:
>>> >> >> >> >> >> >> >
>>> >> >> >> >> >> >> >> Hi,
>>> >> >> >> >> >> >> >>   I have a pipline that bases on documents from mongo
>>> >> >> updates
>>> >> >> >> the
>>> >> >> >> >> >> >> >> schema and then adds the records to mongo. Since i
>>> want a
>>> >> >> >> >> partitioned
>>> >> >> >> >> >> >> >> table, i have a dally window.
>>> >> >> >> >> >> >> >> How do i get the schema view to be a window, i get the
>>> >> >> >> exception
>>> >> >> >> >> of:
>>> >> >> >> >> >> >> >>
>>> >> >> >> >> >> >> >> Attempted to get side input window for GlobalWindow
>>> from
>>> >> >> >> >> non-global
>>> >> >> >> >> >> >> >> WindowFn"
>>> >> >> >> >> >> >> >>
>>> >> >> >> >> >> >> >> chaim
>>> >> >> >> >> >> >> >>
>>> >> >> >> >> >> >>
>>> >> >> >> >> >>
>>> >> >> >> >>
>>> >> >> >>
>>> >> >>
>>> >>
>>>

Reply via email to