I believe that for most windows there is a standard stringification. However, I think we could allow the user to inject a window formatter for cases where there is no good default (e.g. where the window is a complicated user-defined type and toString() isn't good enough).
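A minimal sketch of the formatter idea, for illustration only. Every name here is hypothetical and not a Beam API: WindowFormatter, FixedWindow, CustomWindow and formatWindow. FixedWindow and CustomWindow are stand-ins defined in the sketch, not Beam's window classes. Well-known window types get a standard stringification, anything else falls back to toString(), and a user-supplied formatter overrides both.

```java
/**
 * Hypothetical sketch: WindowFormatter, FixedWindow, CustomWindow and
 * formatWindow are illustrative names, not part of Beam.
 */
final class WindowFormatterSketch {

  /** A hook the user could inject to turn a window into a filename fragment. */
  @FunctionalInterface
  interface WindowFormatter<W> {
    String format(W window);
  }

  /** Stand-in for a well-known window type with a natural [start, end) form. */
  static final class FixedWindow {
    final long startMillis;
    final long endMillis;

    FixedWindow(long startMillis, long endMillis) {
      this.startMillis = startMillis;
      this.endMillis = endMillis;
    }
  }

  /** Stand-in for a complicated user-defined window; it inherits Object.toString(). */
  static final class CustomWindow {
    final String region;

    CustomWindow(String region) {
      this.region = region;
    }
  }

  /** Standard stringification for known window types; toString() for everything else. */
  static String defaultFormat(Object window) {
    if (window instanceof FixedWindow) {
      FixedWindow w = (FixedWindow) window;
      return w.startMillis + "-" + w.endMillis;
    }
    return String.valueOf(window);
  }

  /** Prefers the injected formatter and falls back to the default when none is given. */
  static <W> String formatWindow(W window, WindowFormatter<W> formatter) {
    return formatter != null ? formatter.format(window) : defaultFormat(window);
  }
}
```

Without a formatter, a CustomWindow would render as Object.toString() output (class name plus hash code). That is exactly the case where an injected formatter such as `w -> "region-" + w.region` would help.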
Alternatively, if we don't want to allow formatters, we could make DefaultFilenamePolicy work with the default stringifications of well-known windows (fixed, sliding, sessions, etc.) and just use toString() for the remaining window types. Users who have unusual custom window types can always write their own FilenamePolicy.

On Thu, May 11, 2017 at 4:24 PM, Robert Bradshaw wrote:
> I like the idea of WWW and PPP, assuming there is a standard enough
> stringification of windows and panes. However, we may want to elide
> adjacent tokens if the window is global or the pane is the only
> possible (or first?) one, to avoid writing things like
> -0000-of-0005---.
>
> On Thu, May 11, 2017 at 8:47 AM, Reuven Lax wrote:
> > Another idea: we can extend the existing pattern that
> > DefaultFilenamePolicy understands to include windows.
> >
> > Today it replaces SSS with the shard and NNN with the number of shards
> > (so many templates contain -SSS-of-NNN). We could also have it recognize
> > WWW and PPP, for the window and the pane respectively.
> >
> > I believe this would be a backwards-compatible change. We do not need to
> > change any existing interfaces; we would simply be allowing the default
> > policy to work on windows.
> >
> > On Thu, May 11, 2017 at 8:41 AM, Dan Halperin wrote:
> >
> >> +Eugene, Reuven who reviewed and implemented this code. They may have
> >> opinions.
> >>
> >> Note that changing the default filename policy would be
> >> backwards-incompatible, so this would either need to go into 2.0.0 (and a
> >> new RC3) or it would not go in.
> >>
> >> On Thu, May 11, 2017 at 8:36 AM, Borisa Zivkovic wrote:
> >>
> >>> great JB, thanks
> >>>
> >>> I do not mind working on this; let's see if anyone else has additional
> >>> input.
> >>>
> >>> cheers
> >>>
> >>> On Thu, 11 May 2017 at 16:28 Jean-Baptiste Onofré wrote:
> >>>
> >>> > Got it.
> >>> >
> >>> > Yes, agreed. I think the PerWindowFilesPolicy could be the default,
> >>> > letting the user provide their own policy if they want to.
> >>> >
> >>> > Regards
> >>> > JB
> >>> >
> >>> > On 05/11/2017 05:23 PM, Borisa Zivkovic wrote:
> >>> > > Hi JB,
> >>> > >
> >>> > > yes, I saw that thread. I also copied your code but did not want to
> >>> > > pollute it with my proposal :)
> >>> > >
> >>> > > Well, OK, maybe a default FilePerWindow policy for windowedWrites in
> >>> > > TextIO does not make sense, though I'm not sure TBH...
> >>> > >
> >>> > > But would it make sense to promote a version of PerWindowFiles from
> >>> > > https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> >>> > > so that it is easier to provide some kind of PerWindowFiles filename
> >>> > > policy?
> >>> > >
> >>> > > Something like this (where the user does not have to write
> >>> > > PerWindowFilesPolicy; it comes with Beam):
> >>> > >
> >>> > >     .withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
> >>> > >     .withWindowedWrites()
> >>> > >     .withNumShards(1));
> >>> > >
> >>> > > Not sure if this was already discussed...
> >>> > >
> >>> > > cheers
> >>> > > Borisa
> >>> > >
> >>> > > On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré wrote:
> >>> > >
> >>> > >> Hi Borisa,
> >>> > >>
> >>> > >> You can take a look at the other thread ("Direct runner doesn't seem
> >>> > >> to finalize checkpoint "quickly"").
> >>> > >>
> >>> > >> It's basically the same point ;)
> >>> > >>
> >>> > >> The default trigger (event-time) doesn't fire any data. I'm
> >>> > >> investigating the element timestamp and watermark.
> >>> > >>
> >>> > >> I'm also playing with that, for instance:
> >>> > >>
> >>> > >> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
> >>> > >>
> >>> > >> When you use WindowedWrite, you have to provide a filename policy. We
> >>> > >> could provide a default one, but I'm not sure it would fit well (it
> >>> > >> depends a lot on the use case).
> >>> > >>
> >>> > >> Regards
> >>> > >> JB
> >>> > >>
> >>> > >> On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
> >>> > >>> Hi guys,
> >>> > >>>
> >>> > >>> I'm just playing with reading data from PubSub and writing using TextIO.
> >>> > >>>
> >>> > >>> The first thing is that it is very hard to get any output: a lot of
> >>> > >>> temp files are written, but the final files are not always created.
> >>> > >>>
> >>> > >>> So, I am playing with triggers etc... If I do the following:
> >>> > >>>
> >>> > >>> PCollection<String> streamData = p.apply(
> >>> > >>>     PubsubIO.readStrings().fromTopic("projects/" + PROJECT_NAME + "/topics/myTopic"));
> >>> > >>>
> >>> > >>> streamData.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
> >>> > >>>         .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
> >>> > >>>             .plusDelayOf(Duration.standardSeconds(3))))
> >>> > >>>         .withAllowedLateness(Duration.ZERO)
> >>> > >>>         .discardingFiredPanes())
> >>> > >>>     .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
> >>> > >>>         .withSuffix(".suff").withNumShards(10));
> >>> > >>>
> >>> > >>> p.run();
> >>> > >>>
> >>> > >>> I would expect to see some files in /tmp/ with the final results, but
> >>> > >>> unless I add good triggers I usually do not get any data...
> >>> > >>> only temp files in /temp/.beam/
> >>> > >>>
> >>> > >>> But sometimes, when data should be written, I get the following exception:
> >>> > >>>
> >>> > >>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> >>> > >>> java.lang.UnsupportedOperationException: There is no default policy for
> >>> > >>> windowed file output. Please provide an explicit FilenamePolicy to generate
> >>> > >>> filenames.
> >>> > >>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
> >>> > >>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
> >>> > >>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> >>> > >>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> >>> > >>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
> >>> > >>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
> >>> > >>>     at Test.main(Test.java:50)
> >>> > >>>
> >>> > >>> Would it make sense to change TextIO so that it does not only use
> >>> > >>> DefaultFilenamePolicy, but, when there are windowedWrites and no
> >>> > >>> filename policy was specified by the user, automatically uses a custom
> >>> > >>> FilePerWindow policy? I believe today TextIO always expects the user to
> >>> > >>> specify a FilenamePolicy, right?
> >>> > >>>
> >>> > >>> Or maybe a FilePerWindow policy could be exposed as part of Beam? I
> >>> > >>> believe today there are only implementations in tests and examples,
> >>> > >>> but nothing publicly visible, right?
> >>> > >>>
> >>> > >>> thanks
> >>> > >>>
> >>> > >>
> >>> > >> --
> >>> > >> Jean-Baptiste Onofré
> >>> > >> http://blog.nanthrax.net
> >>> > >> Talend - http://www.talend.com
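Here is a minimal sketch of Reuven's WWW/PPP proposal combined with Robert's elision concern, both quoted above. It assumes runs of S and N are zero-padded to the run length, following the -SSS-of-NNN convention described in the thread. A null window stands for the global window and a null pane index means "elide the pane token"; both conventions are assumptions of the sketch. ShardTemplateSketch and expand are hypothetical names, and this is not DefaultFilenamePolicy's actual code.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical sketch of a shard template that understands WWW (window) and
 * PPP (pane) in addition to SSS (shard) and NNN (number of shards).
 * Not DefaultFilenamePolicy's actual implementation.
 */
final class ShardTemplateSketch {

  // One pass over all tokens, so text substituted for one token is never
  // re-scanned as another. W and P runs absorb one optional leading '-' so
  // that eliding them does not leave a dangling separator behind.
  private static final Pattern TOKENS = Pattern.compile("S+|N+|-?W+|-?P+");

  /**
   * @param window    window string, or null for the global window (token elided)
   * @param paneIndex pane index, or null when the pane token should be elided
   */
  static String expand(String template, int shard, int numShards,
                       String window, Long paneIndex) {
    Matcher m = TOKENS.matcher(template);
    StringBuilder out = new StringBuilder();
    while (m.find()) {
      String token = m.group();
      String replacement;
      switch (token.charAt(token.length() - 1)) {
        case 'S':
          replacement = zeroPad(shard, token.length());
          break;
        case 'N':
          replacement = zeroPad(numShards, token.length());
          break;
        case 'W':
          replacement = window == null ? "" : separator(token) + window;
          break;
        default: // a run of P
          replacement = paneIndex == null ? "" : separator(token) + paneIndex;
          break;
      }
      m.appendReplacement(out, Matcher.quoteReplacement(replacement));
    }
    m.appendTail(out);
    return out.toString();
  }

  /** Zero-pads value to the length of the S or N run it replaces. */
  private static String zeroPad(int value, int width) {
    return String.format(Locale.ROOT, "%0" + width + "d", value);
  }

  /** Returns the '-' captured with a W/P token, or "" if there was none. */
  private static String separator(String token) {
    return token.startsWith("-") ? "-" : "";
  }
}
```

With this sketch, `expand("-SSS-of-NNN-WWW-PPP", 0, 5, null, null)` yields `-000-of-005` rather than a name ending in dangling separators. Matching all tokens in a single regex pass is a deliberate choice. A window string that happens to contain an uppercase P or S is then never rewritten by a later substitution.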
