kennknowles opened a new issue, #19485:
URL: https://github.com/apache/beam/issues/19485

   I want to set up a Dataflow pipeline in streaming mode.
   
   My pipeline performs these tasks:
    * Read messages from pubsub
    * Build my "Document" object
    * Insert this Document into BigQuery
    * Store the initial message into Google Cloud Storage
   
   The code builds and runs successfully, but some messages take a long time 
to process.
   
   I think this is because it processes Pub/Sub messages one at a time:
   
    -> I build a `PCollection<Documents>` from the Pub/Sub messages.
   ```
   PCollection<Documents> rawtickets = pipeline
       .apply("Read PubSub Events", PubsubIO.readStrings().fromTopic(TOPIC))
       .apply("Windowing",
           Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))
               .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1000)))
               .withAllowedLateness(Duration.standardSeconds(300))
               .discardingFiredPanes())
       .apply("Make Document ", ParDo.of(new DoFn<String, Documents>() {
         @ProcessElement
         public void processElement(ProcessContext c) throws Exception {
           String rawXMl = c.element();

           // Build the "Document" object from the raw XML message.
           Documents docTest = new Documents();
           docTest.init(rawXMl);

           c.output(docTest);
         }
       }))
       .setCoder(AvroCoder.of(Documents.class));
   ```
   
    
   
   Here is a picture of my complete process:
   
   [Complete Process](https://zupimages.net/up/19/14/w4df.png)
   
   It shows the latency in the steps that follow the reading of the messages.
   
   Concretely, I am trying to create a PCollection of N elements. I have tried 
the methods found here: 
[https://beam.apache.org/documentation/programming-guide/#triggers](https://beam.apache.org/documentation/programming-guide/#triggers)
 but they don't group my Pub/Sub messages.
   
   How can I batch this process?
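
To make "a collection of N elements" concrete, here is a minimal plain-Java sketch with no Beam dependency. It buffers incoming messages and closes a batch each time N of them have accumulated. `BatchBuffer`, `add`, and `flush` are hypothetical names chosen for illustration; they are not part of the Beam API. The sketch only models the desired behaviour. An element-wise `ParDo` still receives one element per call whatever trigger is set on the window, so an explicit grouping step along these lines seems to be what is missing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Beam: collects elements into lists of at most batchSize.
final class BatchBuffer<T> {
    private final int batchSize;
    private List<T> current = new ArrayList<>();
    private final List<List<T>> completed = new ArrayList<>();

    BatchBuffer(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    // Adds one element and closes the current batch once it holds batchSize elements.
    void add(T element) {
        current.add(element);
        if (current.size() == batchSize) {
            completed.add(current);
            current = new ArrayList<>();
        }
    }

    // Closes any partially filled batch and returns every batch collected so far.
    List<List<T>> flush() {
        if (!current.isEmpty()) {
            completed.add(current);
            current = new ArrayList<>();
        }
        return completed;
    }
}
```

With a batch size of 3 and 7 messages, `flush()` returns two full batches and one final batch holding the remaining message.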
   
    
   
   EDIT: I think I must use "GroupByKey" after the window, but that returns an 
error:
   ```
   PCollection<Documents> rawtickets = pipeline
       .apply("Read PubSub Events", PubsubIO.readStrings().fromTopic(TOPIC))
       .apply("Windowing",
           Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))
               .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1000)))
               .withAllowedLateness(Duration.standardSeconds(300))
               .discardingFiredPanes())
       .apply("Make Document ", ParDo.of(new DoFn<String, Documents>() {
         @ProcessElement
         public void processElement(ProcessContext c) throws Exception {
           String rawXMl = c.element();

           // Build the "Document" object from the raw XML message.
           Documents docTest = new Documents();
           docTest.init(rawXMl);

           c.output(docTest);
         }
       }))
       .setCoder(AvroCoder.of(Documents.class))
       .apply("GroupByKey", GroupByKey.<Integer, Documents>create());
   ```
   
   Error: Wrong 2nd argument type. Found: 
`org.apache.beam.sdk.transforms.GroupByKey<java.lang.Integer,Documents>`, 
required: 
`org.apache.beam.sdk.transforms.PTransform<? super org.apache.beam.sdk.values.PCollection<Documents>,OutputT>`.
   
   Inspection info: 
`apply(String, org.apache.beam.sdk.transforms.PTransform<? super org.apache.beam.sdk.values.PCollection<Documents>,OutputT>)` 
in `PCollection` cannot be applied to 
`(String, org.apache.beam.sdk.transforms.GroupByKey<java.lang.Integer,Documents>)`.
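
The message appears to come down to input types. `GroupByKey<K, V>` is a transform over a `PCollection<KV<K, V>>`, while the collection it is applied to here is a `PCollection<Documents>` with no keys attached. The following plain-Java sketch (no Beam dependency) models that constraint. `KeyedGrouping`, `withKeys`, and `groupByKey` are hypothetical stand-ins written for illustration, with `Map.Entry` playing the role of Beam's `KV`; they only mirror the shape of the Beam transforms and are not the Beam API itself.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model (not Beam code) of why grouping needs key/value pairs as input.
final class KeyedGrouping {
    // Attaches a key to every value; stands in for a keying step placed before grouping.
    static <K, V> List<Map.Entry<K, V>> withKeys(List<V> values, Function<V, K> keyFn) {
        List<Map.Entry<K, V>> keyed = new ArrayList<>();
        for (V value : values) {
            keyed.add(new SimpleEntry<>(keyFn.apply(value), value));
        }
        return keyed;
    }

    // Accepts only key/value pairs, so passing a bare List<V> does not type-check.
    static <K, V> Map<K, List<V>> groupByKey(List<Map.Entry<K, V>> keyed) {
        Map<K, List<V>> grouped = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : keyed) {
            grouped.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(entry.getValue());
        }
        return grouped;
    }
}
```

Calling `groupByKey` directly on a list of unkeyed values fails to compile in much the same way as the pipeline above; running the values through `withKeys` first gives the grouping step the pair type it expects.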
   
    
   
    
   
   Imported from Jira 
[BEAM-6970](https://issues.apache.org/jira/browse/BEAM-6970). Original Jira may 
contain additional context.
   Reported by: chloethonin.

