It is a batch pipeline, running on Dataflow.

On Sun, Sep 10, 2017 at 8:05 AM, Reuven Lax <[email protected]> wrote:
> Which runner are you using? And is this a batch pipeline?
>
> On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel <[email protected]> wrote:
>
>> Thanks for the answer, but I don't think that is the case. I have
>> other code that updates a status based on the window, and from what I
>> have seen it does get called before all the windows are calculated.
>> There is no logical reason to wait: once a window has finished, the
>> rest of the pipeline should run and BigQuery should start writing
>> the results.
>>
>>
>>
>> On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax <[email protected]>
>> wrote:
>> > Logically the BigQuery write does not depend on windows, and writing it
>> > windowed would result in incorrect output. For this reason, BigQueryIO
>> > rewindows into global windows before actually writing to BigQuery.
>> >
>> > If you are running in batch mode, there is no performance difference
>> > between windowed and unwindowed side inputs. I believe that all of the
>> > batch runners wait until all windows are calculated before materializing
>> > the output.
>> >
>> > Reuven
>> >
>> > On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <[email protected]> wrote:
>> >
>> >> The schema depends on the data per window.
>> >> When I added the global window it worked, but then I lose
>> >> performance, since the second stage (writing) begins only after
>> >> the side input has read all the data and updated the schema.
>> >> The batch mode of BigQueryIO seems to use a global window, and I
>> >> don't know why.
>> >>
>> >> chaim
>> >>
>> >> On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov
>> >> <[email protected]> wrote:
>> >> > Are your schemas actually supposed to be different between different
>> >> > windows, or do they depend only on data?
>> >> > I see you have a commented-out Window.into(new GlobalWindows()) for your
>> >> > side input - did that work when it wasn't commented out?
>> >> >
>> >> > On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <[email protected]> wrote:
>> >> >
>> >> >> my code is:
>> >> >>
>> >> >> //read docs from mongo
>> >> >> final PCollection<Document> docs = pipeline
>> >> >>         .apply(table.getTableName(), MongoDbIO.read()
>> >> >>                 .withUri("mongodb://" + connectionParams)
>> >> >>                 .withFilter(filter)
>> >> >>                 .withDatabase(options.getDBName())
>> >> >>                 .withCollection(table.getTableName()))
>> >> >>         .apply("AddEventTimestamps", WithTimestamps.of(
>> >> >>                 (Document doc) -> new Instant(MongodbManagment.docTimeToLong(doc))))
>> >> >>         .apply("Window Daily", Window.into(CalendarWindows.days(1)));
>> >> >>
>> >> >> //update bq schema based on window
>> >> >> final PCollectionView<Map<String, String>> tableSchemas = docs
>> >> >> //        .apply("Global Window", Window.into(new GlobalWindows()))
>> >> >>         .apply("extract schema " + table.getTableName(),
>> >> >>                 new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
>> >> >>         .apply("getTableSchemaMemory " + table.getTableName(),
>> >> >>                 ParDo.of(getTableSchemaMemory(table.getTableName())))
>> >> >>         .apply(View.asMap());
>> >> >>
>> >> >> final PCollection<TableRow> docsRows = docs
>> >> >>         .apply("doc to row " + table.getTableName(),
>> >> >>                 ParDo.of(docToTableRow(table.getBqTableName(), tableSchemas))
>> >> >>                         .withSideInputs(tableSchemas));
>> >> >>
>> >> >> final WriteResult apply = docsRows
>> >> >>         .apply("insert data table - " + table.getTableName(),
>> >> >>                 BigQueryIO.writeTableRows()
>> >> >>                         .to(TableRefPartition.perDay(options.getBQProject(),
>> >> >>                                 options.getDatasetId(), table.getBqTableName()))
>> >> >>                         .withSchemaFromView(tableSchemas)
>> >> >>                         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>> >> >>                         .withWriteDisposition(WRITE_APPEND));
>> >> >>
>> >> >>
>> >> >> exception is:
>> >> >>
>> >> >> Sep 08, 2017 12:16:55 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
>> >> >> INFO: Opening TableRowWriter to
>> >> >> gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
>> >> >> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> >> >> java.lang.IllegalArgumentException: Attempted to get side input window
>> >> >> for GlobalWindow from non-global WindowFn
>> >> >>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
>> >> >>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
>> >> >>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
>> >> >>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
>> >> >>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>> >> >>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
>> >> >>     at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
>> >> >>     at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
>> >> >> Caused by: java.lang.IllegalArgumentException: Attempted to get side
>> >> >> input window for GlobalWindow from non-global WindowFn
>> >> >>     at org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
>> >> >>     at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
>> >> >>     at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
>> >> >> Sep 08, 2017 12:16:58 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
>> >> >>
>> >> >> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
>> >> >> <[email protected]> wrote:
>> >> >> > Please include the full exception and please show the code that
>> >> >> > produces it.
>> >> >> > See also
>> >> >> > https://beam.apache.org/documentation/programming-guide/#transforms-sideio
>> >> >> > section "Side inputs and windowing" - that might be sufficient to
>> >> >> > resolve your problem.
>> >> >> >
>> >> >> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <[email protected]> wrote:
>> >> >> >
>> >> >> >> Hi,
>> >> >> >>   I have a pipeline that, based on documents from mongo, updates the
>> >> >> >> schema and then adds the records to mongo. Since I want a partitioned
>> >> >> >> table, I have a daily window.
>> >> >> >> How do I get the schema view to be windowed? I get the exception:
>> >> >> >>
>> >> >> >> "Attempted to get side input window for GlobalWindow from non-global
>> >> >> >> WindowFn"
>> >> >> >>
>> >> >> >> chaim
>> >> >> >>
>> >> >>
>> >>
>>
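
For readers following the thread, here is a minimal sketch of why the exception appears, assuming (as Reuven describes above) that BigQueryIO rewindows its input into the global window before it consults the schema view. It does not use the Beam SDK. The names SideInputWindowSketch, GlobalWin, DayWin, WindowMapping, DAILY_SIDE_INPUT and GLOBAL_SIDE_INPUT are hypothetical stand-ins invented for illustration, and the real runner logic may differ in detail. The idea it models: a side input is looked up by mapping the main-input window to a side-input window, and a daily (partitioning) window scheme has no sensible answer when the main input is in the global window.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical, simplified stand-ins for windowing concepts; NOT the Beam SDK classes.
public class SideInputWindowSketch {

    interface Window {
        Instant maxTimestamp();
    }

    // Models the single global window that covers all of time.
    static final class GlobalWin implements Window {
        static final GlobalWin INSTANCE = new GlobalWin();
        private GlobalWin() {}
        @Override public Instant maxTimestamp() { return Instant.MAX; }
    }

    // Models one calendar-day window starting at midnight UTC.
    static final class DayWin implements Window {
        final Instant start;
        DayWin(Instant anyTimeInDay) { this.start = anyTimeInDay.truncatedTo(ChronoUnit.DAYS); }
        @Override public Instant maxTimestamp() {
            return start.plus(1, ChronoUnit.DAYS).minusMillis(1);
        }
    }

    // Maps a main-input window to the side-input window that should be read.
    interface WindowMapping {
        Window sideInputWindow(Window mainWindow);
    }

    // Side input windowed daily: a global main window cannot be mapped to one day.
    static final WindowMapping DAILY_SIDE_INPUT = mainWindow -> {
        if (mainWindow instanceof GlobalWin) {
            throw new IllegalArgumentException(
                "Attempted to get side input window for GlobalWindow from non-global WindowFn");
        }
        return new DayWin(mainWindow.maxTimestamp());
    };

    // Side input in the global window: every main window maps to the same window.
    static final WindowMapping GLOBAL_SIDE_INPUT = mainWindow -> GlobalWin.INSTANCE;

    public static void main(String[] args) {
        Window day = new DayWin(Instant.parse("2017-09-08T12:16:55Z"));

        // Daily main input with a daily side input: maps to the same day.
        System.out.println(((DayWin) DAILY_SIDE_INPUT.sideInputWindow(day)).start);

        // Any main input with a global side input: always resolvable.
        System.out.println(GLOBAL_SIDE_INPUT.sideInputWindow(day) == GlobalWin.INSTANCE);

        // Global main input (the rewindowed write step) with a daily side input: fails.
        try {
            DAILY_SIDE_INPUT.sideInputWindow(GlobalWin.INSTANCE);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under that assumption, the daily-windowed view still works for the "doc to row" ParDo, whose main input is also daily, but not for the write step. This matches what Chaim reports: uncommenting Window.into(new GlobalWindows()) on the side input makes the pipeline run.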
