[
https://issues.apache.org/jira/browse/BEAM-638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999835#comment-15999835
]
Asmir Avdicevic commented on BEAM-638:
--------------------------------------
Sure, here's the majority of it:
{code:title=MyPipeline.java|borderStyle=solid}
// Read Pub/Sub messages, taking event time from the "timestamp" attribute.
PCollection<PubsubMessage> messageStream = p.apply(
    PubsubIO.readPubsubMessagesWithAttributes()
        .fromTopic(pubsub_topic)
        .withTimestampAttribute("timestamp"));

// Decode each payload to a String, keeping the element's timestamp.
PCollection<String> stringMessageStream = messageStream.apply(
    ParDo.of(new DoFn<PubsubMessage, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        String payload = new String(c.element().getPayload());
        System.out.println("Got message: " + payload);
        c.outputWithTimestamp(payload, c.timestamp());
      }
    }));

// Fixed windows that fire once the watermark passes the end of the window.
PCollection<String> windowedRawMessages = stringMessageStream.apply(
    Window.<String>into(
            FixedWindows.of(Duration.standardMinutes(options.getRawMessageWindowSize())))
        .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardMinutes(1)));

// Write the windowed elements with WriteOneFilePerWindow.
PDone written = windowedRawMessages.apply(
    new WriteOneFilePerWindow("rawpawdata-" + options.getEnvironment() + "-raw"));
{code}
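As a rough illustration of what the fixed-window step in the snippet above does, here is a minimal plain-Java sketch (no Beam dependency) of how a timestamp maps to a fixed window. It assumes windows are aligned to the epoch with no offset. The names FixedWindowSketch, windowStart and groupByWindow are made up for this example and are not part of the Beam API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FixedWindowSketch {
    // Start (in millis) of the fixed window containing the timestamp.
    // Assumption: windows are aligned to the epoch, with no offset.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    // Group timestamps by the start of the window they fall into.
    static Map<Long, List<Long>> groupByWindow(List<Long> timestamps, long windowSizeMillis) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            windows.computeIfAbsent(windowStart(ts, windowSizeMillis), k -> new ArrayList<>())
                   .add(ts);
        }
        return windows;
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        List<Long> timestamps = List.of(5_000L, 59_999L, 60_000L, 125_000L);
        // Each map entry is one window: its start time and the elements in it.
        System.out.println(groupByWindow(timestamps, oneMinute));
    }
}
```

Each window holds a finite group of elements, but in a streaming pipeline the set of windows keeps growing, which is why the windowed PCollection is still treated as unbounded.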
> Add sink transform to write bounded data per window, pane, [and key] even
> when PCollection is unbounded
> -------------------------------------------------------------------------------------------------------
>
> Key: BEAM-638
> URL: https://issues.apache.org/jira/browse/BEAM-638
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-core
> Reporter: Jean-Baptiste Onofré
> Assignee: Davor Bonaci
>
> Today, if the pipeline source is unbounded and the sink expects a bounded
> collection, there's no way to use a single pipeline. Windowing splits the
> unbounded PCollection into chunks, but each "sub" PCollection is still
> unbounded.
> It would be helpful for users to have a Window function that creates a bounded
> PCollection (per window) from an unbounded PCollection coming from the
> source.