Are your schemas actually supposed to be different between different
windows, or do they depend only on data?
I see you have a commented-out Window.into(new GlobalWindows()) for your
side input - did that work when it wasn't commented out?

On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <[email protected]> wrote:

> my code is:
>
>                     //read docs from mongo
>                     final PCollection<Document> docs = pipeline
>                             .apply(table.getTableName(), MongoDbIO.read()
>                                     .withUri("mongodb://" +
> connectionParams)
>                                     .withFilter(filter)
>                                     .withDatabase(options.getDBName())
>                                     .withCollection(table.getTableName()))
>                             .apply("AddEventTimestamps",
> WithTimestamps.of((Document doc) -> new
> Instant(MongodbManagment.docTimeToLong(doc))))
>                             .apply("Window Daily",
> Window.into(CalendarWindows.days(1)));
>
>                     //update bq schema based on window
>                     final PCollectionView<Map<String, String>>
> tableSchemas = docs
> //                            .apply("Global Window",Window.into(new
> GlobalWindows()))
>                             .apply("extract schema " +
> table.getTableName(), new
> LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
>                             .apply("getTableSchemaMemory " +
> table.getTableName(),
> ParDo.of(getTableSchemaMemory(table.getTableName())))
>                             .apply(View.asMap());
>
>                     final PCollection<TableRow> docsRows = docs
>                             .apply("doc to row " +
> table.getTableName(), ParDo.of(docToTableRow(table.getBqTableName(),
> tableSchemas))
>                                     .withSideInputs(tableSchemas));
>
>                     final WriteResult apply = docsRows
>                             .apply("insert data table - " +
> table.getTableName(),
>                                     BigQueryIO.writeTableRows()
>
> .to(TableRefPartition.perDay(options.getBQProject(),
> options.getDatasetId(), table.getBqTableName()))
>
> .withSchemaFromView(tableSchemas)
>
> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>
> .withWriteDisposition(WRITE_APPEND));
>
>
> exception is:
>
> Sep 08, 2017 12:16:55 PM
> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> INFO: Opening TableRowWriter to
>
> gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
> Exception in thread "main"
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.IllegalArgumentException: Attempted to get side input window
> for GlobalWindow from non-global WindowFn
> at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
> at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> at
> com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
> at
> com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
> Caused by: java.lang.IllegalArgumentException: Attempted to get side
> input window for GlobalWindow from non-global WindowFn
> at
> org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
> at
> org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
> at
> org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
> Sep 08, 2017 12:16:58 PM
> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
>
> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
> <[email protected]> wrote:
> > Please include the full exception and please show the code that produces
> it.
> > See also
> >
> https://beam.apache.org/documentation/programming-guide/#transforms-sideio
> > section
> > "Side inputs and windowing" - that might be sufficient to resolve your
> > problem.
> >
> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <[email protected]> wrote:
> >
> >> Hi,
> >>   I have a pipline that bases on documents from mongo updates the
> >> schema and then adds the records to mongo. Since i want a partitioned
> >> table, i have a dally window.
> >> How do i get the schema view to be a window, i get the exception of:
> >>
> >> Attempted to get side input window for GlobalWindow from non-global
> >> WindowFn"
> >>
> >> chaim
> >>
>

Reply via email to