[
https://issues.apache.org/jira/browse/BEAM-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chloe Thonin updated BEAM-6970:
-------------------------------
Description:
I want to set up a Dataflow pipeline in streaming mode.
My pipeline performs these tasks:
* Read messages from Pub/Sub
* Build my "Document" object
* Insert this Document into BigQuery
* Store the initial message into Google Cloud Storage
The code builds and runs successfully, but some messages take a long time to process.
I think this is because the pipeline processes Pub/Sub messages one by one:
-> I build a PCollection<Documents> from the Pub/Sub messages.
{code:java}
PCollection<Documents> rawtickets = pipeline
    .apply("Read PubSub Events", PubsubIO.readStrings().fromTopic(TOPIC))
    .apply("Windowing",
        Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))
            .triggering(Repeatedly.forever(
                AfterPane.elementCountAtLeast(1000)))
            .withAllowedLateness(Duration.standardSeconds(300))
            .discardingFiredPanes())
    .apply("Make Document", ParDo.of(new DoFn<String, Documents>() {
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            String rawXMl = c.element();
            /** code to build my "Document" object **/
            Documents docTest = new Documents();
            docTest.init(rawXMl);
            c.output(docTest);
        }
    }))
    .setCoder(AvroCoder.of(Documents.class));
{code}
Here is a picture of my complete process:
[Complete Process|https://zupimages.net/up/19/14/w4df.png]
We can see the latency of the process after reading the messages.
Concretely, I am trying to create a PCollection of N elements. I have tried the
methods found here:
[https://beam.apache.org/documentation/programming-guide/#triggers] but they
do not group my Pub/Sub messages.
How can I batch this process?
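As an editorial note, one possible approach (a minimal sketch, not from the original report, assuming the Beam 2.11 Java SDK): the {{GroupIntoBatches}} transform batches elements per key, which avoids hand-rolling a count trigger for this purpose. It operates on key/value pairs, so a key must be attached first; the constant key {{1}} and batch size {{1000}} below are illustrative.
{code:java}
// Sketch: batch Documents into groups of up to 1000 via GroupIntoBatches.
// GroupIntoBatches consumes KV pairs, so a (dummy) key is attached first.
PCollection<KV<Integer, Iterable<Documents>>> batched = rawtickets
    .apply("Add Key", WithKeys.of(1)) // single constant key: all elements batch together
    .setCoder(KvCoder.of(VarIntCoder.of(), AvroCoder.of(Documents.class)))
    .apply("Batch", GroupIntoBatches.<Integer, Documents>ofSize(1000));
{code}
Note that a single constant key funnels all batching through one worker; sharding across several keys (e.g. a small random key range) restores parallelism.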
EDIT: I think I must use "GroupByKey" after a window, but that returns an
error:
{code:java}
PCollection<Documents> rawtickets = pipeline
    .apply("Read PubSub Events", PubsubIO.readStrings().fromTopic(TOPIC))
    .apply("Windowing",
        Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))
            .triggering(Repeatedly.forever(
                AfterPane.elementCountAtLeast(1000)))
            .withAllowedLateness(Duration.standardSeconds(300))
            .discardingFiredPanes())
    .apply("Make Document", ParDo.of(new DoFn<String, Documents>() {
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            String rawXMl = c.element();
            /** code to build my "Document" object **/
            Documents docTest = new Documents();
            docTest.init(rawXMl);
            c.output(docTest);
        }
    }))
    .setCoder(AvroCoder.of(Documents.class))
    .apply("GroupByKey", GroupByKey.<Integer, Documents>create());
{code}
Error:
{code}
Wrong 2nd argument type.
Found: 'org.apache.beam.sdk.transforms.GroupByKey<java.lang.Integer,Documents>',
required: 'org.apache.beam.sdk.transforms.PTransform<? super
org.apache.beam.sdk.values.PCollection<Documents>,OutputT>'
Inspection info: apply(String, org.apache.beam.sdk.transforms.PTransform<? super
org.apache.beam.sdk.values.PCollection<Documents>,OutputT>) in PCollection
cannot be applied to (String,
org.apache.beam.sdk.transforms.GroupByKey<java.lang.Integer,Documents>)
{code}
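For reference (editorial sketch, assuming the Beam 2.11 Java SDK): the error arises because {{GroupByKey}} expects a {{PCollection<KV<K, V>>}}, while {{rawtickets}} is a plain {{PCollection<Documents>}}. A version that type-checks attaches a key first; the constant key {{1}} is illustrative.
{code:java}
// GroupByKey consumes key/value pairs, so attach a key before grouping.
PCollection<KV<Integer, Iterable<Documents>>> grouped = rawtickets
    .apply("Add Key", WithKeys.of(1)) // illustrative constant key
    .setCoder(KvCoder.of(VarIntCoder.of(), AvroCoder.of(Documents.class)))
    .apply("GroupByKey", GroupByKey.create());
{code}
Each fired pane of the 5-minute window then yields one {{Iterable<Documents>}} per key.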
was:
I want to set up a Dataflow pipeline in streaming mode.
My pipeline performs these tasks:
* Read messages from Pub/Sub
* Build my "Document" object
* Insert this Document into BigQuery
* Store the initial message into Google Cloud Storage
The code builds and runs successfully, but some messages take a long time to process.
I think this is because the pipeline processes Pub/Sub messages one by one:
-> I build a PCollection<Documents> from the Pub/Sub messages.
{code:java}
PCollection<Documents> rawtickets = pipeline
    .apply("Read PubSub Events", PubsubIO.readStrings().fromTopic(TOPIC))
    .apply("Make Document", ParDo.of(new DoFn<String, Documents>() {
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            String rawXMl = c.element();
            /** code to build my "Document" object **/
            c.output(rawXMl);
        }
    }))
    .setCoder(AvroCoder.of(Documents.class));
{code}
Here is a picture of my complete process:
[Complete Process|https://zupimages.net/up/19/14/w4df.png]
We can see the latency of the process after reading the messages.
Concretely, I am trying to create a PCollection of N elements. I have tried the
methods found here:
[https://beam.apache.org/documentation/programming-guide/#triggers] but they
do not group my Pub/Sub messages.
How can I batch this process?
> Java SDK: How to batch messages from Pub/Sub?
> ----------------------------------------------
>
> Key: BEAM-6970
> URL: https://issues.apache.org/jira/browse/BEAM-6970
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow, sdk-java-core
> Affects Versions: 2.11.0
> Environment: Beam worker : Dataflow (GCP)
> Reporter: Chloe Thonin
> Priority: Minor
> Attachments: Capture du 2019-04-02 16-55-37.png, processWithWindow.png
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)