I am loading into separate partitions of the same table. I want to see if streaming will be faster.
Is there a repository where I can use the snapshot version?
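(A minimal sketch of what switching BigQueryIO to streaming inserts could look like, assuming a Beam 2.x Java SDK where BigQueryIO.Write.Method is available; the names options, table, tableSchemas and TableRefPartition are from the pipeline code further down the thread. Whether STREAMING_INSERTS composes with withSchemaFromView in a given version is worth verifying.)

    BigQueryIO.writeTableRows()
            // Use the streaming-insert API instead of batch load jobs; rows
            // are pushed as windows close rather than when the whole job ends.
            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            .to(TableRefPartition.perDay(options.getBQProject(),
                    options.getDatasetId(), table.getBqTableName()))
            .withSchemaFromView(tableSchemas)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);

On Dataflow, taking advantage of per-window output typically also means running with the streaming runner (--streaming) against an unbounded source, as Reuven notes below.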
On Wed, Sep 13, 2017 at 7:19 PM, Reuven Lax <[email protected]> wrote:
> Ah, so you are loading each window into a separate BigQuery table? That
> might be the reason things are slow. Remember a batch job doesn't return
> until everything finishes, and if you are loading that many tables it's
> entirely possible that BigQuery will throttle you, causing the slowdown.
>
> A couple of options:
>
> 1. Instead of loading into separate BigQuery tables, you could load into
> separate partitions of the same table. See this page for more info:
> https://cloud.google.com/bigquery/docs/partitioned-tables
>
> 2. If you have a streaming unbounded source for your data, you can run
> using a streaming runner. That will load each window as it becomes
> available instead of waiting for everything to load.
>
> Reuven
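(For option 1, a sketch of routing each element to a partition decorator, "table$yyyyMMdd", of a single table based on its daily window. The thread's TableRefPartition.perDay helper is not shown, but presumably does something similar; this is a hypothetical reconstruction assuming a Beam 2.x Java SDK:)

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.apache.beam.sdk.values.ValueInSingleWindow;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    /** Maps each element's daily window to a partition decorator of one table. */
    static SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>
            perDayPartition(String project, String dataset, String table) {
        final DateTimeFormatter day = DateTimeFormat.forPattern("yyyyMMdd");
        return value -> {
            // CalendarWindows.days(1) produces IntervalWindows, so the window
            // start identifies the day this element belongs to.
            IntervalWindow window = (IntervalWindow) value.getWindow();
            String spec = project + ":" + dataset + "." + table
                    + "$" + day.print(window.start());
            return new TableDestination(spec, "daily partition of " + table);
        };
    }

Passed to BigQueryIO.writeTableRows().to(...), every load job then targets a partition of a single table rather than a separate table.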
On Tue, Sep 12, 2017 at 11:48 PM, Chaim Turkel <[email protected]> wrote:
> From what I found, if I have the windowing with a BigQuery partition (per
> day - 1545 partitions) the insert can take 5 hours, whereas if there are
> no partitions it takes about 12 minutes.
>
> I have 13,843,080 records, 6.76 GB.
> Any ideas how to get the partition to work faster?
>
> Is there a way to get the BigQueryIO to use streaming and not jobs?
>
> chaim

On Tue, Sep 12, 2017 at 11:32 PM, Chaim Turkel <[email protected]> wrote:
> I am using windowing for the partition of the table, maybe that has to do
> with it?

On Tue, Sep 12, 2017 at 11:25 PM, Reuven Lax <[email protected]> wrote:
> Ok, something is going wrong then. It appears that your job created over
> 14,000 BigQuery load jobs, which is not expected (and probably why things
> were so slow).

On Tue, Sep 12, 2017 at 8:50 AM, Chaim Turkel <[email protected]> wrote:
> No, that specific job created only 2 tables.

On Tue, Sep 12, 2017 at 4:36 PM, Reuven Lax <[email protected]> wrote:
> It looks like your job is creating about 14,45 distinct BigQuery tables.
> Does that sound correct to you?
>
> Reuven

On Tue, Sep 12, 2017 at 6:22 AM, Chaim Turkel <[email protected]> wrote:
> The job id is 2017-09-12_02_57_55-5233544151932101752.
> As you can see, the majority of the time is inserting into BigQuery.
> Is there any way to parallelize this?
>
> My feeling for the windowing is that writing should be done per window
> (my window is daily), or at least to be able to configure it.
>
> chaim

On Tue, Sep 12, 2017 at 4:10 PM, Reuven Lax <[email protected]> wrote:
> So the problem is you are running on Dataflow, and it's taking longer than
> you think it should? If you provide the Dataflow job id we can help you
> debug why it's taking 30 minutes. (And as an aside, if this turns into a
> Dataflow debugging session we should move it off of the Beam list and onto
> a Dataflow-specific thread.)
>
> Reuven

On Tue, Sep 12, 2017 at 3:28 AM, Chaim Turkel <[email protected]> wrote:
> Is there a way around this? My time for 13 GB is not close to 30 minutes,
> while it should be around 15 minutes.
> Do I need to chunk the code myself to windows, and run in parallel?
>
> chaim

On Sun, Sep 10, 2017 at 6:32 PM, Reuven Lax <[email protected]> wrote:
> In that case I can say unequivocally that Dataflow (in batch mode) does
> not produce results for a stage until it has processed that entire stage.
> The reason for this is that the batch runner is optimized for throughput,
> not latency; it wants to minimize the time for the entire job to finish,
> not the time till first output. The side input will not be materialized
> until all of the data for all of the windows of the side input have been
> processed. The streaming runner on the other hand will produce windows as
> they finish. So for the batch runner, there is no performance advantage
> you get for windowing the side input.
>
> The fact that BigQueryIO needs the schema side input to be globally
> windowed is a bit confusing and not well documented. We should add better
> javadoc explaining this.
>
> Reuven

On Sun, Sep 10, 2017 at 12:50 AM, Chaim Turkel <[email protected]> wrote:
> batch on dataflow

On Sun, Sep 10, 2017 at 8:05 AM, Reuven Lax <[email protected]> wrote:
> Which runner are you using? And is this a batch pipeline?

On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel <[email protected]> wrote:
> Thanks for the answer, but I don't think that that is the case. From what
> I have seen, since I have other code to update status based on the
> window, it does get called before all the windows are calculated. There
> is no logical reason to wait: once the window has finished, the rest of
> the pipeline should run and the BigQuery write should start to write the
> results.

On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax <[email protected]> wrote:
> Logically the BigQuery write does not depend on windows, and writing it
> windowed would result in incorrect output. For this reason, BigQueryIO
> rewindows into global windows before actually writing to BigQuery.
>
> If you are running in batch mode, there is no performance difference
> between windowed and unwindowed side inputs. I believe that all of the
> batch runners wait until all windows are calculated before materializing
> the output.
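(Concretely, rewindowing the schema side input into the global window before creating the view - what the commented-out line in the code below does - looks roughly like this sketch, built from the names used in the thread:)

    final PCollectionView<Map<String, String>> tableSchemas = docs
            // BigQueryIO rewindows its write into GlobalWindows, so the side
            // input it consumes must be globally windowed as well.
            .apply("Global Window", Window.into(new GlobalWindows()))
            .apply("extract schema " + table.getTableName(),
                    new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
            .apply(View.asMap());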
On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <[email protected]> wrote:
> The schema depends on the data per window.
> When I added the global window it works, but then I lose the performance,
> since the second stage of writing will begin only after the side input
> has read all the data and updated the schema.
> The batch mode of BigQueryIO seems to use a global window, and I don't
> know why.
>
> chaim

On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov <[email protected]> wrote:
> Are your schemas actually supposed to be different between different
> windows, or do they depend only on data?
> I see you have a commented-out Window.into(new GlobalWindows()) for your
> side input - did that work when it wasn't commented out?

On Fri, Sep 8, 2017 at 2:17 AM, Chaim Turkel <[email protected]> wrote:
> my code is:
>
>     //read docs from mongo
>     final PCollection<Document> docs = pipeline
>             .apply(table.getTableName(), MongoDbIO.read()
>                     .withUri("mongodb://" + connectionParams)
>                     .withFilter(filter)
>                     .withDatabase(options.getDBName())
>                     .withCollection(table.getTableName()))
>             .apply("AddEventTimestamps", WithTimestamps.of((Document doc) ->
>                     new Instant(MongodbManagment.docTimeToLong(doc))))
>             .apply("Window Daily", Window.into(CalendarWindows.days(1)));
>
>     //update bq schema based on window
>     final PCollectionView<Map<String, String>> tableSchemas = docs
>             // .apply("Global Window", Window.into(new GlobalWindows()))
>             .apply("extract schema " + table.getTableName(),
>                     new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
>             .apply("getTableSchemaMemory " + table.getTableName(),
>                     ParDo.of(getTableSchemaMemory(table.getTableName())))
>             .apply(View.asMap());
>
>     final PCollection<TableRow> docsRows = docs
>             .apply("doc to row " + table.getTableName(),
>                     ParDo.of(docToTableRow(table.getBqTableName(), tableSchemas))
>                             .withSideInputs(tableSchemas));
>
>     final WriteResult apply = docsRows
>             .apply("insert data table - " + table.getTableName(),
>                     BigQueryIO.writeTableRows()
>                             .to(TableRefPartition.perDay(options.getBQProject(),
>                                     options.getDatasetId(), table.getBqTableName()))
>                             .withSchemaFromView(tableSchemas)
>                             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>                             .withWriteDisposition(WRITE_APPEND));
>
> exception is:
>
>     Sep 08, 2017 12:16:55 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
>     INFO: Opening TableRowWriter to gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
>     Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>     java.lang.IllegalArgumentException: Attempted to get side input window
>     for GlobalWindow from non-global WindowFn
>         at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
>         at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
>         at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
>         at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
>         at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>         at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
>         at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
>         at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
>     Caused by: java.lang.IllegalArgumentException: Attempted to get side
>     input window for GlobalWindow from non-global WindowFn
>         at org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
>         at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
>         at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
>     Sep 08, 2017 12:16:58 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
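(For context on the error: PartitioningWindowFn, the base class behind CalendarWindows, cannot map a main input's GlobalWindow back to one of its fixed windows - that is the getSideInputWindow frame above. A hypothetical, self-contained reproduction on the direct runner, assuming a Beam 2.x Java SDK:)

    import java.util.List;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.transforms.WithTimestamps;
    import org.apache.beam.sdk.transforms.windowing.CalendarWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollectionView;
    import org.joda.time.Instant;

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // A side input in daily windows...
    final PCollectionView<List<String>> side = p
            .apply("side", Create.of("schema"))
            .apply(WithTimestamps.of((String s) -> Instant.now()))
            .apply(Window.into(CalendarWindows.days(1)))
            .apply(View.asList());

    // ...consumed from a main input left in the default GlobalWindows: the
    // runner cannot map GlobalWindow to a daily window, and throws "Attempted
    // to get side input window for GlobalWindow from non-global WindowFn".
    p.apply("main", Create.of(1))
            .apply(ParDo.of(new DoFn<Integer, Integer>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    c.sideInput(side); // fails at runtime
                }
            }).withSideInputs(side));

    p.run().waitUntilFinish();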
On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov <[email protected]> wrote:
> Please include the full exception and please show the code that produces
> it. See also
> https://beam.apache.org/documentation/programming-guide/#transforms-sideio
> section "Side inputs and windowing" - that might be sufficient to resolve
> your problem.

On Thu, Sep 7, 2017 at 5:10 AM, Chaim Turkel <[email protected]> wrote:
> Hi,
> I have a pipeline that, based on documents from mongo, updates the schema
> and then adds the records to BigQuery. Since I want a partitioned table,
> I have a daily window.
> How do I get the schema view to be a window? I get the exception:
>
> "Attempted to get side input window for GlobalWindow from non-global
> WindowFn"
>
> chaim
