From what i found, with the windowing and a BigQuery partition per day (1,545 partitions) the insert can take 5 hours, whereas with no partitions it takes about 12 minutes. I have 13,843,080 records, 6.76 GB. Any ideas how to get the partitioned insert to work faster? Is there a way to get BigQueryIO to use streaming inserts and not load jobs?

chaim
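For reference, a minimal sketch of what switching to streaming inserts could look like, assuming a Beam 2.x SDK where BigQueryIO.Write.Method.STREAMING_INSERTS is available. It reuses docsRows, tableSchemas, and TableRefPartition from the pipeline code quoted later in the thread; note that streaming inserts carry their own quotas and per-row cost:

    // Sketch only: force streaming inserts instead of batch load jobs.
    // Assumes Beam 2.x, where BigQueryIO.Write.Method exists.
    docsRows.apply("insert data table - " + table.getTableName(),
            BigQueryIO.writeTableRows()
                    .to(TableRefPartition.perDay(options.getBQProject(),
                            options.getDatasetId(), table.getBqTableName()))
                    .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                    .withSchemaFromView(tableSchemas)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));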
On Tue, Sep 12, 2017 at 11:32 PM, Chaim Turkel <[email protected]> wrote:

i am using windowing for the partition of the table, maybe that has to do with it?

On Tue, Sep 12, 2017 at 11:25 PM, Reuven Lax <[email protected]> wrote:

Ok, something is going wrong then. It appears that your job created over 14,000 BigQuery load jobs, which is not expected (and probably why things were so slow).

On Tue, Sep 12, 2017 at 8:50 AM, Chaim Turkel <[email protected]> wrote:

no, that specific job created only 2 tables

On Tue, Sep 12, 2017 at 4:36 PM, Reuven Lax <[email protected]> wrote:

It looks like your job is creating about 14,45 distinct BigQuery tables. Does that sound correct to you?

Reuven

On Tue, Sep 12, 2017 at 6:22 AM, Chaim Turkel <[email protected]> wrote:

the job id is 2017-09-12_02_57_55-5233544151932101752
as you can see, the majority of the time is inserting into bigquery. is there any way to parallelize this?

My feeling for the windowing is that writing should be done per window (my window is daily), or at least it should be configurable.

chaim

On Tue, Sep 12, 2017 at 4:10 PM, Reuven Lax <[email protected]> wrote:

So the problem is you are running on Dataflow, and it's taking longer than you think it should? If you provide the Dataflow job id we can help you debug why it's taking 30 minutes. (And as an aside, if this turns into a Dataflow debugging session we should move it off of the Beam list and onto a Dataflow-specific thread.)

Reuven

On Tue, Sep 12, 2017 at 3:28 AM, Chaim Turkel <[email protected]> wrote:

is there a way around this? my time for 13gb is now close to 30 minutes, while it should be around 15 minutes. Do i need to chunk the code myself into windows, and run in parallel?

chaim

On Sun, Sep 10, 2017 at 6:32 PM, Reuven Lax <[email protected]> wrote:

In that case I can say unequivocally that Dataflow (in batch mode) does not produce results for a stage until it has processed that entire stage. The reason for this is that the batch runner is optimized for throughput, not latency; it wants to minimize the time for the entire job to finish, not the time till first output. The side input will not be materialized until all of the data for all of the windows of the side input has been processed. The streaming runner, on the other hand, will produce windows as they finish. So for the batch runner, there is no performance advantage you get from windowing the side input.

The fact that BigQueryIO needs the schema side input to be globally windowed is a bit confusing and not well documented. We should add better javadoc explaining this.

Reuven
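For reference, per Reuven's note that the schema side input must be globally windowed, a minimal sketch of the side-input construction with the global rewindow enabled. This is the poster's own code from later in the thread, with the commented-out line restored:

    // Rewindow the schema side input into the global window so it matches
    // BigQueryIO's internal global rewindowing before the write.
    final PCollectionView<Map<String, String>> tableSchemas = docs
            .apply("Global Window", Window.into(new GlobalWindows()))
            .apply("extract schema " + table.getTableName(),
                    new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
            .apply("getTableSchemaMemory " + table.getTableName(),
                    ParDo.of(getTableSchemaMemory(table.getTableName())))
            .apply(View.asMap());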
On Sun, Sep 10, 2017 at 12:50 AM, Chaim Turkel <[email protected]> wrote:

batch on dataflow

On Sun, Sep 10, 2017 at 8:05 AM, Reuven Lax <[email protected]> wrote:

Which runner are you using? And is this a batch pipeline?

On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel <[email protected]> wrote:

Thanks for the answer, but i don't think that is the case. From what i have seen, since i have other code that updates status based on the window, it does get called before all the windows are calculated. There is no logical reason to wait: once a window has finished, the rest of the pipeline should run and BigQuery should start to write the results.

On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax <[email protected]> wrote:

Logically the BigQuery write does not depend on windows, and writing it windowed would result in incorrect output. For this reason, BigQueryIO rewindows into global windows before actually writing to BigQuery.

If you are running in batch mode, there is no performance difference between windowed and unwindowed side inputs. I believe that all of the batch runners wait until all windows are calculated before materializing the output.

Reuven

On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <[email protected]> wrote:

the schema depends on the data per window.
when i added the global window it works, but then i lose the performance, since the second stage of writing will begin only after the side input has read all the data and updated the schema.
The batch mode of BigQueryIO seems to use a global window, and i don't know why.

chaim

On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov <[email protected]> wrote:

Are your schemas actually supposed to be different between different windows, or do they depend only on the data?
I see you have a commented-out Window.into(new GlobalWindows()) for your side input - did that work when it wasn't commented out?
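For reference, the docToTableRow helper referenced in the code below is the poster's own and never appears in the thread. A hypothetical sketch of its shape, only to show where the side-input lookup (and thus the window mapping that fails in the stack trace) happens:

    // Hypothetical sketch only - names and conversion logic are assumptions.
    static DoFn<Document, TableRow> docToTableRow(
            final String tableName,
            final PCollectionView<Map<String, String>> tableSchemas) {
        return new DoFn<Document, TableRow>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                // Before this runs, the runner maps the main-input window to a
                // side-input window to serve this lookup; that mapping is what
                // throws in the stack trace below when the WindowFns disagree.
                String schemaJson = c.sideInput(tableSchemas).get(tableName);
                // ... convert the mongo Document into a TableRow using the schema ...
                c.output(new TableRow());
            }
        };
    }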
On Fri, Sep 8, 2017 at 2:17 AM, Chaim Turkel <[email protected]> wrote:

my code is:

    // read docs from mongo
    final PCollection<Document> docs = pipeline
            .apply(table.getTableName(), MongoDbIO.read()
                    .withUri("mongodb://" + connectionParams)
                    .withFilter(filter)
                    .withDatabase(options.getDBName())
                    .withCollection(table.getTableName()))
            .apply("AddEventTimestamps", WithTimestamps.of((Document doc) ->
                    new Instant(MongodbManagment.docTimeToLong(doc))))
            .apply("Window Daily", Window.into(CalendarWindows.days(1)));

    // update bq schema based on window
    final PCollectionView<Map<String, String>> tableSchemas = docs
    //        .apply("Global Window", Window.into(new GlobalWindows()))
            .apply("extract schema " + table.getTableName(),
                    new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
            .apply("getTableSchemaMemory " + table.getTableName(),
                    ParDo.of(getTableSchemaMemory(table.getTableName())))
            .apply(View.asMap());

    final PCollection<TableRow> docsRows = docs
            .apply("doc to row " + table.getTableName(),
                    ParDo.of(docToTableRow(table.getBqTableName(), tableSchemas))
                            .withSideInputs(tableSchemas));

    final WriteResult apply = docsRows
            .apply("insert data table - " + table.getTableName(),
                    BigQueryIO.writeTableRows()
                            .to(TableRefPartition.perDay(options.getBQProject(),
                                    options.getDatasetId(), table.getBqTableName()))
                            .withSchemaFromView(tableSchemas)
                            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                            .withWriteDisposition(WRITE_APPEND));

exception is:

    Sep 08, 2017 12:16:55 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
    INFO: Opening TableRowWriter to gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
    Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
    java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
        at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
        at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
    Caused by: java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
        at org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
        at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
        at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
    Sep 08, 2017 12:16:58 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>

On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov <[email protected]> wrote:

Please include the full exception, and please show the code that produces it.
See also https://beam.apache.org/documentation/programming-guide/#transforms-sideio, section "Side inputs and windowing" - that might be sufficient to resolve your problem.

On Thu, Sep 7, 2017 at 5:10 AM, Chaim Turkel <[email protected]> wrote:
Hi,
I have a pipeline that, based on documents from mongo, updates the schema and then adds the records to BigQuery. Since i want a partitioned table, i have a daily window.
How do i get the schema view to be windowed? i get this exception:

Attempted to get side input window for GlobalWindow from non-global WindowFn

chaim
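For reference, TableRefPartition.perDay in the code above is the poster's own helper and is not shown in the thread. A hypothetical sketch of what such a helper might look like, assuming Beam 2.x's BigQueryIO.Write.to(SerializableFunction) overload, and writing to a single date-partitioned table via the $YYYYMMDD partition decorator rather than one table per day:

    // Hypothetical sketch only - the real TableRefPartition is not in the thread.
    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.apache.beam.sdk.values.ValueInSingleWindow;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    public class TableRefPartitionSketch {
        public static SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>
                perDay(final String project, final String dataset, final String table) {
            return value -> {
                // CalendarWindows.days(1) produces IntervalWindows; use the
                // window start as the partition date.
                DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyyMMdd");
                String day = fmt.print(((IntervalWindow) value.getWindow()).start());
                // "$day" is BigQuery's partition decorator: every day lands in one
                // partitioned table instead of a separate table (and load job) per day.
                return new TableDestination(
                        project + ":" + dataset + "." + table + "$" + day,
                        "partition for day " + day);
            };
        }
    }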
