I like the idea of WWW and PPP, assuming there is a sufficiently standard
stringification of windows and panes. However, we may want to elide the
adjacent tokens when the window is global or the pane is the only
possible (or first?) one, to avoid writing names like
-0000-of-0005---.
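To make the elision idea concrete, here is a minimal, Beam-free Java sketch. It assumes (hypothetically) that the window and pane have already been stringified and that the filename is built by joining tokens with '-'. `ElidingJoiner`, `windowToken` and `paneToken` are illustrative names, not Beam APIs. Empty tokens are skipped, so a global window or a single pane disappears from the name instead of leaving a run of dashes.

```java
import java.util.StringJoiner;

/** Hypothetical helper (not a Beam API): joins filename tokens, eliding empty ones. */
final class ElidingJoiner {
  private ElidingJoiner() {}

  /** A global window contributes no token, so it vanishes from the name. */
  static String windowToken(boolean isGlobalWindow, String windowString) {
    return isGlobalWindow ? "" : windowString;
  }

  /** When a window fires only once, the pane index adds nothing and is elided. */
  static String paneToken(boolean isOnlyPane, long paneIndex) {
    return isOnlyPane ? "" : Long.toString(paneIndex);
  }

  /** Joins the non-empty tokens with '-', so no doubled or trailing dashes appear. */
  static String join(String... tokens) {
    StringJoiner joiner = new StringJoiner("-");
    for (String token : tokens) {
      if (!token.isEmpty()) {
        joiner.add(token);
      }
    }
    return joiner.toString();
  }
}
```

For example, `join("out", "0000", "of", "0005", windowToken(true, "w"), paneToken(true, 0))` yields `out-0000-of-0005` rather than `out-0000-of-0005--`. Deciding *when* a pane counts as "the only one" is left to the caller in this sketch.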

On Thu, May 11, 2017 at 8:47 AM, Reuven Lax <[email protected]> wrote:
> Another idea: we can extend the existing pattern that
> DefaultFilenamePolicy understands to include windows.
>
> Today it replaces SSS with the shard number and NNN with the number of
> shards (so many templates contain -SSS-of-NNN). We could also have it
> recognize WWW and PPP, for the window and the pane respectively.
>
> I believe this would be a backwards-compatible change. We do not need to
> change any existing interfaces, we would simply be allowing the default
> policy to work on windows.
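A minimal sketch of the proposed template expansion follows, written in plain Java rather than against Beam's real DefaultFilenamePolicy. It assumes that each run of S or N is replaced by a number zero-padded to the length of the run (as the existing shard template does), and adds hypothetical W and P runs that are replaced by caller-supplied window and pane strings. `WindowedShardTemplate` is an illustrative name.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of a shard template that also understands W (window) and P (pane). */
final class WindowedShardTemplate {
  // Each maximal run of one of these upper-case letters is a placeholder.
  private static final Pattern PLACEHOLDER = Pattern.compile("S+|N+|W+|P+");

  private WindowedShardTemplate() {}

  static String expand(String template, int shard, int numShards, String window, String pane) {
    Matcher m = PLACEHOLDER.matcher(template);
    StringBuffer out = new StringBuffer();
    while (m.find()) {
      String run = m.group();
      String value;
      switch (run.charAt(0)) {
        case 'S': // shard number, zero-padded to the run length
          value = zeroPad(shard, run.length());
          break;
        case 'N': // number of shards, zero-padded to the run length
          value = zeroPad(numShards, run.length());
          break;
        case 'W': // window string, inserted verbatim
          value = window;
          break;
        default: // 'P': pane string, inserted verbatim
          value = pane;
          break;
      }
      m.appendReplacement(out, Matcher.quoteReplacement(value));
    }
    m.appendTail(out);
    return out.toString();
  }

  private static String zeroPad(int n, int width) {
    return String.format("%0" + width + "d", n);
  }
}
```

A template with no W or P, such as `-SSS-of-NNN`, expands exactly as before (`-002-of-010` for shard 2 of 10), which matches the backwards-compatibility argument. One caveat of this sketch: any literal upper-case W or P already present in an existing template would now be treated as a placeholder.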
>
> On Thu, May 11, 2017 at 8:41 AM, Dan Halperin <[email protected]> wrote:
>
>> +Eugene and Reuven, who reviewed and implemented this code. They may have
>> opinions.
>>
>> Note that changing the default filename policy would be
>> backwards-incompatible, so this would either need to go into 2.0.0 (and a
>> new RC3) or it would not go in.
>>
>> On Thu, May 11, 2017 at 8:36 AM, Borisa Zivkovic <[email protected]> wrote:
>>
>>> great JB, thanks
>>>
>>> I do not mind working on this - let's see if anyone else has additional
>>> input.
>>>
>>> cheers
>>>
>>> On Thu, 11 May 2017 at 16:28 Jean-Baptiste Onofré <[email protected]>
>>> wrote:
>>>
>>> > Got it.
>>> >
>>> > Yes, agreed. I think the PerWindowFilesPolicy could be the default,
>>> > letting users provide their own policy if they want to.
>>> >
>>> > Regards
>>> > JB
>>> >
>>> > On 05/11/2017 05:23 PM, Borisa Zivkovic wrote:
>>> > > Hi JB,
>>> > >
>>> > > Yes, I saw that thread. I also copied your code but did not want to
>>> > > pollute it with my proposal :)
>>> > >
>>> > > Well, OK, maybe a default FilePerWindow policy for windowedWrites in
>>> > > TextIO does not make sense; not sure, TBH...
>>> > >
>>> > > But would it make sense to promote a version of PerWindowFiles from
>>> > > https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
>>> > > so that it is easier to provide some kind of PerWindowFiles filename
>>> > > policy?
>>> > >
>>> > >
>>> > > Something like this (where the user does not have to write
>>> > > PerWindowFilesPolicy; it comes with Beam):
>>> > >
>>> > >
>>> > >
>>> > > .withFilenamePolicy(PerWindowFilesPolicy.withSuffix("mySuffix"))
>>> > > .withWindowedWrites()
>>> > > .withNumShards(1));
>>> > >
>>> > > not sure if this was already discussed...
>>> > >
>>> > > cheers
>>> > > Borisa
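For illustration only, the kind of name a bundled per-window policy might produce can be sketched without any Beam types. This is not the PerWindowFiles code from the linked sample: `PerWindowFilesPolicy.withSuffix` here mirrors the hypothetical factory from Borisa's proposal, the window is modelled as a pair of `java.time.Instant`s, and the UTC timestamp format is an arbitrary assumption.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/**
 * Hypothetical, Beam-free sketch of the names a bundled per-window policy might generate.
 * It does NOT implement Beam's FilenamePolicy; a window is modelled as [start, end).
 */
final class PerWindowFilesPolicy {
  // Compact UTC timestamp, e.g. 20170511T080000 (format chosen for this sketch only).
  private static final DateTimeFormatter FORMAT =
      DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss").withZone(ZoneOffset.UTC);

  private final String suffix;

  private PerWindowFilesPolicy(String suffix) {
    this.suffix = suffix;
  }

  static PerWindowFilesPolicy withSuffix(String suffix) {
    return new PerWindowFilesPolicy(suffix);
  }

  /** prefix-<windowStart>-<windowEnd>-<shard>-of-<numShards><suffix> */
  String filename(String prefix, Instant windowStart, Instant windowEnd, int shard, int numShards) {
    return String.format("%s-%s-%s-%d-of-%d%s",
        prefix, FORMAT.format(windowStart), FORMAT.format(windowEnd), shard, numShards, suffix);
  }
}
```

Putting both window bounds in the name keeps output from different windows in distinct files, while the shard fields keep `withNumShards` meaningful within a single window.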
>>> > >
>>> > >
>>> > > On Thu, 11 May 2017 at 16:15 Jean-Baptiste Onofré <[email protected]>
>>> > > wrote:
>>> > >
>>> > >> Hi Borisa,
>>> > >>
>>> > >> You can take a look at the other thread ("Direct runner doesn't seem to
>>> > >> finalize checkpoint "quickly"").
>>> > >>
>>> > >> It's basically the same point ;)
>>> > >>
>>> > >> The default trigger (event-time) doesn't fire any data. I'm
>>> > >> investigating the element timestamp and watermark.
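The event-time behaviour described here can be illustrated with a simplified, Beam-free model: with fixed windows of a given size (and no offset), an element belongs to the window `[start, start + size)`, and a watermark-based trigger cannot fire for that window until the watermark reaches its end. `FixedWindowModel` is a hypothetical name, and the "watermark >= end" condition is a simplification of Beam's actual trigger semantics.

```java
/** Simplified, Beam-free model of fixed windows and a watermark (event-time) trigger. */
final class FixedWindowModel {
  private FixedWindowModel() {}

  /** Start of the fixed window [start, start + size) that contains the timestamp (no offset). */
  static long windowStart(long timestampMillis, long sizeMillis) {
    // floorMod keeps the result correct for negative timestamps too.
    return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
  }

  /** In this model, an event-time trigger can fire only once the watermark reaches the window end. */
  static boolean eventTimeTriggerCanFire(long timestampMillis, long sizeMillis, long watermarkMillis) {
    long end = windowStart(timestampMillis, sizeMillis) + sizeMillis;
    return watermarkMillis >= end;
  }
}
```

For a 5-second window, an element stamped at 12.345 s falls in [10 s, 15 s). If the source's watermark never advances to 15 s, nothing is emitted for that window, which would be consistent with the missing output discussed in this thread.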
>>> > >>
>>> > >> I'm also playing with that, for instance:
>>> > >>
>>> > >> https://github.com/jbonofre/beam-samples/blob/master/iot/src/main/java/org/apache/beam/samples/iot/JmsToHdfs.java
>>> > >>
>>> > >> When you use withWindowedWrites(), you have to provide a filename
>>> > >> policy. We could provide a default one, but I'm not sure it would fit
>>> > >> well, as it depends a lot on the use case.
>>> > >>
>>> > >> Regards
>>> > >> JB
>>> > >>
>>> > >> On 05/11/2017 05:01 PM, Borisa Zivkovic wrote:
>>> > >>> Hi guys,
>>> > >>>
>>> > >>> I'm just playing with reading data from PubSub and writing it using TextIO.
>>> > >>>
>>> > >>> The first thing is that it is very hard to get any output: a lot of
>>> > >>> temp files are written, but the final files are not always created.
>>> > >>>
>>> > >>> So I am playing with triggers, etc. If I do the following:
>>> > >>>
>>> > >>> PCollection<String> streamData = p.apply(
>>> > >>>         PubsubIO.readStrings().fromTopic("projects/" + PROJECT_NAME
>>> > >>>             + "/topics/myTopic"));
>>> > >>>
>>> > >>> streamData.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
>>> > >>>         .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
>>> > >>>             .plusDelayOf(Duration.standardSeconds(3))))
>>> > >>>         .withAllowedLateness(Duration.ZERO)
>>> > >>>         .discardingFiredPanes())
>>> > >>>     .apply(TextIO.write().to("/tmp/abc").withWindowedWrites()
>>> > >>>         .withSuffix(".suff").withNumShards(10));
>>> > >>>
>>> > >>> p.run();
>>> > >>>
>>> > >>> I would expect to see some files in /tmp/ with final results. Unless I
>>> > >>> add good triggers, I usually do not get any data, only temp files in
>>> > >>> /temp/.beam/
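File sinks of this kind roughly work in two phases: data is first written to temporary files, and only a later finalization step moves them to their final names, so if finalization does not run, only temp files remain. The sketch below shows that general write-then-rename pattern with `java.nio.file`. It is an illustration under that assumption, not Beam's WriteFiles implementation; `TempThenFinalize` and the `bundle-` prefix are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

/** Generic write-then-rename sketch; NOT Beam's actual WriteFiles code. */
final class TempThenFinalize {
  private TempThenFinalize() {}

  static Path write(Path tempDir, Path finalFile, List<String> lines) throws IOException {
    Files.createDirectories(tempDir);
    // Phase 1: write the data under a uniquely named temporary file.
    Path temp = Files.createTempFile(tempDir, "bundle-", ".tmp");
    Files.write(temp, lines, StandardCharsets.UTF_8);
    // Phase 2 ("finalize"): only this move makes the output visible under its final name.
    // If this step never runs, the temp file is all that is left behind.
    return Files.move(temp, finalFile, StandardCopyOption.REPLACE_EXISTING);
  }
}
```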
>>> > >>>
>>> > >>> But sometimes, when data should be written, I get the following exception:
>>> > >>>
>>> > >>> Exception in thread "main"
>>> > >>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>>> > >>> java.lang.UnsupportedOperationException: There is no default policy for
>>> > >>> windowed file output. Please provide an explicit FilenamePolicy to
>>> > >>> generate filenames.
>>> > >>> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
>>> > >>> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
>>> > >>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
>>> > >>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
>>> > >>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
>>> > >>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
>>> > >>> at Test.main(Test.java:50)
>>> > >>>
>>> > >>>
>>> > >>> Would it make sense to change TextIO so that it does not only use
>>> > >>> DefaultFilenamePolicy, but, when there are windowedWrites and no
>>> > >>> filename policy was specified by the user, automatically uses a custom
>>> > >>> FilePerWindow policy? I believe that today TextIO always expects the
>>> > >>> user to specify a FilenamePolicy, right?
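The suggestion amounts to a small change in how the writer picks a policy. A hypothetical sketch of that decision, with today's behaviour kept behind a flag, might look like the following. The enum, `choose` and its parameters are illustrative names, not TextIO's real internals; only the exception message is copied from the stack trace in this message.

```java
/** Hypothetical sketch of choosing a filename policy; not TextIO's actual code. */
final class PolicySelection {
  enum Policy { USER_SUPPLIED, DEFAULT, PER_WINDOW }

  private PolicySelection() {}

  static Policy choose(boolean windowedWrites, boolean userPolicyGiven, boolean perWindowFallback) {
    if (userPolicyGiven) {
      return Policy.USER_SUPPLIED; // an explicit policy always wins
    }
    if (!windowedWrites) {
      return Policy.DEFAULT; // unwindowed output keeps the existing default policy
    }
    if (perWindowFallback) {
      return Policy.PER_WINDOW; // the proposed automatic fallback for windowed writes
    }
    // Without the fallback: the failure reported in this thread.
    throw new UnsupportedOperationException(
        "There is no default policy for windowed file output. "
            + "Please provide an explicit FilenamePolicy to generate filenames.");
  }
}
```

Keeping the fallback opt-in in this sketch reflects the concern raised elsewhere in the thread that changing the default would be backwards-incompatible.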
>>> > >>>
>>> > >>> Or maybe we could expose a FilePerWindow policy as part of Beam? I
>>> > >>> believe that today there are only implementations in tests and
>>> > >>> examples, but nothing publicly visible, right?
>>> > >>>
>>> > >>>
>>> > >>>
>>> > >>> thanks
>>> > >>>
>>> > >>
>>> > >> --
>>> > >> Jean-Baptiste Onofré
>>> > >> [email protected]
>>> > >> http://blog.nanthrax.net
>>> > >> Talend - http://www.talend.com
>>> > >>
>>> > >
>>> >
>>> >
>>>
>>
>>
