Can you show us some of the code you are using? How are you loading into separate partitions?
Reuven

On Wed, Sep 13, 2017 at 10:13 AM, Chaim Turkel <[email protected]> wrote:
> I am loading into separate partitions of the same table.
> I want to see if streaming will be faster.
>
> Is there a repository where I can use the snapshot version?

On Wed, Sep 13, 2017 at 7:19 PM, Reuven Lax <[email protected]> wrote:
> Ah, so you are loading each window into a separate BigQuery table? That
> might be the reason things are slow. Remember, a batch job doesn't return
> until everything finishes, and if you are loading that many tables it's
> entirely possible that BigQuery will throttle you, causing the slowdown.
>
> A couple of options:
>
> 1. Instead of loading into separate BigQuery tables, you could load into
> separate partitions of the same table. See this page for more info:
> https://cloud.google.com/bigquery/docs/partitioned-tables
>
> 2. If you have a streaming unbounded source for your data, you can run
> using a streaming runner. That will load each window as it becomes
> available instead of waiting for everything to load.
>
> Reuven
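For anyone following along, here is roughly what option 1 can look like in code. This is a minimal sketch, not code from the thread: it assumes the Beam 2.x Java SDK overload of BigQueryIO.Write.to() that takes a SerializableFunction returning a TableDestination, and the project/dataset/table names are invented. Each element is routed to a per-day partition decorator (table$yyyyMMdd) of a single date-partitioned table, which appears to be what Chaim's TableRefPartition.perDay helper (shown later in the thread) does:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

// Formats an element timestamp as the partition suffix, e.g. 20170912.
static final DateTimeFormatter PARTITION_FORMAT =
    DateTimeFormat.forPattern("yyyyMMdd").withZoneUTC();

// rows is a PCollection<TableRow>; schema is the matching TableSchema.
rows.apply("write to partitions",
    BigQueryIO.writeTableRows()
        .to((SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>)
            input -> {
              // The element's timestamp decides which daily partition it lands in.
              String day = PARTITION_FORMAT.print(input.getTimestamp());
              return new TableDestination(
                  "my-project:my_dataset.my_table$" + day, // partition decorator
                  "partition for " + day);
            })
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

All rows still land in one table, so BigQuery sees one destination per day rather than one new table per window.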
On Tue, Sep 12, 2017 at 11:48 PM, Chaim Turkel <[email protected]> wrote:
> From what I found, with the windowing plus the BigQuery partitions (per
> day - 1545 partitions) the insert can take 5 hours, whereas with no
> partitions it takes about 12 minutes.
>
> I have 13,843,080 records, 6.76 GB.
> Any ideas how to get the partitioned insert to work faster?
>
> Is there a way to get the BigQueryIO to use streaming and not jobs?
>
> chaim

On Tue, Sep 12, 2017 at 11:32 PM, Chaim Turkel <[email protected]> wrote:
> I am using windowing for the partition of the table, maybe that has to do
> with it?

On Tue, Sep 12, 2017 at 11:25 PM, Reuven Lax <[email protected]> wrote:
> Ok, something is going wrong then. It appears that your job created over
> 14,000 BigQuery load jobs, which is not expected (and probably why things
> were so slow).

On Tue, Sep 12, 2017 at 8:50 AM, Chaim Turkel <[email protected]> wrote:
> No, that specific job created only 2 tables.

On Tue, Sep 12, 2017 at 4:36 PM, Reuven Lax <[email protected]> wrote:
> It looks like your job is creating about 14,45 distinct BigQuery tables.
> Does that sound correct to you?
>
> Reuven

On Tue, Sep 12, 2017 at 6:22 AM, Chaim Turkel <[email protected]> wrote:
> The job id is 2017-09-12_02_57_55-5233544151932101752.
> As you can see, the majority of the time is inserting into BigQuery.
> Is there any way to parallelize this?
>
> My feeling for the windowing is that writing should be done per window
> (my window is daily), or at least that this should be configurable.
>
> chaim

On Tue, Sep 12, 2017 at 4:10 PM, Reuven Lax <[email protected]> wrote:
> So the problem is you are running on Dataflow, and it's taking longer than
> you think it should? If you provide the Dataflow job id we can help you
> debug why it's taking 30 minutes. (And as an aside, if this turns into a
> Dataflow debugging session we should move it off of the Beam list and onto
> a Dataflow-specific thread.)
>
> Reuven

On Tue, Sep 12, 2017 at 3:28 AM, Chaim Turkel <[email protected]> wrote:
> Is there a way around this? My time for 13 GB is not close to 30 minutes,
> while it should be around 15 minutes.
> Do I need to chunk the code myself into windows, and run them in parallel?
> chaim

On Sun, Sep 10, 2017 at 6:32 PM, Reuven Lax <[email protected]> wrote:
> In that case I can say unequivocally that Dataflow (in batch mode) does
> not produce results for a stage until it has processed that entire stage.
> The reason for this is that the batch runner is optimized for throughput,
> not latency; it wants to minimize the time for the entire job to finish,
> not the time till first output. The side input will not be materialized
> until all of the data for all of the windows of the side input has been
> processed. The streaming runner, on the other hand, will produce windows
> as they finish. So for the batch runner, there is no performance advantage
> you get from windowing the side input.
>
> The fact that BigQueryIO needs the schema side input to be globally
> windowed is a bit confusing and not well documented. We should add better
> javadoc explaining this.
>
> Reuven
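Picking up Chaim's question above ("Is there a way to get the BigQueryIO to use streaming and not jobs?"): in Beam 2.x the write method can be selected explicitly. A hedged sketch, reusing the names from Chaim's pipeline later in the thread; it assumes your SDK release already exposes BigQueryIO.Write.Method (check the release notes for your version), and note that streaming inserts have their own pricing and quotas:

docsRows.apply("stream into BigQuery - " + table.getTableName(),
    BigQueryIO.writeTableRows()
        .to(TableRefPartition.perDay(options.getBQProject(),
            options.getDatasetId(), table.getBqTableName()))
        // Use streaming inserts instead of batch load jobs; rows become
        // visible as they are written rather than when a load job commits.
        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
        .withSchemaFromView(tableSchemas)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));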
On Sun, Sep 10, 2017 at 12:50 AM, Chaim Turkel <[email protected]> wrote:
> batch on dataflow

On Sun, Sep 10, 2017 at 8:05 AM, Reuven Lax <[email protected]> wrote:
> Which runner are you using? And is this a batch pipeline?

On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel <[email protected]> wrote:
> Thanks for the answer, but I don't think that that is the case. From what
> I have seen, since I have other code that updates a status based on the
> window, that code does get called before all the windows are calculated.
> There is no logical reason to wait: once the window has finished, the
> rest of the pipeline should run and BigQuery should start to write the
> results.

On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax <[email protected]> wrote:
> Logically the BigQuery write does not depend on windows, and writing it
> windowed would result in incorrect output. For this reason, BigQueryIO
> re-windows into global windows before actually writing to BigQuery.
>
> If you are running in batch mode, there is no performance difference
> between windowed and unwindowed side inputs. I believe that all of the
> batch runners wait until all windows are calculated before materializing
> the output.
>
> Reuven

On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <[email protected]> wrote:
> The schema depends on the data per window.
> When I added the global window it works, but then I lose the performance
> benefit, since the second stage of writing will begin only after the side
> input has read all the data and updated the schema.
> The batch mode of BigQueryIO seems to use a global window, and I don't
> know why.
>
> chaim

On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov <[email protected]> wrote:
> Are your schemas actually supposed to be different between different
> windows, or do they depend only on the data?
> I see you have a commented-out Window.into(new GlobalWindows()) for your
> side input - did that work when it wasn't commented out?
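The commented-out line Eugene mentions is visible in Chaim's code in the next message. For reference, re-enabling it would build the schema view in the global window, which is what BigQueryIO's globally windowed write stage expects; this sketch is simply that code with the re-window un-commented, nothing more:

// Re-window into the global window so the schema view can be consumed by
// BigQueryIO's write, which itself runs in the global window.
final PCollectionView<Map<String, String>> tableSchemas = docs
    .apply("Global Window", Window.into(new GlobalWindows()))
    .apply("extract schema " + table.getTableName(),
        new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
    .apply("getTableSchemaMemory " + table.getTableName(),
        ParDo.of(getTableSchemaMemory(table.getTableName())))
    .apply(View.asMap());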
On Fri, Sep 8, 2017 at 2:17 AM, Chaim Turkel <[email protected]> wrote:
> My code is:
>
> //read docs from mongo
> final PCollection<Document> docs = pipeline
>     .apply(table.getTableName(), MongoDbIO.read()
>         .withUri("mongodb://" + connectionParams)
>         .withFilter(filter)
>         .withDatabase(options.getDBName())
>         .withCollection(table.getTableName()))
>     .apply("AddEventTimestamps", WithTimestamps.of((Document doc) ->
>         new Instant(MongodbManagment.docTimeToLong(doc))))
>     .apply("Window Daily", Window.into(CalendarWindows.days(1)));
>
> //update bq schema based on window
> final PCollectionView<Map<String, String>> tableSchemas = docs
>     // .apply("Global Window", Window.into(new GlobalWindows()))
>     .apply("extract schema " + table.getTableName(),
>         new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
>     .apply("getTableSchemaMemory " + table.getTableName(),
>         ParDo.of(getTableSchemaMemory(table.getTableName())))
>     .apply(View.asMap());
>
> final PCollection<TableRow> docsRows = docs
>     .apply("doc to row " + table.getTableName(),
>         ParDo.of(docToTableRow(table.getBqTableName(), tableSchemas))
>             .withSideInputs(tableSchemas));
>
> final WriteResult apply = docsRows
>     .apply("insert data table - " + table.getTableName(),
>         BigQueryIO.writeTableRows()
>             .to(TableRefPartition.perDay(options.getBQProject(),
>                 options.getDatasetId(), table.getBqTableName()))
>             .withSchemaFromView(tableSchemas)
>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>             .withWriteDisposition(WRITE_APPEND));
>
> The exception is:
>
> Sep 08, 2017 12:16:55 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> INFO: Opening TableRowWriter to gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
>   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
>   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
>   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
>   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
>   at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
>   at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
> Caused by: java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
>   at org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
>   at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
>   at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
> Sep 08, 2017 12:16:58 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
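To decode the trace: PartitioningWindowFn.getSideInputWindow is asked to map the main input's GlobalWindow onto a window of the daily-windowed side input, which a partitioning WindowFn cannot do, hence the IllegalArgumentException. A stripped-down sketch that provokes the same error; every name here is invented for illustration, and it assumes the Beam 2.x Java SDK with the direct runner on the classpath:

import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.CalendarWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

public class SideInputWindowRepro {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Side input lives in daily (non-global, partitioning) windows.
    final PCollectionView<List<String>> side = p
        .apply("side", Create.timestamped(
            TimestampedValue.of("schema", new Instant(0))))
        .apply(Window.into(CalendarWindows.days(1)))
        .apply(View.asList());

    // Main input stays in the default global window -- analogous to
    // BigQueryIO re-windowing its input into GlobalWindows before writing.
    p.apply("main", Create.of("row"))
        .apply(ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void process(ProcessContext c) {
            // Throws: "Attempted to get side input window for GlobalWindow
            // from non-global WindowFn"
            c.sideInput(side);
          }
        }).withSideInputs(side));

    p.run().waitUntilFinish();
  }
}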
On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov <[email protected]> wrote:
> Please include the full exception, and please show the code that produces
> it. See also
> https://beam.apache.org/documentation/programming-guide/#transforms-sideio
> section "Side inputs and windowing" - that might be sufficient to resolve
> your problem.

On Thu, Sep 7, 2017 at 5:10 AM, Chaim Turkel <[email protected]> wrote:
> Hi,
> I have a pipeline that, based on the documents from mongo, updates the
> schema and then adds the records to BigQuery. Since I want a partitioned
> table, I have a daily window.
> How do I get the schema view to be in a window? I get the exception:
>
> "Attempted to get side input window for GlobalWindow from non-global
> WindowFn"
>
> chaim
