On 2017-06-06 13:03 (+0530), Jean-Baptiste Onofré <[email protected]> wrote: 
> Hi Vikas,
> 
> I think the problem is the window/triggering you are using as you are 
> consuming 
> from an unbounded source (the kafka topic).
> 
> Do you use a FixedWindow ?
> 
> Maybe, just to test if it works, you can try something like:
> 
>  
> .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
>  
> .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
>                          .withAllowedLateness(Duration.ZERO)
>                          .discardingFiredPanes()
>                  )
> 
> just to see if HDFS is working.
> 
> I guess, you are also using HADOOP_CONF_DIR (or similar) to define the 
> location 
> of the hdfs-site.xml ?
> 
> Regards
> JB
> 
> On 06/06/2017 09:27 AM, Vikas Gupta wrote:
> > We are facing an issue in executing the beam streaming pipeline as 
> > sparkRunner. This simple word count pipeline listens to kafka topic as 
> > source process the word counts and write to HDFS. Pipeline is evaluated 
> > till processing which we had verified by adding the log statement on spark 
> > driver executor.
> > 
> > Here is the code to write to HDFS
> > options.setNumShards(10);
> >             wordCounts.apply(MapElements.via(new 
> > WordCount.FormatAsTextFn()))
> >                             .apply("WriteCounts",
> >                                             
> > TextIO.write().to(output).withWindowedWrites()
> >                                                             
> > .withFilenamePolicy(new PerWindowFiles("out1"))
> >                                                             
> > .withNumShards(1));
> > 
> > Also tried below approach
> >   PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new 
> > WordCount.CountWords());
> >   
> >      wordCounts
> >          .apply(MapElements.via(new WordCount.FormatAsTextFn()))
> >          .apply(new WriteOneFilePerWindow(output, options.getNumShards()));
> > 
> > None of these approaches worked.
> > 
> > We tried another pipeline which read the data from HDFS test file, count 
> > the words and write to HDFS as source. This worked fine.
> > 
> > We are using cloudera sandbox 5.10.
> > 
> 
> -- 
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
> 

We checked earlier HDFS working fine because we are able to run the beam 
pipeline to count the words where input is in HDFS and output is also stored in 
HDFS.

Problem which we are facing is that messages are read from kafka, it is 
processed we saw it in executor logs. It is just output is not getting saved in 
HDFS for streams.

I modified the code as suggested but still the same problem. Windowed output is 
not saved into HDFS.

Thanks
Vikas Gupta

Reply via email to