It looks like your job is creating about 14,45 distinct BigQuery tables. Does that sound correct to you?
Reuven

On Tue, Sep 12, 2017 at 6:22 AM, Chaim Turkel <[email protected]> wrote:

the job id is 2017-09-12_02_57_55-5233544151932101752
as you can see, the majority of the time is spent inserting into bigquery.
is there any way to parallelize this?

My feeling for the windowing is that writing should be done per window
(my window is daily), or at least that this should be configurable.

chaim

On Tue, Sep 12, 2017 at 4:10 PM, Reuven Lax <[email protected]> wrote:

So the problem is that you are running on Dataflow, and it's taking
longer than you think it should? If you provide the Dataflow job id we
can help you debug why it's taking 30 minutes. (And as an aside, if this
turns into a Dataflow debugging session we should move it off of the
Beam list and onto a Dataflow-specific thread.)

Reuven

On Tue, Sep 12, 2017 at 3:28 AM, Chaim Turkel <[email protected]> wrote:

is there a way around this? my time for 13gb is now close to 30 minutes,
while it should be around 15 minutes.
Do I need to chunk the code into windows myself and run them in parallel?

chaim

On Sun, Sep 10, 2017 at 6:32 PM, Reuven Lax <[email protected]> wrote:

In that case I can say unequivocally that Dataflow (in batch mode) does
not produce results for a stage until it has processed that entire
stage. The reason for this is that the batch runner is optimized for
throughput, not latency; it wants to minimize the time for the entire
job to finish, not the time till first output. The side input will not
be materialized until all of the data for all of the windows of the side
input has been processed. The streaming runner, on the other hand, will
produce windows as they finish. So for the batch runner, there is no
performance advantage to be gained from windowing the side input.

The fact that BigQueryIO needs the schema side input to be globally
windowed is a bit confusing and not well documented. We should add
better javadoc explaining this.

Reuven

On Sun, Sep 10, 2017 at 12:50 AM, Chaim Turkel <[email protected]> wrote:

batch on dataflow

On Sun, Sep 10, 2017 at 8:05 AM, Reuven Lax <[email protected]> wrote:

Which runner are you using? And is this a batch pipeline?

On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel <[email protected]> wrote:

Thanks for the answer, but I don't think that is the case. From what I
have seen, since I have other code that updates a status based on the
window, that code does get called before all the windows are calculated.
There is no logical reason to wait: once a window has finished, the rest
of the pipeline should run and BigQuery should start to write the
results.

On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax <[email protected]> wrote:

Logically the BigQuery write does not depend on windows, and writing it
windowed would result in incorrect output. For this reason, BigQueryIO
rewindows into global windows before actually writing to BigQuery.

If you are running in batch mode, there is no performance difference
between windowed and unwindowed side inputs. I believe that all of the
batch runners wait until all windows are calculated before materializing
the output.

Reuven
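Concretely, the globally windowed schema view that Reuven describes
looks roughly like this. This is a minimal sketch against the Beam 2.x
Java SDK, not the poster's actual code: extractSchemas() is a
hypothetical placeholder for the schema-extraction transform, and only
the windowing shape matters here.

    import java.util.Map;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;
    import org.bson.Document;

    // docs is the daily-windowed main input (CalendarWindows.days(1)).
    PCollection<Document> docs = ...;

    // Rewindow ONLY the side-input branch into the global window before
    // building the view; the main input keeps its daily windows.
    PCollectionView<Map<String, String>> tableSchemas = docs
            .apply("Global Window", Window.<Document>into(new GlobalWindows()))
            // Placeholder: must yield KV<tableSpec, schemaJson> pairs.
            .apply("Extract Schemas", extractSchemas())
            .apply(View.asMap());

Because BigQueryIO itself rewindows the rows into the global window
before writing, a globally windowed view is the only shape its internal
DoFns can map their windows onto.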
On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <[email protected]> wrote:

the schema depends on the data per window.
when I added the global window it works, but then I lose the
performance, since the second stage of writing will begin only after the
side input has read all the data and updated the schema.
The batch mode of BigQueryIO seems to use a global window, and I don't
know why.

chaim

On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov <[email protected]> wrote:

Are your schemas actually supposed to be different between different
windows, or do they depend only on the data?
I see you have a commented-out Window.into(new GlobalWindows()) for your
side input - did that work when it wasn't commented out?

On Fri, Sep 8, 2017 at 2:17 AM, Chaim Turkel <[email protected]> wrote:

my code is:

    // read docs from mongo
    final PCollection<Document> docs = pipeline
            .apply(table.getTableName(), MongoDbIO.read()
                    .withUri("mongodb://" + connectionParams)
                    .withFilter(filter)
                    .withDatabase(options.getDBName())
                    .withCollection(table.getTableName()))
            .apply("AddEventTimestamps", WithTimestamps.of((Document doc) ->
                    new Instant(MongodbManagment.docTimeToLong(doc))))
            .apply("Window Daily", Window.into(CalendarWindows.days(1)));

    // update bq schema based on window
    final PCollectionView<Map<String, String>> tableSchemas = docs
            // .apply("Global Window", Window.into(new GlobalWindows()))
            .apply("extract schema " + table.getTableName(),
                    new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
            .apply("getTableSchemaMemory " + table.getTableName(),
                    ParDo.of(getTableSchemaMemory(table.getTableName())))
            .apply(View.asMap());

    final PCollection<TableRow> docsRows = docs
            .apply("doc to row " + table.getTableName(),
                    ParDo.of(docToTableRow(table.getBqTableName(), tableSchemas))
                            .withSideInputs(tableSchemas));

    final WriteResult apply = docsRows
            .apply("insert data table - " + table.getTableName(),
                    BigQueryIO.writeTableRows()
                            .to(TableRefPartition.perDay(options.getBQProject(),
                                    options.getDatasetId(), table.getBqTableName()))
                            .withSchemaFromView(tableSchemas)
                            .withCreateDisposition(
                                    BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                            .withWriteDisposition(WRITE_APPEND));

exception is:

    Sep 08, 2017 12:16:55 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
    INFO: Opening TableRowWriter to gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
    Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
        at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
        at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
    Caused by: java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
        at org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
        at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
        at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
    Sep 08, 2017 12:16:58 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
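TableRefPartition.perDay in the code above is the poster's own helper,
whose implementation is not shown in the thread. As a purely
hypothetical sketch, a per-day destination function can be built on
BigQueryIO's SerializableFunction overload of to(), using BigQuery's
table$YYYYMMDD partition decorators; every name below is illustrative,
not the poster's actual class:

    import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.apache.beam.sdk.values.ValueInSingleWindow;
    import org.joda.time.format.DateTimeFormat;
    import com.google.api.services.bigquery.model.TableRow;

    // Hypothetical stand-in for TableRefPartition.perDay(). Assumes rows
    // are windowed by CalendarWindows.days(1), so each element's window
    // is an IntervalWindow covering exactly one day.
    static SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>
            perDay(String project, String dataset, String table) {
        return value -> {
            IntervalWindow window = (IntervalWindow) value.getWindow();
            // Build the formatter inside the lambda; Joda formatters are
            // not serializable.
            String day = DateTimeFormat.forPattern("yyyyMMdd")
                    .withZoneUTC()
                    .print(window.start());
            // "table$20170912" addresses the partition for that day.
            return new TableDestination(
                    project + ":" + dataset + "." + table + "$" + day, null);
        };
    }

A function like this makes every day partition a distinct write
destination, which may be why the Dataflow job appears to create so many
distinct BigQuery tables.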
On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov <[email protected]> wrote:

Please include the full exception, and please show the code that
produces it.
See also the "Side inputs and windowing" section of
https://beam.apache.org/documentation/programming-guide/#transforms-sideio
- that might be sufficient to resolve your problem.
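The rule in that section is that when a DoFn reads a side input, the
runner maps each main-input window to a side-input window through the
side input's WindowFn. A partitioning WindowFn such as CalendarWindows
has no window to return for the GlobalWindow, which is exactly the
mismatch in the stack trace above. A self-contained sketch (class name,
element values, and timestamp are assumptions) that should reproduce the
same failure on the DirectRunner:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.transforms.windowing.CalendarWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollectionView;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.joda.time.Instant;

    public class SideInputWindowMismatch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

        // Side input in non-global (daily) windows, like the schema view.
        final PCollectionView<Iterable<String>> side = p
            .apply("side", Create.timestamped(TimestampedValue.of(
                "schema", Instant.parse("2017-09-08T00:00:00Z"))))
            .apply(Window.<String>into(CalendarWindows.days(1)))
            .apply(View.asIterable());

        // Main input left in the default GlobalWindow; the runner cannot
        // map the GlobalWindow onto any single daily side-input window.
        p.apply("main", Create.of("row"))
            .apply(ParDo.of(new DoFn<String, String>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                for (String s : c.sideInput(side)) {
                  c.output(c.element() + ":" + s);
                }
              }
            }).withSideInputs(side));

        // Expected: IllegalArgumentException: Attempted to get side input
        // window for GlobalWindow from non-global WindowFn
        p.run().waitUntilFinish();
      }
    }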
On Thu, Sep 7, 2017 at 5:10 AM, Chaim Turkel <[email protected]> wrote:

Hi,
I have a pipeline that, based on documents from mongo, updates the
schema and then adds the records to BigQuery. Since I want a partitioned
table, I have a daily window.
How do I get the schema view to be windowed? I get the exception:

Attempted to get side input window for GlobalWindow from non-global WindowFn

chaim
