[ 
https://issues.apache.org/jira/browse/BEAM-638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999835#comment-15999835
 ] 

Asmir Avdicevic commented on BEAM-638:
--------------------------------------

Sure, here's the majority of it:

{code:title=MyPipeline.java|borderStyle=solid}
// Read Pub/Sub messages, taking event time from the "timestamp" attribute.
PCollection<PubsubMessage> messageStream = p.apply(
    PubsubIO.readPubsubMessagesWithAttributes()
        .fromTopic(pubsub_topic)
        .withTimestampAttribute("timestamp"));

// Decode each payload to a String, keeping the element's timestamp.
PCollection<String> stringMessageStream = messageStream.apply(ParDo.of(
    new DoFn<PubsubMessage, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            System.out.println("Got message: " + new String(c.element().getPayload()));
            c.outputWithTimestamp(new String(c.element().getPayload()), c.timestamp());
        }
    }));

// Fixed windows that fire at the watermark; late data is accepted for 1 minute.
PCollection<String> windowedRawMessages = stringMessageStream.apply(
    Window.<String>into(
            FixedWindows.of(Duration.standardMinutes(options.getRawMessageWindowSize())))
        .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardMinutes(1)));

PDone written = windowedRawMessages.apply(
    new WriteOneFilePerWindow("rawpawdata-" + options.getEnvironment() + "-raw"));
{code}
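
For anyone following along, here is a rough plain-Java sketch of what the FixedWindows step in the snippet above does conceptually: each element lands in the window whose start is its timestamp rounded down to a multiple of the window size. This is only an illustration under assumptions (zero window offset, millisecond timestamps); the class and method names are hypothetical and it is not Beam's actual implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of fixed-window assignment; not part of Beam.
public class FixedWindowSketch {

    // Start (in millis) of the fixed window containing the timestamp,
    // assuming windows are aligned to the epoch (zero offset).
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Groups elements (name -> event timestamp) by the start of their window.
    static Map<Long, List<String>> groupByWindow(
            Map<String, Long> timestampedElements, long sizeMillis) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (Map.Entry<String, Long> e : timestampedElements.entrySet()) {
            long start = windowStart(e.getValue(), sizeMillis);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(e.getKey());
        }
        return windows;
    }

    public static void main(String[] args) {
        Map<String, Long> input = new LinkedHashMap<>();
        input.put("a", 5_000L);
        input.put("b", 59_999L);
        input.put("c", 60_000L);
        input.put("d", 125_000L);

        // One-minute windows: prints {0=[a, b], 60000=[c], 120000=[d]}
        System.out.println(groupByWindow(input, 60_000L));
    }
}
```

Each bucket here is finite only because the input map is finite. On an unbounded source the runner has to decide when a window is complete, which is what the watermark trigger and allowed lateness in the pipeline control.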

> Add sink transform to write bounded data per window, pane, [and key] even 
> when PCollection is unbounded
> -------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-638
>                 URL: https://issues.apache.org/jira/browse/BEAM-638
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-core
>            Reporter: Jean-Baptiste Onofré
>            Assignee: Davor Bonaci
>
> Today, if the pipeline source is unbounded and the sink expects a bounded 
> collection, there's no way to use a single pipeline. A window does split the 
> unbounded PCollection into chunks, but each "sub" PCollection is still 
> unbounded.
> It would be helpful for users to have a Window function that creates a bounded 
> PCollection (per window) from an unbounded PCollection coming from the 
> source.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
