Are your schemas actually supposed to differ between windows, or do they depend only on the data? I see you have a commented-out Window.into(new GlobalWindows()) for your side input; did that work when it wasn't commented out?
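For reference, the rule behind the exception can be modeled without Beam. The sketch below uses only the standard library and assumes a simplified model in which a window is either the single global window or one UTC calendar day. In this model, a daily-windowed side input can only be looked up from a main input that is itself in a non-global window, while a globally windowed side input can be read from any main-input window. SideInputWindowModel, Win, dailyWindowFor and sideInputWindow are hypothetical names used for illustration, not Beam's API. Only the exception text is copied from the trace in this thread.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Objects;

// Toy model (NOT Beam's API) of how a runner maps a main-input window to the
// side-input window it reads from.
public class SideInputWindowModel {

  // Hypothetical window type: date == null means the single global window,
  // otherwise a one-day window aligned to UTC midnight.
  static final class Win {
    final LocalDate date;
    private Win(LocalDate date) { this.date = date; }
    static Win global() { return new Win(null); }
    static Win day(LocalDate d) { return new Win(d); }
    boolean isGlobal() { return date == null; }
    @Override public boolean equals(Object o) {
      return o instanceof Win && Objects.equals(date, ((Win) o).date);
    }
    @Override public int hashCode() { return Objects.hashCode(date); }
    @Override public String toString() {
      return isGlobal() ? "GlobalWindow" : "Day[" + date + "]";
    }
  }

  // Assigns a timestamp to the UTC day containing it (daily windowing in this model).
  static Win dailyWindowFor(Instant ts) {
    return Win.day(ts.atZone(ZoneOffset.UTC).toLocalDate());
  }

  // Maps a main-input window to the side-input window it reads, given whether
  // the side input is globally windowed (true) or daily windowed (false).
  static Win sideInputWindow(Win mainWindow, boolean sideInputIsGlobal) {
    if (sideInputIsGlobal) {
      // Every main-input window maps to the one global side-input window.
      return Win.global();
    }
    if (mainWindow.isGlobal()) {
      // No single day corresponds to the global window, so the lookup fails.
      throw new IllegalArgumentException(
          "Attempted to get side input window for GlobalWindow from non-global WindowFn");
    }
    // A daily main-input window reads the daily side-input window for the same date.
    return mainWindow;
  }

  public static void main(String[] args) {
    Win day = dailyWindowFor(Instant.parse("2017-09-08T09:16:55Z"));
    System.out.println(sideInputWindow(day, false));          // Day[2017-09-08]
    System.out.println(sideInputWindow(day, true));           // GlobalWindow
    System.out.println(sideInputWindow(Win.global(), true));  // GlobalWindow
    try {
      sideInputWindow(Win.global(), false);
    } catch (IllegalArgumentException e) {
      System.out.println("error: " + e.getMessage());
    }
  }
}
```

Under this model, putting the schema collection into the global window before View.asMap() is what lets every consumer of tableSchemas, whatever its own windowing, find a matching side-input window; the failing combination is a global-window main input reading a daily-windowed view.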
On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <[email protected]> wrote:

> my code is:
>
> //read docs from mongo
> final PCollection<Document> docs = pipeline
>     .apply(table.getTableName(), MongoDbIO.read()
>         .withUri("mongodb://" + connectionParams)
>         .withFilter(filter)
>         .withDatabase(options.getDBName())
>         .withCollection(table.getTableName()))
>     .apply("AddEventTimestamps",
>         WithTimestamps.of((Document doc) -> new Instant(MongodbManagment.docTimeToLong(doc))))
>     .apply("Window Daily", Window.into(CalendarWindows.days(1)));
>
> //update bq schema based on window
> final PCollectionView<Map<String, String>> tableSchemas = docs
> //  .apply("Global Window", Window.into(new GlobalWindows()))
>     .apply("extract schema " + table.getTableName(),
>         new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
>     .apply("getTableSchemaMemory " + table.getTableName(),
>         ParDo.of(getTableSchemaMemory(table.getTableName())))
>     .apply(View.asMap());
>
> final PCollection<TableRow> docsRows = docs
>     .apply("doc to row " + table.getTableName(),
>         ParDo.of(docToTableRow(table.getBqTableName(), tableSchemas))
>             .withSideInputs(tableSchemas));
>
> final WriteResult apply = docsRows
>     .apply("insert data table - " + table.getTableName(),
>         BigQueryIO.writeTableRows()
>             .to(TableRefPartition.perDay(options.getBQProject(),
>                 options.getDatasetId(), table.getBqTableName()))
>             .withSchemaFromView(tableSchemas)
>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>             .withWriteDisposition(WRITE_APPEND));
>
>
> exception is:
>
> Sep 08, 2017 12:16:55 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> INFO: Opening TableRowWriter to gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
>     at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
>     at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
> Caused by: java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
>     at org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
>     at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
> Sep 08, 2017 12:16:58 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
>
> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov <[email protected]> wrote:
> >
> > Please include the full exception and please show the code that produces it.
> > See also the "Side inputs and windowing" section of
> > https://beam.apache.org/documentation/programming-guide/#transforms-sideio
> > ; that might be sufficient to resolve your problem.
> >
> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <[email protected]> wrote:
> >
> >> Hi,
> >> I have a pipeline that reads documents from Mongo, updates the schema based on them, and then adds the records to BigQuery. Since I want a partitioned table, I have a daily window.
> >> How do I make the schema view windowed as well? I get the exception:
> >>
> >> Attempted to get side input window for GlobalWindow from non-global WindowFn
> >>
> >> chaim
> >
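The daily window exists so that each day's rows land in one partition of the BigQuery table. The mapping from an element timestamp to its partition can be sketched with the standard library alone. This sketch assumes the destination is written as a legacy table spec ("project:dataset.table") with BigQuery's $YYYYMMDD partition decorator, and that days are UTC-aligned (UTC is the default time zone for CalendarWindows). DailyPartitionSpec and partitionSpec are hypothetical names. This is not the TableRefPartition.perDay helper from the thread, whose implementation isn't shown.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (NOT the thread's TableRefPartition class): turns an
// element timestamp into a day-partition table spec for BigQuery.
public class DailyPartitionSpec {

  // Formats the UTC calendar day as yyyyMMdd, the form used after the '$' decorator.
  private static final DateTimeFormatter PARTITION_DAY =
      DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

  // Builds "project:dataset.table$yyyyMMdd" for the UTC day containing ts.
  static String partitionSpec(String project, String dataset, String table, Instant ts) {
    return project + ":" + dataset + "." + table + "$" + PARTITION_DAY.format(ts);
  }

  public static void main(String[] args) {
    // Prints my-project:my_dataset.docs$20170908
    System.out.println(partitionSpec("my-project", "my_dataset", "docs",
        Instant.parse("2017-09-08T09:16:55Z")));
  }
}
```

Because every element in one daily window shares the same UTC date, all of that window's rows resolve to the same partition spec.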
