[ https://issues.apache.org/jira/browse/BEAM-6970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chloe Thonin updated BEAM-6970:
-------------------------------
    Description: 
I want to set up a Dataflow pipeline in streaming mode.

My pipeline does these tasks:
 * Read messages from Pub/Sub
 * Build my "Document" object
 * Insert the Document into BigQuery
 * Store the initial message in Google Cloud Storage

The code builds and runs successfully, but some messages take a long time to process.

I think it is slow because it processes the Pub/Sub messages one by one:

 -> I build a PCollection<Documents> from the Pub/Sub messages.
{code:java}
PCollection<Documents> rawtickets = pipeline
        // Read raw XML messages from the Pub/Sub topic.
        .apply("Read PubSub Events", PubsubIO.readStrings().fromTopic(TOPIC))
        // 5-minute fixed windows; fire a pane whenever at least 1000 elements have buffered.
        .apply("Windowing", Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1000)))
                .withAllowedLateness(Duration.standardSeconds(300))
                .discardingFiredPanes())
        // Parse each XML payload into a Documents object.
        .apply("Make Document", ParDo.of(new DoFn<String, Documents>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                String rawXml = c.element();
                /** code to build my "Document" object **/
                Documents docTest = new Documents();
                docTest.init(rawXml);
                c.output(docTest);
            }
        }))
        .setCoder(AvroCoder.of(Documents.class));
{code}
 

Here is a picture of my complete process:

[Complete Process|https://zupimages.net/up/19/14/w4df.png]

You can see the latency of the steps after the messages are read.

Concretely, I am looking to create a PCollection of N elements at a time. I have tried the trigger methods described here: [https://beam.apache.org/documentation/programming-guide/#triggers], but they do not group my Pub/Sub messages.

How can I batch this process?
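
One transform I came across that might do this is GroupIntoBatches, which groups a keyed PCollection into batches of a given size. This is only a sketch of what I have in mind, not something I have tested on my pipeline: the constant key (1) and the batch size (1000) are placeholders, and I attach a dummy key only because GroupIntoBatches requires a PCollection<KV<K, V>> as input:
{code:java}
// Sketch only: batch Documents into groups of up to 1000 per key and window.
PCollection<Iterable<Documents>> batches = rawtickets
        // Attach a constant placeholder key so the collection has a KV shape.
        .apply("Add Dummy Key", WithKeys.of(1))
        // Emit batches of up to 1000 Documents per key and window.
        .apply("Batch", GroupIntoBatches.<Integer, Documents>ofSize(1000))
        // Drop the placeholder key, keeping only the batched values.
        .apply("Drop Key", Values.create());
{code}
If I understand correctly, a single constant key funnels all batching through one key, which could limit parallelism; spreading elements over a small set of random keys might be needed for throughput.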

 

EDIT: I think I must use "GroupByKey" after a window, but that returns an error:
{code:java}
PCollection<Documents> rawtickets = pipeline
        .apply("Read PubSub Events", PubsubIO.readStrings().fromTopic(TOPIC))
        .apply("Windowing", Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1000)))
                .withAllowedLateness(Duration.standardSeconds(300))
                .discardingFiredPanes())
        .apply("Make Document", ParDo.of(new DoFn<String, Documents>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                String rawXml = c.element();
                /** code to build my "Document" object **/
                Documents docTest = new Documents();
                docTest.init(rawXml);
                c.output(docTest);
            }
        }))
        .setCoder(AvroCoder.of(Documents.class))
        // This last line does not compile (see the error below).
        .apply("GroupByKey", GroupByKey.<Integer, Documents>create());
{code}
Error: Wrong 2nd argument type. Found: 'org.apache.beam.sdk.transforms.GroupByKey<java.lang.Integer,Documents>', required: 'org.apache.beam.sdk.transforms.PTransform<? super org.apache.beam.sdk.values.PCollection<Documents>,OutputT>'. Inspection info: apply(String, org.apache.beam.sdk.transforms.PTransform<? super org.apache.beam.sdk.values.PCollection<Documents>,OutputT>) in PCollection cannot be applied to (String, org.apache.beam.sdk.transforms.GroupByKey<java.lang.Integer,Documents>)
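
For the record, I suspect the error is because GroupByKey expects a keyed PCollection<KV<K, V>>, while rawtickets is a plain PCollection<Documents>. A minimal sketch of what I think should compile, assuming an arbitrary constant key (the value 1 is a placeholder) just to give the collection a KV shape:
{code:java}
// Sketch: attach a key first, so GroupByKey gets the PCollection<KV<K, V>> it expects.
PCollection<KV<Integer, Iterable<Documents>>> grouped = rawtickets
        .apply("Add Dummy Key", WithKeys.of(1))
        // Groups per key, window and pane; the upstream trigger controls when panes fire.
        .apply("GroupByKey", GroupByKey.<Integer, Documents>create());
{code}
Since the collection is unbounded, the non-global windowing and trigger set upstream should be what makes this GroupByKey legal in streaming mode.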

 

 


> Java SDK: How to batch messages from PubSub?
> ----------------------------------------------
>
>                 Key: BEAM-6970
>                 URL: https://issues.apache.org/jira/browse/BEAM-6970
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow, sdk-java-core
>    Affects Versions: 2.11.0
>         Environment: Beam worker : Dataflow (GCP)
>            Reporter: Chloe Thonin
>            Priority: Minor
>         Attachments: Capture du 2019-04-02 16-55-37.png, processWithWindow.png
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
