Ok, something is going wrong then. It appears that your job created over
14,000 BigQuery load jobs, which is not expected (and probably why things
were so slow).
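
As a rough sanity check on how many destinations a daily-partitioned write
should touch, here is a minimal sketch, not taken from the job itself. It
assumes one destination per UTC day and uses BigQuery's `table$YYYYMMDD`
partition decorator. The class name, the helper names and the "events" table
are hypothetical, and this is not how BigQueryIO itself counts load jobs.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class DailyPartitionSketch {
    // Formats an instant as a UTC calendar day, e.g. 20170912.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    // Hypothetical helper: partition decorator "table$YYYYMMDD" for an event time.
    static String partitionFor(String table, Instant eventTime) {
        return table + "$" + DAY.format(eventTime);
    }

    // Hypothetical helper: the distinct daily partitions a set of event times maps to.
    static Set<String> distinctPartitions(String table, List<Instant> eventTimes) {
        Set<String> partitions = new TreeSet<>();
        for (Instant t : eventTimes) {
            partitions.add(partitionFor(table, t));
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Assumed sample data: three events spanning two UTC days.
        List<Instant> times = Arrays.asList(
                Instant.parse("2017-09-11T23:59:59Z"),
                Instant.parse("2017-09-12T00:00:00Z"),
                Instant.parse("2017-09-12T13:22:00Z"));
        System.out.println(distinctPartitions("events", times));
    }
}
```

If the number of distinct partitions computed this way is far below the
number of load jobs, the extra jobs are probably coming from somewhere other
than the daily partitioning.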

On Tue, Sep 12, 2017 at 8:50 AM, Chaim Turkel <[email protected]> wrote:

> no that specific job created only 2 tables
>
> On Tue, Sep 12, 2017 at 4:36 PM, Reuven Lax <[email protected]>
> wrote:
> > It looks like your job is creating about 14,45 distinct BigQuery tables.
> > Does that sound correct to you?
> >
> > Reuven
> >
> > On Tue, Sep 12, 2017 at 6:22 AM, Chaim Turkel <[email protected]> wrote:
> >
> >> the job id is 2017-09-12_02_57_55-5233544151932101752
> >> as you can see the majority of the time is inserting into bigquery.
> >> is there any way to parallel this?
> >>
> >> My feeling for the windowing is that writing should be done per window
> >> (my window is daily) or at least to be able to configure it
> >>
> >> chaim
> >>
> >> On Tue, Sep 12, 2017 at 4:10 PM, Reuven Lax <[email protected]>
> >> wrote:
> >> > So the problem is you are running on Dataflow, and it's taking longer
> >> > than you think it should? If you provide the Dataflow job id we can
> >> > help you debug why it's taking 30 minutes. (And as an aside, if this
> >> > turns into a Dataflow debugging session we should move it off of the
> >> > Beam list and onto a Dataflow-specific thread.)
> >> >
> >> > Reuven
> >> >
> >> > On Tue, Sep 12, 2017 at 3:28 AM, Chaim Turkel <[email protected]>
> wrote:
> >> >
> >> >> is there a way around this, my time for 13gb is now close to 30
> >> >> minutes, while it should be around 15 minutes.
> >> >> Do i need to chunk the code myself to windows, and run in parallel?
> >> >> chaim
> >> >>
> >> >> On Sun, Sep 10, 2017 at 6:32 PM, Reuven Lax <[email protected]
> >
> >> >> wrote:
> >> >> > In that case I can say unequivocally that Dataflow (in batch mode)
> >> >> > does not produce results for a stage until it has processed that
> >> >> > entire stage. The reason for this is that the batch runner is
> >> >> > optimized for throughput, not latency; it wants to minimize the time
> >> >> > for the entire job to finish, not the time till first output. The
> >> >> > side input will not be materialized until all of the data for all of
> >> >> > the windows of the side input have been processed. The streaming
> >> >> > runner on the other hand will produce windows as they finish. So for
> >> >> > the batch runner, there is no performance advantage you get for
> >> >> > windowing the side input.
> >> >> >
> >> >> > The fact that BigQueryIO needs the schema side input to be globally
> >> >> > windowed is a bit confusing and not well documented. We should add
> >> >> > better javadoc explaining this.
> >> >> >
> >> >> > Reuven
> >> >> >
> >> >> > On Sun, Sep 10, 2017 at 12:50 AM, Chaim Turkel <[email protected]>
> >> wrote:
> >> >> >
> >> >> >> batch on dataflow
> >> >> >>
> >> >> >> On Sun, Sep 10, 2017 at 8:05 AM, Reuven Lax
> <[email protected]
> >> >
> >> >> >> wrote:
> >> >> >> > Which runner are you using? And is this a batch pipeline?
> >> >> >> >
> >> >> >> > On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel <[email protected]
> >
> >> >> wrote:
> >> >> >> >
> >> >> >> >> Thanks for the answer, but i don't think that that is the case.
> >> >> >> >> From what i have seen, since i have other code to update status
> >> >> >> >> based on the window, it does get called before all the windows are
> >> >> >> >> calculated. There is no logical reason to wait: once the window has
> >> >> >> >> finished, the rest of the pipeline should run and BigQuery should
> >> >> >> >> start to write the results.
> >> >> >> >>
> >> >> >> >>
> >> >> >> >>
> >> >> >> >> On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax
> >> <[email protected]
> >> >> >
> >> >> >> >> wrote:
> >> >> >> >> > Logically the BigQuery write does not depend on windows, and
> >> >> >> >> > writing it windowed would result in incorrect output. For this
> >> >> >> >> > reason, BigQueryIO rewindows into global windows before actually
> >> >> >> >> > writing to BigQuery.
> >> >> >> >> >
> >> >> >> >> > If you are running in batch mode, there is no performance
> >> >> >> >> > difference between windowed and unwindowed side inputs. I believe
> >> >> >> >> > that all of the batch runners wait until all windows are calculated
> >> >> >> >> > before materializing the output.
> >> >> >> >> >
> >> >> >> >> > Reuven
> >> >> >> >> >
> >> >> >> >> > On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <
> [email protected]
> >> >
> >> >> >> wrote:
> >> >> >> >> >
> >> >> >> >> >> the schema depends on the data per window.
> >> >> >> >> >> when i added the global window it works, but then i lose the
> >> >> >> >> >> performance, since the second stage of writing will begin only
> >> >> >> >> >> after the side input has read all the data and updated the schema.
> >> >> >> >> >> The batch mode of BigQueryIO seems to use a global window, and i
> >> >> >> >> >> don't know why.
> >> >> >> >> >>
> >> >> >> >> >> chaim
> >> >> >> >> >>
> >> >> >> >> >> On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov
> >> >> >> >> >> <[email protected]> wrote:
> >> >> >> >> >> > Are your schemas actually supposed to be different between
> >> >> >> >> >> > different windows, or do they depend only on data?
> >> >> >> >> >> > I see you have a commented-out Window.into(new GlobalWindows())
> >> >> >> >> >> > for your side input - did that work when it wasn't commented out?
> >> >> >> >> >> >
> >> >> >> >> >> > On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <
> >> [email protected]>
> >> >> >> wrote:
> >> >> >> >> >> >
> >> >> >> >> >> >> my code is:
> >> >> >> >> >> >>
> >> >> >> >> >> >> //read docs from mongo
> >> >> >> >> >> >> final PCollection<Document> docs = pipeline
> >> >> >> >> >> >>         .apply(table.getTableName(), MongoDbIO.read()
> >> >> >> >> >> >>                 .withUri("mongodb://" + connectionParams)
> >> >> >> >> >> >>                 .withFilter(filter)
> >> >> >> >> >> >>                 .withDatabase(options.getDBName())
> >> >> >> >> >> >>                 .withCollection(table.getTableName()))
> >> >> >> >> >> >>         .apply("AddEventTimestamps",
> >> >> >> >> >> >>                 WithTimestamps.of((Document doc) ->
> >> >> >> >> >> >>                         new Instant(MongodbManagment.docTimeToLong(doc))))
> >> >> >> >> >> >>         .apply("Window Daily", Window.into(CalendarWindows.days(1)));
> >> >> >> >> >> >>
> >> >> >> >> >> >> //update bq schema based on window
> >> >> >> >> >> >> final PCollectionView<Map<String, String>> tableSchemas = docs
> >> >> >> >> >> >> //        .apply("Global Window",Window.into(new GlobalWindows()))
> >> >> >> >> >> >>         .apply("extract schema " + table.getTableName(),
> >> >> >> >> >> >>                 new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
> >> >> >> >> >> >>         .apply("getTableSchemaMemory " + table.getTableName(),
> >> >> >> >> >> >>                 ParDo.of(getTableSchemaMemory(table.getTableName())))
> >> >> >> >> >> >>         .apply(View.asMap());
> >> >> >> >> >> >>
> >> >> >> >> >> >> final PCollection<TableRow> docsRows = docs
> >> >> >> >> >> >>         .apply("doc to row " + table.getTableName(),
> >> >> >> >> >> >>                 ParDo.of(docToTableRow(table.getBqTableName(), tableSchemas))
> >> >> >> >> >> >>                         .withSideInputs(tableSchemas));
> >> >> >> >> >> >>
> >> >> >> >> >> >> final WriteResult apply = docsRows
> >> >> >> >> >> >>         .apply("insert data table - " + table.getTableName(),
> >> >> >> >> >> >>                 BigQueryIO.writeTableRows()
> >> >> >> >> >> >>                         .to(TableRefPartition.perDay(options.getBQProject(),
> >> >> >> >> >> >>                                 options.getDatasetId(), table.getBqTableName()))
> >> >> >> >> >> >>                         .withSchemaFromView(tableSchemas)
> >> >> >> >> >> >>                         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
> >> >> >> >> >> >>                         .withWriteDisposition(WRITE_APPEND));
> >> >> >> >> >> >>
> >> >> >> >> >> >>
> >> >> >> >> >> >> exception is:
> >> >> >> >> >> >>
> >> >> >> >> >> >> Sep 08, 2017 12:16:55 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> >> >> >> >> >> >> INFO: Opening TableRowWriter to
> >> >> >> >> >> >> gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
> >> >> >> >> >> >> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> >> >> >> >> >> >> java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
> >> >> >> >> >> >>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
> >> >> >> >> >> >>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
> >> >> >> >> >> >>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> >> >> >> >> >> >>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> >> >> >> >> >> >>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
> >> >> >> >> >> >>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> >> >> >> >> >> >>     at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
> >> >> >> >> >> >>     at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
> >> >> >> >> >> >> Caused by: java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
> >> >> >> >> >> >>     at org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
> >> >> >> >> >> >>     at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
> >> >> >> >> >> >>     at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
> >> >> >> >> >> >> Sep 08, 2017 12:16:58 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> >> >> >> >> >> >>
> >> >> >> >> >> >> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
> >> >> >> >> >> >> <[email protected]> wrote:
> >> >> >> >> >> >> > Please include the full exception and please show the code
> >> >> >> >> >> >> > that produces it.
> >> >> >> >> >> >> > See also
> >> >> >> >> >> >> > https://beam.apache.org/documentation/programming-guide/#transforms-sideio
> >> >> >> >> >> >> > section "Side inputs and windowing" - that might be sufficient
> >> >> >> >> >> >> > to resolve your problem.
> >> >> >> >> >> >> >
> >> >> >> >> >> >> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <
> >> >> [email protected]>
> >> >> >> >> wrote:
> >> >> >> >> >> >> >
> >> >> >> >> >> >> >> Hi,
> >> >> >> >> >> >> >>   I have a pipeline that, based on documents from mongo, updates
> >> >> >> >> >> >> >> the schema and then adds the records to BigQuery. Since i want a
> >> >> >> >> >> >> >> partitioned table, i have a daily window.
> >> >> >> >> >> >> >> How do i get the schema view to be windowed? i get the exception:
> >> >> >> >> >> >> >>
> >> >> >> >> >> >> >> "Attempted to get side input window for GlobalWindow from
> >> >> >> >> >> >> >> non-global WindowFn"
> >> >> >> >> >> >> >>
> >> >> >> >> >> >> >> chaim
> >> >> >> >> >> >> >>
> >> >> >> >> >> >>
> >> >> >> >> >>
> >> >> >> >>
> >> >> >>
> >> >>
> >>
>
