We are facing an issue executing a Beam streaming pipeline on the SparkRunner. 
This simple word-count pipeline reads from a Kafka topic as its source, computes 
the word counts, and writes them to HDFS. The pipeline is evaluated up to the 
processing step, which we verified by adding log statements on the Spark 
driver/executor.
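For context, the source side of the pipeline is set up roughly like this (the 
broker address, topic name, and window size below are simplified placeholders):

    PCollection<String> windowedWords = p
        .apply(KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")
            .withTopic("words")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        // Keep only the record values (the lines of text).
        .apply(Values.<String>create())
        // Window the unbounded stream so windowed writes can fire.
        .apply(Window.<String>into(
            FixedWindows.of(Duration.standardMinutes(1))));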

Here is the code that writes to HDFS:
    options.setNumShards(10);
    wordCounts.apply(MapElements.via(new WordCount.FormatAsTextFn()))
        .apply("WriteCounts",
            TextIO.write()
                .to(output)
                .withWindowedWrites()
                .withFilenamePolicy(new PerWindowFiles("out1"))
                .withNumShards(1));

We also tried the approach below:
    PCollection<KV<String, Long>> wordCounts =
        windowedWords.apply(new WordCount.CountWords());

    wordCounts
        .apply(MapElements.via(new WordCount.FormatAsTextFn()))
        .apply(new WriteOneFilePerWindow(output, options.getNumShards()));

Neither approach worked.
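In both cases the job is submitted with SparkPipelineOptions roughly like the 
following (flags simplified):

    SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    // Run the Spark runner in streaming mode for the unbounded Kafka source.
    options.setStreaming(true);
    Pipeline p = Pipeline.create(options);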

We also tried another pipeline that reads from a test file on HDFS as a bounded 
source, counts the words, and writes the result back to HDFS. That pipeline 
works fine.
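That batch pipeline is essentially the following sketch (paths simplified):

    // Bounded read from HDFS, count words, write counts back to HDFS.
    PCollection<String> lines =
        p.apply(TextIO.read().from("hdfs://<namenode>/tmp/input.txt"));
    lines.apply(new WordCount.CountWords())
        .apply(MapElements.via(new WordCount.FormatAsTextFn()))
        .apply(TextIO.write().to("hdfs://<namenode>/tmp/counts"));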

We are using the Cloudera sandbox 5.10.
