We are facing an issue executing a Beam streaming pipeline with the SparkRunner. This simple word-count pipeline reads from a Kafka topic as its source, computes the word counts, and writes them to HDFS. The pipeline is evaluated up to the processing (counting) step, which we verified by adding log statements and checking the Spark driver and executor logs.
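For context, the source and windowing part of the pipeline is wired roughly as in the sketch below; the broker address, topic name, one-minute window size, and the use of SparkPipelineOptions are placeholders/assumptions rather than our exact code:

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

// Assumed runner setup: our options interface extends SparkPipelineOptions.
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setStreaming(true);

Pipeline p = Pipeline.create(options);

PCollection<String> windowedWords = p
    // Unbounded Kafka source (broker and topic names are placeholders).
    .apply("ReadFromKafka", KafkaIO.<String, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("words")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata())                          // PCollection<KV<String, String>>
    .apply(Values.<String>create())                  // keep only the message values
    // Fixed one-minute windows, firing at the watermark (window size is a placeholder).
    .apply("Window", Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

The word counts are then produced by applying WordCount.CountWords() to windowedWords, as shown further below.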
Here is the code that writes to HDFS:
options.setNumShards(10);

wordCounts
    .apply(MapElements.via(new WordCount.FormatAsTextFn()))
    .apply("WriteCounts", TextIO.write().to(output)
        .withWindowedWrites()
        .withFilenamePolicy(new PerWindowFiles("out1"))
        .withNumShards(1));
We also tried the following approach:
PCollection<KV<String, Long>> wordCounts =
    windowedWords.apply(new WordCount.CountWords());

wordCounts
    .apply(MapElements.via(new WordCount.FormatAsTextFn()))
    .apply(new WriteOneFilePerWindow(output, options.getNumShards()));
Neither of these approaches worked.
We also tried another pipeline that reads data from a test file on HDFS as a bounded source, counts the words, and writes the results back to HDFS. That pipeline works fine.
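For reference, that bounded pipeline looks roughly like the following sketch (HDFS paths are placeholders, it assumes the Hadoop file system is on the classpath so hdfs:// paths resolve, and it additionally uses org.apache.beam.sdk.io.TextIO and org.apache.beam.sdk.transforms.MapElements):

// Bounded (batch) variant that works; HDFS paths are placeholders.
SparkPipelineOptions batchOptions = PipelineOptionsFactory.as(SparkPipelineOptions.class);
batchOptions.setRunner(SparkRunner.class);           // streaming left disabled for the batch job

Pipeline batch = Pipeline.create(batchOptions);
batch
    .apply("ReadFromHdfs", TextIO.read().from("hdfs://localhost:8020/tmp/words.txt"))
    .apply(new WordCount.CountWords())
    .apply(MapElements.via(new WordCount.FormatAsTextFn()))
    .apply("WriteToHdfs", TextIO.write().to("hdfs://localhost:8020/tmp/counts"));
batch.run().waitUntilFinish();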
We are using the Cloudera sandbox 5.10.