Hi Vikas,
I think the problem is the windowing/triggering you are using, since you are
consuming from an unbounded source (the Kafka topic).
Are you using FixedWindows?
As a quick test, you could try something like:
    .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(10))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
just to see if HDFS is working.
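
To illustrate what a 10-second fixed window does with elements from an unbounded
source, here is a minimal plain-Java sketch of the bucketing. It is not Beam code:
the class FixedWindowSketch and the method windowStart are hypothetical names, and
the sketch assumes windows aligned to the epoch with no offset, which as far as I
know matches the default of FixedWindows.of(...).

```java
import java.time.Duration;
import java.time.Instant;

/** Illustrative only: mimics how fixed windows bucket timestamps. Not Beam code. */
final class FixedWindowSketch {

    /** Start of the fixed window containing the timestamp (epoch-aligned, no offset: an assumption). */
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long ts = timestamp.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch as well.
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMillis));
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(10);
        Instant[] timestamps = {
            Instant.parse("2017-06-06T09:27:03Z"),
            Instant.parse("2017-06-06T09:27:09.999Z"),
            Instant.parse("2017-06-06T09:27:10Z")
        };
        for (Instant ts : timestamps) {
            Instant start = windowStart(ts, size);
            // Each window is the half-open interval [start, start + size).
            System.out.println(ts + " -> [" + start + ", " + start.plus(size) + ")");
        }
    }
}
```

The first two timestamps land in the same window and the third starts a new one.
Without a trigger that fires per window (or repeatedly in processing time), a
windowed write on an unbounded input may never emit a pane, which is why the
snippet above sets one explicitly.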
I assume you are also using HADOOP_CONF_DIR (or similar) to point to the
location of hdfs-site.xml?
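
If you want to double check that setting, a small plain-Java sketch like the one
below can confirm that the directory named by HADOOP_CONF_DIR actually contains
the expected files. The class HadoopConfCheck and the method missingConfigFiles
are hypothetical names, and checking core-site.xml in addition to hdfs-site.xml
is my own assumption about what your setup needs. It only inspects the host it
runs on, so the Spark executors may need the same check.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: checks that a Hadoop configuration directory looks complete. */
final class HadoopConfCheck {

    /** Returns the names of the expected config files that are absent from confDir. */
    static List<String> missingConfigFiles(Path confDir) {
        List<String> missing = new ArrayList<>();
        // core-site.xml is an assumption added for illustration; the mail only mentions hdfs-site.xml.
        for (String name : new String[] {"core-site.xml", "hdfs-site.xml"}) {
            if (!Files.isRegularFile(confDir.resolve(name))) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String confDir = System.getenv("HADOOP_CONF_DIR");
        if (confDir == null || confDir.isEmpty()) {
            System.out.println("HADOOP_CONF_DIR is not set");
            return;
        }
        List<String> missing = missingConfigFiles(Paths.get(confDir));
        if (missing.isEmpty()) {
            System.out.println("All expected files found in " + confDir);
        } else {
            System.out.println("Missing in " + confDir + ": " + missing);
        }
    }
}
```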
Regards
JB
On 06/06/2017 09:27 AM, Vikas Gupta wrote:
We are facing an issue executing a Beam streaming pipeline with the SparkRunner.
This simple word count pipeline listens to a Kafka topic as its source, computes
the word counts, and writes to HDFS. The pipeline is evaluated up to the
processing step, which we verified by adding a log statement on the Spark
driver/executor.
Here is the code to write to HDFS:
    options.setNumShards(10);
    wordCounts.apply(MapElements.via(new WordCount.FormatAsTextFn()))
        .apply("WriteCounts",
            TextIO.write().to(output).withWindowedWrites()
                .withFilenamePolicy(new PerWindowFiles("out1"))
                .withNumShards(1));
We also tried the approach below:
    PCollection<KV<String, Long>> wordCounts =
        windowedWords.apply(new WordCount.CountWords());
    wordCounts
        .apply(MapElements.via(new WordCount.FormatAsTextFn()))
        .apply(new WriteOneFilePerWindow(output, options.getNumShards()));
None of these approaches worked.
We tried another pipeline that reads data from an HDFS test file as its source,
counts the words, and writes to HDFS. This worked fine.
We are using the Cloudera sandbox 5.10.
--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com