my code is:

                    //read docs from mongo
                    final PCollection<Document> docs = pipeline
                            .apply(table.getTableName(), MongoDbIO.read()
                                    .withUri("mongodb://" + connectionParams)
                                    .withFilter(filter)
                                    .withDatabase(options.getDBName())
                                    .withCollection(table.getTableName()))
                            .apply("AddEventTimestamps",
WithTimestamps.of((Document doc) -> new
Instant(MongodbManagment.docTimeToLong(doc))))
                            .apply("Window Daily",
Window.into(CalendarWindows.days(1)));

                    //update bq schema based on window
                    final PCollectionView<Map<String, String>>
tableSchemas = docs
//                            .apply("Global Window",Window.into(new
GlobalWindows()))
                            .apply("extract schema " +
table.getTableName(), new
LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
                            .apply("getTableSchemaMemory " +
table.getTableName(),
ParDo.of(getTableSchemaMemory(table.getTableName())))
                            .apply(View.asMap());

                    final PCollection<TableRow> docsRows = docs
                            .apply("doc to row " +
table.getTableName(), ParDo.of(docToTableRow(table.getBqTableName(),
tableSchemas))
                                    .withSideInputs(tableSchemas));

                    final WriteResult apply = docsRows
                            .apply("insert data table - " +
table.getTableName(),
                                    BigQueryIO.writeTableRows()

.to(TableRefPartition.perDay(options.getBQProject(),
options.getDatasetId(), table.getBqTableName()))
                                            .withSchemaFromView(tableSchemas)

.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)

.withWriteDisposition(WRITE_APPEND));


exception is:

Sep 08, 2017 12:16:55 PM
org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
INFO: Opening TableRowWriter to
gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
Exception in thread "main"
org.apache.beam.sdk.Pipeline$PipelineExecutionException:
java.lang.IllegalArgumentException: Attempted to get side input window
for GlobalWindow from non-global WindowFn
at 
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
at 
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
at 
com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
at 
com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
Caused by: java.lang.IllegalArgumentException: Attempted to get side
input window for GlobalWindow from non-global WindowFn
at 
org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
at 
org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
at 
org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
Sep 08, 2017 12:16:58 PM
org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>

On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
<[email protected]> wrote:
> Please include the full exception and please show the code that produces it.
> See also
> https://beam.apache.org/documentation/programming-guide/#transforms-sideio
> section
> "Side inputs and windowing" - that might be sufficient to resolve your
> problem.
>
> On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <[email protected]> wrote:
>
>> Hi,
>>   I have a pipline that bases on documents from mongo updates the
>> schema and then adds the records to mongo. Since i want a partitioned
>> table, i have a dally window.
>> How do i get the schema view to be a window, i get the exception of:
>>
>> Attempted to get side input window for GlobalWindow from non-global
>> WindowFn"
>>
>> chaim
>>

Reply via email to