Which runner are you using? And is this a batch pipeline?

On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel <[email protected]> wrote:
> Thanks for the answer, but I don't think that is the case. From
> what I have seen (I have other code that updates status based on
> the window), it does get called before all the windows are calculated.
> There is no logical reason to wait: once a window has finished, the
> rest of the pipeline should run and BigQuery should start writing
> the results.
>
> On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax <[email protected]>
> wrote:
> > Logically the BigQuery write does not depend on windows, and writing it
> > windowed would result in incorrect output. For this reason, BigQueryIO
> > rewindows into global windows before actually writing to BigQuery.
> >
> > If you are running in batch mode, there is no performance difference
> > between windowed and unwindowed side inputs. I believe that all of the
> > batch runners wait until all windows are calculated before materializing
> > the output.
> >
> > Reuven
> >
> > On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <[email protected]> wrote:
> >
> >> The schema depends on the data per window.
> >> When I added the global window it works, but then I lose the
> >> performance, since the second stage of writing will begin only after
> >> the side input has read all the data and updated the schema.
> >> In batch mode, BigQueryIO seems to use a global window, and I
> >> don't know why.
> >>
> >> chaim
> >>
> >> On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov
> >> <[email protected]> wrote:
> >> > Are your schemas actually supposed to be different between different
> >> > windows, or do they depend only on data?
> >> > I see you have a commented-out Window.into(new GlobalWindows()) for your
> >> > side input - did that work when it wasn't commented out?
> >> >
> >> > On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <[email protected]> wrote:
> >> >
> >> >> My code is:
> >> >>
> >> >>     // read docs from mongo
> >> >>     final PCollection<Document> docs = pipeline
> >> >>         .apply(table.getTableName(), MongoDbIO.read()
> >> >>             .withUri("mongodb://" + connectionParams)
> >> >>             .withFilter(filter)
> >> >>             .withDatabase(options.getDBName())
> >> >>             .withCollection(table.getTableName()))
> >> >>         .apply("AddEventTimestamps", WithTimestamps.of((Document doc) ->
> >> >>             new Instant(MongodbManagment.docTimeToLong(doc))))
> >> >>         .apply("Window Daily", Window.into(CalendarWindows.days(1)));
> >> >>
> >> >>     // update bq schema based on window
> >> >>     final PCollectionView<Map<String, String>> tableSchemas = docs
> >> >>         // .apply("Global Window", Window.into(new GlobalWindows()))
> >> >>         .apply("extract schema " + table.getTableName(),
> >> >>             new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
> >> >>         .apply("getTableSchemaMemory " + table.getTableName(),
> >> >>             ParDo.of(getTableSchemaMemory(table.getTableName())))
> >> >>         .apply(View.asMap());
> >> >>
> >> >>     final PCollection<TableRow> docsRows = docs
> >> >>         .apply("doc to row " + table.getTableName(),
> >> >>             ParDo.of(docToTableRow(table.getBqTableName(), tableSchemas))
> >> >>                 .withSideInputs(tableSchemas));
> >> >>
> >> >>     final WriteResult apply = docsRows
> >> >>         .apply("insert data table - " + table.getTableName(),
> >> >>             BigQueryIO.writeTableRows()
> >> >>                 .to(TableRefPartition.perDay(options.getBQProject(),
> >> >>                     options.getDatasetId(), table.getBqTableName()))
> >> >>                 .withSchemaFromView(tableSchemas)
> >> >>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
> >> >>                 .withWriteDisposition(WRITE_APPEND));
> >> >>
> >> >> The exception is:
> >> >>
> >> >> Sep 08, 2017 12:16:55 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> >> >> INFO: Opening TableRowWriter to gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
> >> >> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
> >> >>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
> >> >>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
> >> >>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> >> >>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> >> >>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
> >> >>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> >> >>     at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
> >> >>     at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
> >> >> Caused by: java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
> >> >>     at org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
> >> >>     at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
> >> >>     at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
> >> >> Sep 08, 2017 12:16:58 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> >> >>
> >> >> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
> >> >> <[email protected]> wrote:
> >> >> > Please include the full exception and please show the code that produces
> >> >> > it.
> >> >> > See also
> >> >> > https://beam.apache.org/documentation/programming-guide/#transforms-sideio
> >> >> > section "Side inputs and windowing" - that might be sufficient to resolve your
> >> >> > problem.
> >> >> >
> >> >> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <[email protected]> wrote:
> >> >> >
> >> >> >> Hi,
> >> >> >> I have a pipeline that, based on documents from mongo, updates the
> >> >> >> schema and then adds the records to mongo. Since I want a partitioned
> >> >> >> table, I have a daily window.
> >> >> >> How do I get the schema view to be windowed? I get the exception:
> >> >> >>
> >> >> >> Attempted to get side input window for GlobalWindow from non-global
> >> >> >> WindowFn
> >> >> >>
> >> >> >> chaim
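The exception in the thread can be illustrated with a small model. This is a plain-Java sketch, not Beam code. Window, Global, Day and SideInputWindowModel are hypothetical stand-ins, and UTC day boundaries are assumed. The sketch follows what the stack trace points at (PartitioningWindowFn$1.getSideInputWindow): a side input windowed with CalendarWindows.days(1) is looked up by the day that contains the main-input window's max timestamp, and the global window has no such day. Reuven notes that BigQueryIO rewindows its input into the global window before writing. If that holds here, a daily-windowed schema view passed to withSchemaFromView likely hits exactly this case. A view rewindowed into GlobalWindows avoids the error, because every main-input window maps to the single global window.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Hypothetical, simplified stand-ins for Beam's window types (not Beam's API).
interface Window {
  Instant maxTimestamp();
}

// Models the single global window that BigQueryIO rewindows into.
final class Global implements Window {
  static final Global INSTANCE = new Global();

  private Global() {}

  @Override
  public Instant maxTimestamp() {
    return Instant.MAX;
  }
}

// Models one calendar day, like CalendarWindows.days(1); UTC is an assumption.
final class Day implements Window {
  final LocalDate date;

  Day(LocalDate date) {
    this.date = date;
  }

  @Override
  public Instant maxTimestamp() {
    // Last millisecond before the next UTC day starts.
    return date.plusDays(1).atStartOfDay(ZoneOffset.UTC).toInstant().minusMillis(1);
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof Day && ((Day) o).date.equals(date);
  }

  @Override
  public int hashCode() {
    return date.hashCode();
  }

  @Override
  public String toString() {
    return "Day[" + date + "]";
  }
}

final class SideInputWindowModel {
  // Assigns a timestamp to the UTC day that contains it.
  static Day assignDay(Instant ts) {
    return new Day(ts.atZone(ZoneOffset.UTC).toLocalDate());
  }

  // Daily-windowed side input: use the day holding the main window's max timestamp.
  // A global main-input window has no such day, so the lookup fails.
  static Window dailySideInputWindow(Window mainWindow) {
    if (mainWindow instanceof Global) {
      throw new IllegalArgumentException(
          "Attempted to get side input window for GlobalWindow from non-global WindowFn");
    }
    return assignDay(mainWindow.maxTimestamp());
  }

  // Side input rewindowed into the global window: every main-input window maps to it.
  static Window globalSideInputWindow(Window mainWindow) {
    return Global.INSTANCE;
  }

  public static void main(String[] args) {
    Day sep8 = assignDay(Instant.parse("2017-09-08T12:16:55Z"));
    System.out.println(dailySideInputWindow(sep8)); // Day[2017-09-08]
    System.out.println(globalSideInputWindow(Global.INSTANCE) == Global.INSTANCE); // true
    try {
      dailySideInputWindow(Global.INSTANCE);
    } catch (IllegalArgumentException e) {
      System.out.println("error: " + e.getMessage());
    }
  }
}
```

Under this model, the cost Chaim describes comes from the global case. A single global schema view has to cover every day, so it is only complete once all input has been read.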

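In his first message, Chaim ties the daily window to writing a partitioned table. TableRefPartition.perDay is his own helper and its code isn't in the thread. The following sketch therefore only assumes that the helper targets BigQuery's table$YYYYMMDD partition decorator for each UTC day. DailyPartitionModel and partitionFor are hypothetical names. The plain-Java sketch shows how an event timestamp could map to such a partition spec:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class DailyPartitionModel {
  // Partition decorator suffix: YYYYMMDD of the UTC day (UTC is an assumption).
  private static final DateTimeFormatter YYYYMMDD =
      DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

  // Hypothetical helper: builds "project:dataset.table$YYYYMMDD" for the day of ts.
  static String partitionFor(String project, String dataset, String table, Instant ts) {
    return project + ":" + dataset + "." + table + "$" + YYYYMMDD.format(ts);
  }

  public static void main(String[] args) {
    Instant eventTime = Instant.parse("2017-09-08T12:16:55Z");
    // prints my-project:my_dataset.events$20170908
    System.out.println(partitionFor("my-project", "my_dataset", "events", eventTime));
  }
}
```

Because the partition is derived from each row's timestamp, the destination stays per-day even after BigQueryIO rewindows the rows into the global window.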