I just went over the changes for the streaming method, and they look great. How about adding an option to continue the apply chain after a successful write, with statistics or something similar, like what is available in the failure case?

On Wed, Sep 13, 2017 at 7:19 PM, Reuven Lax <[email protected]> wrote:

> Ah, so you are loading each window into a separate BigQuery table? That
> might be the reason things are slow. Remember, a batch job doesn't return
> until everything finishes, and if you are loading that many tables it's
> entirely possible that BigQuery will throttle you, causing the slowdown.
>
> A couple of options:
>
> 1. Instead of loading into separate BigQuery tables, you could load into
> separate partitions of the same table. See this page for more info:
> https://cloud.google.com/bigquery/docs/partitioned-tables
>
> 2. If you have a streaming unbounded source for your data, you can run
> using a streaming runner. That will load each window as it becomes
> available instead of waiting for everything to load.
>
> Reuven
>
> On Tue, Sep 12, 2017 at 11:48 PM, Chaim Turkel <[email protected]> wrote:
>
>> From what I found, if I have the windowing with BigQuery partitions (per
>> day, 1545 partitions) the insert can take 5 hours, whereas if there are
>> no partitions it takes about 12 minutes.
>>
>> I have 13,843,080 records, 6.76 GB.
>> Any ideas how to get the partitions to work faster?
>>
>> Is there a way to get the BigQueryIO to use streaming and not jobs?
>>
>> chaim
>>
>> On Tue, Sep 12, 2017 at 11:32 PM, Chaim Turkel <[email protected]> wrote:
>> > I am using windowing for the partition of the table, maybe that has to do
>> > with it?
>> >
>> > On Tue, Sep 12, 2017 at 11:25 PM, Reuven Lax <[email protected]> wrote:
>> >> Ok, something is going wrong then. It appears that your job created over
>> >> 14,000 BigQuery load jobs, which is not expected (and probably why things
>> >> were so slow).
>> >>
>> >> On Tue, Sep 12, 2017 at 8:50 AM, Chaim Turkel <[email protected]> wrote:
>> >>
>> >>> No, that specific job created only 2 tables.
>> >>>
>> >>> On Tue, Sep 12, 2017 at 4:36 PM, Reuven Lax <[email protected]> wrote:
>> >>> > It looks like your job is creating about 14,45 distinct BigQuery tables.
>> >>> > Does that sound correct to you?
>> >>> >
>> >>> > Reuven
>> >>> >
>> >>> > On Tue, Sep 12, 2017 at 6:22 AM, Chaim Turkel <[email protected]> wrote:
>> >>> >
>> >>> >> The job id is 2017-09-12_02_57_55-5233544151932101752.
>> >>> >> As you can see, the majority of the time is spent inserting into BigQuery.
>> >>> >> Is there any way to parallelize this?
>> >>> >>
>> >>> >> My feeling for the windowing is that writing should be done per window
>> >>> >> (my window is daily), or at least that this should be configurable.
>> >>> >>
>> >>> >> chaim
>> >>> >>
>> >>> >> On Tue, Sep 12, 2017 at 4:10 PM, Reuven Lax <[email protected]> wrote:
>> >>> >> > So the problem is you are running on Dataflow, and it's taking longer
>> >>> >> > than you think it should? If you provide the Dataflow job id we can help
>> >>> >> > you debug why it's taking 30 minutes. (And as an aside, if this turns
>> >>> >> > into a Dataflow debugging session we should move it off of the Beam list
>> >>> >> > and onto a Dataflow-specific thread.)
>> >>> >> >
>> >>> >> > Reuven
>> >>> >> >
>> >>> >> > On Tue, Sep 12, 2017 at 3:28 AM, Chaim Turkel <[email protected]> wrote:
>> >>> >> >
>> >>> >> >> Is there a way around this? My time for 13 GB is now close to 30
>> >>> >> >> minutes, while it should be around 15 minutes.
>> >>> >> >> Do I need to chunk the code myself into windows, and run in parallel?
>> >>> >> >>
>> >>> >> >> chaim
>> >>> >> >>
>> >>> >> >> On Sun, Sep 10, 2017 at 6:32 PM, Reuven Lax <[email protected]> wrote:
>> >>> >> >> > In that case I can say unequivocally that Dataflow (in batch mode) does
>> >>> >> >> > not produce results for a stage until it has processed that entire
>> >>> >> >> > stage. The reason for this is that the batch runner is optimized for
>> >>> >> >> > throughput, not latency; it wants to minimize the time for the entire
>> >>> >> >> > job to finish, not the time till first output. The side input will not
>> >>> >> >> > be materialized until all of the data for all of the windows of the
>> >>> >> >> > side input have been processed. The streaming runner on the other hand
>> >>> >> >> > will produce windows as they finish. So for the batch runner, there is
>> >>> >> >> > no performance advantage you get for windowing the side input.
>> >>> >> >> >
>> >>> >> >> > The fact that BigQueryIO needs the schema side input to be globally
>> >>> >> >> > windowed is a bit confusing and not well documented. We should add
>> >>> >> >> > better javadoc explaining this.
>> >>> >> >> >
>> >>> >> >> > Reuven
>> >>> >> >> >
>> >>> >> >> > On Sun, Sep 10, 2017 at 12:50 AM, Chaim Turkel <[email protected]> wrote:
>> >>> >> >> >
>> >>> >> >> >> batch on dataflow
>> >>> >> >> >>
>> >>> >> >> >> On Sun, Sep 10, 2017 at 8:05 AM, Reuven Lax <[email protected]> wrote:
>> >>> >> >> >> > Which runner are you using? And is this a batch pipeline?
>> >>> >> >> >> >
>> >>> >> >> >> > On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel <[email protected]> wrote:
>> >>> >> >> >> >
>> >>> >> >> >> >> Thanks for the answer, but I don't think that that is the case.
>> >>> >> >> >> >> From what I have seen, since I have other code to update status based on
>> >>> >> >> >> >> the window, it does get called before all the windows are calculated.
>> >>> >> >> >> >> There is no logical reason to wait: once the window has finished, the
>> >>> >> >> >> >> rest of the pipeline should run and BigQuery should start to write
>> >>> >> >> >> >> the results.
>> >>> >> >> >> >>
>> >>> >> >> >> >> On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax <[email protected]> wrote:
>> >>> >> >> >> >> > Logically the BigQuery write does not depend on windows, and writing
>> >>> >> >> >> >> > it windowed would result in incorrect output. For this reason,
>> >>> >> >> >> >> > BigQueryIO rewindows into the global window before actually writing
>> >>> >> >> >> >> > to BigQuery.
>> >>> >> >> >> >> >
>> >>> >> >> >> >> > If you are running in batch mode, there is no performance difference
>> >>> >> >> >> >> > between windowed and unwindowed side inputs. I believe that all of
>> >>> >> >> >> >> > the batch runners wait until all windows are calculated before
>> >>> >> >> >> >> > materializing the output.
>> >>> >> >> >> >> >
>> >>> >> >> >> >> > Reuven
>> >>> >> >> >> >> >
>> >>> >> >> >> >> > On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <[email protected]> wrote:
>> >>> >> >> >> >> >
>> >>> >> >> >> >> >> The schema depends on the data per window.
>> >>> >> >> >> >> >> When I added the global window it works, but then I lose the
>> >>> >> >> >> >> >> performance, since the second stage of writing will begin only after
>> >>> >> >> >> >> >> the side input has read all the data and updated the schema.
>> >>> >> >> >> >> >> The batch mode of BigQueryIO seems to use a global window, and I
>> >>> >> >> >> >> >> don't know why.
>> >>> >> >> >> >> >>
>> >>> >> >> >> >> >> chaim
>> >>> >> >> >> >> >>
>> >>> >> >> >> >> >> On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov <[email protected]> wrote:
>> >>> >> >> >> >> >> > Are your schemas actually supposed to be different between different
>> >>> >> >> >> >> >> > windows, or do they depend only on data?
>> >>> >> >> >> >> >> > I see you have a commented-out Window.into(new GlobalWindows()) for
>> >>> >> >> >> >> >> > your side input - did that work when it wasn't commented out?
>> >>> >> >> >> >> >> >
>> >>> >> >> >> >> >> > On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <[email protected]> wrote:
>> >>> >> >> >> >> >> >
>> >>> >> >> >> >> >> >> my code is:
>> >>> >> >> >> >> >> >>
>> >>> >> >> >> >> >> >> //read docs from mongo
>> >>> >> >> >> >> >> >> final PCollection<Document> docs = pipeline
>> >>> >> >> >> >> >> >>         .apply(table.getTableName(), MongoDbIO.read()
>> >>> >> >> >> >> >> >>                 .withUri("mongodb://" + connectionParams)
>> >>> >> >> >> >> >> >>                 .withFilter(filter)
>> >>> >> >> >> >> >> >>                 .withDatabase(options.getDBName())
>> >>> >> >> >> >> >> >>                 .withCollection(table.getTableName()))
>> >>> >> >> >> >> >> >>         .apply("AddEventTimestamps",
>> >>> >> >> >> >> >> >>                 WithTimestamps.of((Document doc) -> new Instant(MongodbManagment.docTimeToLong(doc))))
>> >>> >> >> >> >> >> >>         .apply("Window Daily", Window.into(CalendarWindows.days(1)));
>> >>> >> >> >> >> >> >>
>> >>> >> >> >> >> >> >> //update bq schema based on window
>> >>> >> >> >> >> >> >> final PCollectionView<Map<String, String>> tableSchemas = docs
>> >>> >> >> >> >> >> >> //        .apply("Global Window",Window.into(new GlobalWindows()))
>> >>> >> >> >> >> >> >>         .apply("extract schema " + table.getTableName(),
>> >>> >> >> >> >> >> >>                 new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
>> >>> >> >> >> >> >> >>         .apply("getTableSchemaMemory " + table.getTableName(),
>> >>> >> >> >> >> >> >>                 ParDo.of(getTableSchemaMemory(table.getTableName())))
>> >>> >> >> >> >> >> >>         .apply(View.asMap());
>> >>> >> >> >> >> >> >>
>> >>> >> >> >> >> >> >> final PCollection<TableRow> docsRows = docs
>> >>> >> >> >> >> >> >>         .apply("doc to row " + table.getTableName(),
>> >>> >> >> >> >> >> >>                 ParDo.of(docToTableRow(table.getBqTableName(), tableSchemas))
>> >>> >> >> >> >> >> >>                         .withSideInputs(tableSchemas));
>> >>> >> >> >> >> >> >>
>> >>> >> >> >> >> >> >> final WriteResult apply = docsRows
>> >>> >> >> >> >> >> >>         .apply("insert data table - " + table.getTableName(),
>> >>> >> >> >> >> >> >>                 BigQueryIO.writeTableRows()
>> >>> >> >> >> >> >> >>                         .to(TableRefPartition.perDay(options.getBQProject(),
>> >>> >> >> >> >> >> >>                                 options.getDatasetId(), table.getBqTableName()))
>> >>> >> >> >> >> >> >>                         .withSchemaFromView(tableSchemas)
>> >>> >> >> >> >> >> >>                         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>> >>> >> >> >> >> >> >>                         .withWriteDisposition(WRITE_APPEND));
>> >>> >> >> >> >> >> >>
>> >>> >> >> >> >> >> >>
>> >>> >> >> >> >> >> >> exception is:
>> >>> >> >> >> >> >> >>
>> >>> >> >> >> >> >> >> Sep 08, 2017 12:16:55 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
>> >>> >> >> >> >> >> >> INFO: Opening TableRowWriter to
>> >>> >> >> >> >> >> >> gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
>> >>> >> >> >> >> >> >> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> >>> >> >> >> >> >> >> java.lang.IllegalArgumentException: Attempted to get side input window
>> >>> >> >> >> >> >> >> for GlobalWindow from non-global WindowFn
>> >>> >> >> >> >> >> >>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
>> >>> >> >> >> >> >> >>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
>> >>> >> >> >> >> >> >>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
>> >>> >> >> >> >> >> >>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
>> >>> >> >> >> >> >> >>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>> >>> >> >> >> >> >> >>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
>> >>> >> >> >> >> >> >>     at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
>> >>> >> >> >> >> >> >>     at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
>> >>> >> >> >> >> >> >> Caused by: java.lang.IllegalArgumentException: Attempted to get side
>> >>> >> >> >> >> >> >> input window for GlobalWindow from non-global WindowFn
>> >>> >> >> >> >> >> >>     at org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
>> >>> >> >> >> >> >> >>     at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
>> >>> >> >> >> >> >> >>     at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
>> >>> >> >> >> >> >> >> Sep 08, 2017 12:16:58 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
>> >>> >> >> >> >> >> >>
>> >>> >> >> >> >> >> >> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov <[email protected]> wrote:
>> >>> >> >> >> >> >> >> > Please include the full exception and please show the code that
>> >>> >> >> >> >> >> >> > produces it.
>> >>> >> >> >> >> >> >> > See also
>> >>> >> >> >> >> >> >> > https://beam.apache.org/documentation/programming-guide/#transforms-sideio
>> >>> >> >> >> >> >> >> > section "Side inputs and windowing" - that might be sufficient to
>> >>> >> >> >> >> >> >> > resolve your problem.
>> >>> >> >> >> >> >> >> >
>> >>> >> >> >> >> >> >> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <[email protected]> wrote:
>> >>> >> >> >> >> >> >> >
>> >>> >> >> >> >> >> >> >> Hi,
>> >>> >> >> >> >> >> >> >> I have a pipeline that, based on documents from Mongo, updates the
>> >>> >> >> >> >> >> >> >> schema and then adds the records to BigQuery. Since I want a
>> >>> >> >> >> >> >> >> >> partitioned table, I have a daily window.
>> >>> >> >> >> >> >> >> >> How do I get the schema view to be windowed? I get the exception:
>> >>> >> >> >> >> >> >> >>
>> >>> >> >> >> >> >> >> >> "Attempted to get side input window for GlobalWindow from non-global
>> >>> >> >> >> >> >> >> >> WindowFn"
>> >>> >> >> >> >> >> >> >>
>> >>> >> >> >> >> >> >> >> chaim
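
For reference, a rough sketch of option 1 from Reuven's reply (one table with one partition per day, rather than one table per window). It only shows how a per-day destination string could be built with BigQuery's `table$YYYYMMDD` partition decorator, assuming partitions are keyed on the UTC day of the event timestamp. The class name, method name, and the project/dataset/table values are made up for illustration; this is not the `TableRefPartition.perDay` helper from the pipeline above, whose implementation is not shown in the thread.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, for illustration only.
class PartitionDecorator {
    // BigQuery partition decorators have the form table$YYYYMMDD.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    /** Builds "project:dataset.table$YYYYMMDD" for the UTC day containing eventTime. */
    public static String partitionSpec(String project, String dataset,
                                       String table, Instant eventTime) {
        return project + ":" + dataset + "." + table + "$" + DAY.format(eventTime);
    }

    public static void main(String[] args) {
        // Placeholder names; substitute real project, dataset, and table.
        Instant eventTime = Instant.parse("2017-09-12T23:48:00Z");
        System.out.println(partitionSpec("my-project", "my_dataset", "docs", eventTime));
    }
}
```

All rows whose event time falls on the same UTC day map to the same partition of a single table, so the number of distinct tables stays at one however many daily windows the data spans.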
