batch on dataflow
On Sun, Sep 10, 2017 at 8:05 AM, Reuven Lax <[email protected]> wrote:

Which runner are you using? And is this a batch pipeline?

On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel <[email protected]> wrote:

Thanks for the answer, but I don't think that is the case. From what I have
seen, since I have other code that updates status based on the window, it does
get called before all the windows are calculated. There is no logical reason
to wait: once a window has finished, the rest of the pipeline should run and
BigQuery should start to write the results.

On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax <[email protected]> wrote:

Logically the BigQuery write does not depend on windows, and writing it
windowed would result in incorrect output. For this reason, BigQueryIO
rewindows into global windows before actually writing to BigQuery.

If you are running in batch mode, there is no performance difference between
windowed and unwindowed side inputs. I believe that all of the batch runners
wait until all windows are calculated before materializing the output.

Reuven

On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <[email protected]> wrote:

The schema depends on the data per window. When I added the global window it
works, but then I lose the performance, since the second stage of writing will
begin only after the side input has read all the data and updated the schema.
The batch mode of BigQueryIO seems to use a global window, and I don't know
why.

chaim

On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov <[email protected]> wrote:

Are your schemas actually supposed to be different between different windows,
or do they depend only on the data?
I see you have a commented-out Window.into(new GlobalWindows()) for your side
input - did that work when it wasn't commented out?
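A minimal, runnable sketch of the windowing relationship discussed above, assuming
the Beam 2.x Java SDK; the element values, table keys, class name, and transform
names are illustrative and not taken from this thread. The main input stays in
daily calendar windows, while the side-input branch is rewindowed into the global
window before the view is built, so a consumer in any window can resolve it:

    import java.util.Map;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.transforms.windowing.CalendarWindows;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.joda.time.Instant;

    public class GlobalWindowSideInputSketch {

      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Stand-in for the Mongo documents, already carrying event timestamps.
        PCollection<String> daily =
            p.apply(Create.timestamped(
                    TimestampedValue.of("table_a", Instant.parse("2017-09-07T10:00:00Z")),
                    TimestampedValue.of("table_b", Instant.parse("2017-09-08T10:00:00Z"))))
             .apply("Window Daily", Window.<String>into(CalendarWindows.days(1)));

        // Side-input branch: rewindow into the global window before building the view.
        final PCollectionView<Map<String, String>> schemaView =
            daily
                .apply("Global Window", Window.<String>into(new GlobalWindows()))
                .apply("Derive schema", ParDo.of(new DoFn<String, KV<String, String>>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    // One schema entry per (unique) table key.
                    c.output(KV.of(c.element(), "schema-for-" + c.element()));
                  }
                }))
                .apply(View.asMap());

        // Consumer in daily windows reads the globally windowed side input.
        daily.apply("Use schema", ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            Map<String, String> schemas = c.sideInput(schemaView);
            c.output(c.element() + " -> " + schemas.get(c.element()));
          }
        }).withSideInputs(schemaView));

        p.run().waitUntilFinish();
      }
    }

With this shape the daily windows only govern the main input; every main-input
window maps onto the single global side-input window, which is the direction of
the lookup that matters for side inputs.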
On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <[email protected]> wrote:

my code is:

    //read docs from mongo
    final PCollection<Document> docs = pipeline
            .apply(table.getTableName(), MongoDbIO.read()
                    .withUri("mongodb://" + connectionParams)
                    .withFilter(filter)
                    .withDatabase(options.getDBName())
                    .withCollection(table.getTableName()))
            .apply("AddEventTimestamps",
                    WithTimestamps.of((Document doc) -> new Instant(MongodbManagment.docTimeToLong(doc))))
            .apply("Window Daily", Window.into(CalendarWindows.days(1)));

    //update bq schema based on window
    final PCollectionView<Map<String, String>> tableSchemas = docs
            // .apply("Global Window", Window.into(new GlobalWindows()))
            .apply("extract schema " + table.getTableName(),
                    new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
            .apply("getTableSchemaMemory " + table.getTableName(),
                    ParDo.of(getTableSchemaMemory(table.getTableName())))
            .apply(View.asMap());

    final PCollection<TableRow> docsRows = docs
            .apply("doc to row " + table.getTableName(),
                    ParDo.of(docToTableRow(table.getBqTableName(), tableSchemas))
                            .withSideInputs(tableSchemas));

    final WriteResult apply = docsRows
            .apply("insert data table - " + table.getTableName(),
                    BigQueryIO.writeTableRows()
                            .to(TableRefPartition.perDay(options.getBQProject(), options.getDatasetId(), table.getBqTableName()))
                            .withSchemaFromView(tableSchemas)
                            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                            .withWriteDisposition(WRITE_APPEND));

exception is:

    Sep 08, 2017 12:16:55 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
    INFO: Opening TableRowWriter to gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
    Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
        at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
        at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
    Caused by: java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
        at org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
        at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
        at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
    Sep 08, 2017 12:16:58 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
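A hedged reading of the stack trace above: CalendarWindows.days(1) is built on
PartitioningWindowFn, and PartitioningWindowFn's side-input window mapping rejects
the GlobalWindow, so once the write stage is operating in the global window (as
Reuven describes above for BigQueryIO), it cannot map that window back onto the
daily-windowed tableSchemas view. One likely fix, and apparently what Chaim reports
trying above ("when I added the global window it works"), is the commented-out
step: put the side-input branch into the global window before the view is built.
A sketch reusing the names from the code above:

    //update bq schema, with the side-input branch in the global window
    final PCollectionView<Map<String, String>> tableSchemas = docs
            .apply("Global Window", Window.<Document>into(new GlobalWindows()))
            .apply("extract schema " + table.getTableName(),
                    new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
            .apply("getTableSchemaMemory " + table.getTableName(),
                    ParDo.of(getTableSchemaMemory(table.getTableName())))
            .apply(View.asMap());

The trade-off Chaim describes above still applies: with a single global side-input
window, the BigQuery write can only start once the schema view has been computed
over all of the data.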
On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov <[email protected]> wrote:

Please include the full exception, and please show the code that produces it.
See also https://beam.apache.org/documentation/programming-guide/#transforms-sideio,
section "Side inputs and windowing" - that might be sufficient to resolve your
problem.

On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <[email protected]> wrote:

Hi,
I have a pipeline that, based on documents from mongo, updates the schema and
then adds the records to BigQuery. Since I want a partitioned table, I have a
daily window.
How do I get the schema view to be per window? I get this exception:

    Attempted to get side input window for GlobalWindow from non-global WindowFn

chaim
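On the partitioned-table half of the original question: TableRefPartition.perDay(...)
in the code above looks like a project-specific helper. Under the assumption of the
Beam 2.x BigQueryIO API, a per-day destination function can be sketched like this,
routing each row to a day-partition decorator ("table$yyyyMMdd") derived from its
daily window; the project, dataset, and table names are placeholders, and the cast
assumes the destination function sees the row in its original daily IntervalWindow
rather than the global window:

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.apache.beam.sdk.values.ValueInSingleWindow;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    public class PerDayDestination
        implements SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> {

      private static final DateTimeFormatter DAY = DateTimeFormat.forPattern("yyyyMMdd");

      @Override
      public TableDestination apply(ValueInSingleWindow<TableRow> value) {
        // The element's window is a daily IntervalWindow from CalendarWindows.days(1).
        IntervalWindow window = (IntervalWindow) value.getWindow();
        String day = DAY.print(window.start());
        return new TableDestination(
            "my-project:my_dataset.my_table$" + day,   // partition decorator
            "Daily partition " + day);
      }
    }

Such a function would be plugged in via BigQueryIO.writeTableRows().to(new
PerDayDestination()) in place of the helper above, with the same schema view and
dispositions.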
