The schema depends on the data in each window. When I added the global window it works, but then I lose the performance benefit, since the second stage (the write) begins only after the side input has read all the data and updated the schema. The batch mode of BigQueryIO seems to use a global window - I don't know why.
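For reference, this is roughly what the "global window" variant looked like - a minimal sketch built from the same pipeline objects (docs, table) shown in my code below, with only the schema side input re-windowed into the global window so the write stage can consume it:

    // sketch: only the schema side input is moved to the global window;
    // the main docs collection keeps its daily windows. This runs, but the
    // write stage waits until the whole side input has been computed.
    final PCollectionView<Map<String, String>> tableSchemas = docs
            .apply("Global Window", Window.into(new GlobalWindows()))
            .apply("extract schema " + table.getTableName(),
                    new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
            .apply("getTableSchemaMemory " + table.getTableName(),
                    ParDo.of(getTableSchemaMemory(table.getTableName())))
            .apply(View.asMap());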
chaim

On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov <[email protected]> wrote:
> Are your schemas actually supposed to be different between different
> windows, or do they depend only on data?
> I see you have a commented-out Window.into(new GlobalWindows()) for your
> side input - did that work when it wasn't commented out?
>
> On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <[email protected]> wrote:
>
>> my code is:
>>
>> //read docs from mongo
>> final PCollection<Document> docs = pipeline
>>         .apply(table.getTableName(), MongoDbIO.read()
>>                 .withUri("mongodb://" + connectionParams)
>>                 .withFilter(filter)
>>                 .withDatabase(options.getDBName())
>>                 .withCollection(table.getTableName()))
>>         .apply("AddEventTimestamps", WithTimestamps.of((Document doc) ->
>>                 new Instant(MongodbManagment.docTimeToLong(doc))))
>>         .apply("Window Daily", Window.into(CalendarWindows.days(1)));
>>
>> //update bq schema based on window
>> final PCollectionView<Map<String, String>> tableSchemas = docs
>>         // .apply("Global Window", Window.into(new GlobalWindows()))
>>         .apply("extract schema " + table.getTableName(),
>>                 new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
>>         .apply("getTableSchemaMemory " + table.getTableName(),
>>                 ParDo.of(getTableSchemaMemory(table.getTableName())))
>>         .apply(View.asMap());
>>
>> final PCollection<TableRow> docsRows = docs
>>         .apply("doc to row " + table.getTableName(),
>>                 ParDo.of(docToTableRow(table.getBqTableName(), tableSchemas))
>>                         .withSideInputs(tableSchemas));
>>
>> final WriteResult apply = docsRows
>>         .apply("insert data table - " + table.getTableName(),
>>                 BigQueryIO.writeTableRows()
>>                         .to(TableRefPartition.perDay(options.getBQProject(),
>>                                 options.getDatasetId(), table.getBqTableName()))
>>                         .withSchemaFromView(tableSchemas)
>>                         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>                         .withWriteDisposition(WRITE_APPEND));
>>
>> exception is:
>>
>> Sep 08, 2017 12:16:55 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
>> INFO: Opening TableRowWriter to
>> gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
>> Exception in thread "main"
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.IllegalArgumentException: Attempted to get side input window
>> for GlobalWindow from non-global WindowFn
>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
>>     at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
>>     at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
>> Caused by: java.lang.IllegalArgumentException: Attempted to get side
>> input window for GlobalWindow from non-global WindowFn
>>     at org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
>>     at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
>>     at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
>> Sep 08, 2017 12:16:58 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
>>
>> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov <[email protected]> wrote:
>> > Please include the full exception and please show the code that produces it.
>> > See also
>> > https://beam.apache.org/documentation/programming-guide/#transforms-sideio
>> > section "Side inputs and windowing" - that might be sufficient to resolve
>> > your problem.
>> >
>> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <[email protected]> wrote:
>> >
>> >> Hi,
>> >>   I have a pipeline that, based on the documents from mongo, updates the
>> >> schema and then adds the records to BigQuery. Since I want a partitioned
>> >> table, I have a daily window.
>> >> How do I get the schema view to be a window? I get the exception:
>> >>
>> >> Attempted to get side input window for GlobalWindow from non-global
>> >> WindowFn"
>> >>
>> >> chaim
