Great... I created this to track it: https://issues.apache.org/jira/browse/BEAM-2276
On Fri, 12 May 2017 at 09:38 Jean-Baptiste Onofré <[email protected]> wrote:
> +1
>
> Borisa, if you want, we can work together on this.
>
> Thanks!
> Regards
> JB
>
> On 05/12/2017 10:33 AM, Borisa Zivkovic wrote:
> > +1 for DefaultFilenamePolicy being able to understand basic windowing...
> > probably the most user-friendly way that would cover most needs... in
> > case of special needs users can provide their own policy..
> >
> > another alternative would be to have a new class called
> > DefaultWindowedFilenamePolicy in package org.apache.beam.sdk.io ...
> >
> > any of those would make it easier for Beam users..
> >
> > so, someone needs to decide how we want to do this and if you want I can
> > work on it...
> >
> > cheers
> >
> > On Fri, 12 May 2017 at 08:18 Reuven Lax <[email protected]> wrote:
> >
> >> I believe that for most windows there is a standard stringification.
> >> However I think we could allow the user to inject a window formatter for
> >> cases where there is no good default (e.g. where the window is a
> >> complicated user-defined type, and toString() isn't good enough).
> >>
> >> Alternatively, if we don't want to allow formatters, we could make
> >> DefaultFilenamePolicy work with default stringifications of well-known
> >> windows (fixed, sliding, sessions, etc.), and just use toString() for
> >> remaining window types. Users that have weird custom window types can
> >> always write their own FilenamePolicy.
> >>
> >> On Thu, May 11, 2017 at 4:24 PM, Robert Bradshaw <[email protected]> wrote:
> >>
> >>> I like the idea of WWW and PPP, assuming there is a standard enough
> >>> stringification of windows and panes. However, we may want to elide
> >>> adjacent tokens if the window is global or the pane is the only
> >>> possible (or first?) one to avoid writing things like
> >>> -0000-of-0005---.
> >>>
> >>> On Thu, May 11, 2017 at 8:47 AM, Reuven Lax <[email protected]> wrote:
> >>>> Another idea - we can extend the existing pattern that
> >>>> DefaultFilenamePolicy understands to include windows.
> >>>>
> >>>> Today it replaces SSS with the shard, and NNN with the number of shards
> >>>> (so many templates contain -SSS-of-NNN). We could also have it recognize
> >>>> WWW and PPP, for the window and the pane respectively.
> >>>>
> >>>> I believe this would be a backwards-compatible change. We do not need to
> >>>> change any existing interfaces, we would simply be allowing the default
> >>>> policy to work on windows.
> >>>>
> >>>> On Thu, May 11, 2017 at 8:41 AM, Dan Halperin <[email protected]> wrote:
> >>>>
> >>>>> +Eugene, Reuven who reviewed and implemented this code. They may have
> >>>>> opinions.
> >>>>>
> >>>>> Note that changing the default filename policy would be
> >>>>> backwards-incompatible, so this would either need to go into 2.0.0 (and
> >>>>> a new RC3) or it would not go in.
> >>>>>
> >>>>> On Thu, May 11, 2017 at 8:36 AM, Borisa Zivkovic <[email protected]> wrote:
> >>>>>
> >>>>>> great JB, thanks
> >>>>>>
> >>>>>> I do not mind working on this - let's see if anyone else has additional
> >>>>>> input.
> >>>>>>
> >>>>>> cheers
> >>>>>>
> >>>>>> On Thu, 11 May 2017 at 16:28 Jean-Baptiste Onofré <[email protected]> wrote:
> >>>>>>
> >>>>>>> Got it.
> >>>>>>>
> >>>>>>> Yes, agree, I think the PerWindowFilesPolicy could be the default, and
> >>>>>>> let the user provide their own policy if they want to.
> >>>>>>>
> >>>>>>> Regards
> >>>>>>> JB
> >>>>>>>
> >>>>>>> On 05/11/2017 05:23 PM, Borisa Zivkovic wrote:
> >>>>>>>> Hi JB,
> >>>>>>>>
> >>>>>>>> yes I saw that thread - I also copied your code but did not want to
> >>>>>>>> pollute it with my proposal :)
> >>>>>>>>
> >>>>>>>> Well ok maybe default FilePerWindow policy for windowedWrites in TextIO
> >>>>>>>> does not make sense - not sure TBH...
> >>>>>>>>
> >>>>>>>> But would it make sense to promote a version of PerWindowFiles from
> >>>>>>>> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> >>>>>>>> so that it is easier to provide some kind of PerWindowFiles filename
> >>>>>>>> policy..
> >>>>>>>>
> >>>>>>>> something like (where user does not have to write PerWindowFilesPolicy,
> >>>>>>>> it comes with Beam)
> >>>>>>>>
> >>>>>>>> .withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
> >>>>>>>> .withWindowedWrites()
> >>>>>>>> .withNumShards(1));
> >>>>>>>>
> >>>>>>>> not sure if this was already discussed...
> >>>>>>>>
> >>>>>>>> cheers
> >>>>>>>> Borisa
> >>>>>>>>
> >>>>>>>> On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré <[email protected]> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Borisa,
> >>>>>>>>>
> >>>>>>>>> You can take a look at the other thread ("Direct runner doesn't seem to
> >>>>>>>>> finalize checkpoint "quickly"").
> >>>>>>>>>
> >>>>>>>>> It's basically the same point ;)
> >>>>>>>>>
> >>>>>>>>> The default trigger (event-time) doesn't fire any data. I'm investigating
> >>>>>>>>> the element timestamp and watermark.
> >>>>>>>>>
> >>>>>>>>> I'm also playing with that, for instance:
> >>>>>>>>>
> >>>>>>>>> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> >>>>>>>>>
> >>>>>>>>> When you use WindowedWrite, you have to provide a filename policy. We
> >>>>>>>>> could provide a default one, but not sure it will fit fine (as it depends
> >>>>>>>>> a lot on the use cases).
> >>>>>>>>>
> >>>>>>>>> Regards
> >>>>>>>>> JB
> >>>>>>>>>
> >>>>>>>>> On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
> >>>>>>>>>> Hi guys,
> >>>>>>>>>>
> >>>>>>>>>> just playing with reading data from PubSub and writing using TextIO.
> >>>>>>>>>>
> >>>>>>>>>> First thing is that it is very hard to get any output - a lot of temp
> >>>>>>>>>> files written but not always would get final files created.
> >>>>>>>>>>
> >>>>>>>>>> So, I am playing with triggers etc... If I do following
> >>>>>>>>>>
> >>>>>>>>>> PCollection<String> streamData = p.apply(
> >>>>>>>>>>     PubsubIO.readStrings().fromTopic("projects/" + PROJECT_NAME + "/topics/myTopic"));
> >>>>>>>>>>
> >>>>>>>>>> streamData.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
> >>>>>>>>>>     .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(3))))
> >>>>>>>>>>     .withAllowedLateness(Duration.ZERO)
> >>>>>>>>>>     .discardingFiredPanes())
> >>>>>>>>>>     .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
> >>>>>>>>>>         .withSuffix(".suff").withNumShards(10));
> >>>>>>>>>>
> >>>>>>>>>> p.run();
> >>>>>>>>>>
> >>>>>>>>>> I would expect to see some files in /tmp/ with final results.. unless I
> >>>>>>>>>> add good triggers I usually do not get any data.. only temp files in
> >>>>>>>>>> /temp/.beam/
> >>>>>>>>>>
> >>>>>>>>>> but sometimes when data should be written I get following exception
> >>>>>>>>>>
> >>>>>>>>>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.UnsupportedOperationException: There is no default policy for windowed file output. Please provide an explicit FilenamePolicy to generate filenames.
> >>>>>>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
> >>>>>>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
> >>>>>>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> >>>>>>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> >>>>>>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
> >>>>>>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
> >>>>>>>>>>     at Test.main(Test.java:50)
> >>>>>>>>>>
> >>>>>>>>>> Would it make sense to change TextIO so that it does not use
> >>>>>>>>>> DefaultFilenamePolicy only - but in case there are windowedWrites and
> >>>>>>>>>> no filename policy was specified by user it could actually use custom
> >>>>>>>>>> FilePerWindow policy automatically. I believe today TextIO always
> >>>>>>>>>> expects user to specify FilenamePolicy, right?
> >>>>>>>>>>
> >>>>>>>>>> Or maybe to have FilePerWindow policy exposed as part of Beam - I
> >>>>>>>>>> believe today there are only implementations in tests and examples but
> >>>>>>>>>> nothing publicly visible, right?
> >>>>>>>>>>
> >>>>>>>>>> thanks
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Jean-Baptiste Onofré
> >>>>>>>>> [email protected]
> >>>>>>>>> http://blog.nanthrax.net
> >>>>>>>>> Talend - http://www.talend.com
> >>>>>>>
> >>>>>>> --
> >>>>>>> Jean-Baptiste Onofré
> >>>>>>> [email protected]
> >>>>>>> http://blog.nanthrax.net
> >>>>>>> Talend - http://www.talend.com
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com

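For anyone picking up the ticket, here is a rough sketch of the WWW/PPP template idea from the thread. It is plain Java and does not touch Beam's classes: `WindowedTemplateSketch` and `expand` are made-up names, the window and pane are assumed to arrive already stringified (an empty string standing for "global window" or "only pane"), and dropping an empty token together with its leading '-' is just one possible reading of Robert's elision suggestion. It is not what DefaultFilenamePolicy does today.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical illustration only; not part of Beam. */
public class WindowedTemplateSketch {

    // An optional leading '-' separator followed by a run of one token letter.
    private static final Pattern TOKEN = Pattern.compile("(-?)(S+|N+|W+|P+)");

    /**
     * Expands a shard template such as "-SSSS-of-NNNN-WWW-PPP".
     * S and N runs are zero-padded to the length of the run; W and P are
     * replaced by the given strings. A token whose value is empty is dropped
     * together with its leading '-' so no dangling separators remain.
     * Assumption: the template is only the shard suffix, not the file prefix,
     * so stray S/N/W/P letters in a prefix are not a concern here.
     */
    static String expand(String template, int shard, int numShards,
                         String window, String pane) {
        Matcher m = TOKEN.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String separator = m.group(1);
            String run = m.group(2);
            String value;
            switch (run.charAt(0)) {
                case 'S':
                    value = String.format("%0" + run.length() + "d", shard);
                    break;
                case 'N':
                    value = String.format("%0" + run.length() + "d", numShards);
                    break;
                case 'W':
                    value = window;
                    break;
                default: // 'P'
                    value = pane;
                    break;
            }
            String replacement = value.isEmpty() ? "" : separator + value;
            // quoteReplacement keeps '$' or '\' in window names from being
            // interpreted as regex group references.
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String template = "-SSSS-of-NNNN-WWW-PPP";
        // Global window, single pane: both tokens are elided.
        System.out.println("out" + expand(template, 0, 5, "", ""));
        // Fixed window and a numbered pane (both strings are made up).
        System.out.println("out" + expand(template, 0, 5, "2017-05-11T16.00.00Z", "pane-0"));
    }
}
```

With the global-window call the result is out-0000-of-0005 rather than out-0000-of-0005--, which is the case Robert was worried about. A single regex pass is used so that text substituted for one token is never rescanned for other token letters.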