Yes, I think it's because no trigger is firing. You could add a DoFn (with time)
to check the watermark. I think the watermark has not been reached yet, so the
trigger doesn't fire.
So I don't think the problem is with HDFS; it's with the window/trigger.
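
To illustrate the watermark point, here is a minimal plain-Java sketch (no Beam
dependency) of how a 10-second fixed window relates to the watermark. The class
and method names are hypothetical, and the firing rule is a simplified model of
the default event-time trigger, not Beam's actual implementation. It assumes
non-negative millisecond timestamps and a zero window offset.

```java
public class WatermarkSketch {

    // Start of the fixed window that contains the given event timestamp.
    // Assumption: timestamps are non-negative and the window offset is zero.
    public static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // Exclusive end of the fixed window that contains the given event timestamp.
    public static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    // Simplified model: an event-time trigger fires for a window only once
    // the watermark has reached the end of that window.
    public static boolean defaultTriggerFires(long watermarkMillis, long windowEndMillis) {
        return watermarkMillis >= windowEndMillis;
    }

    public static void main(String[] args) {
        long size = 10_000L;       // 10-second fixed windows
        long elementTs = 12_345L;  // event time of one element
        long end = windowEnd(elementTs, size);

        System.out.println("window = [" + windowStart(elementTs, size) + ", " + end + ")");
        // Watermark still inside the window: no pane is emitted yet.
        System.out.println("fires at watermark 15000? " + defaultTriggerFires(15_000L, end));
        // Watermark has reached the end of the window: the pane can be emitted.
        System.out.println("fires at watermark 20000? " + defaultTriggerFires(20_000L, end));
    }
}
```

Under this model, if the watermark never advances past the end of a window, no
pane is emitted for it and nothing reaches the sink, whatever the sink is.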
Regards
JB
On 06/06/2017 10:00 AM, Vikas Gupta wrote:
On 2017-06-06 13:03 (+0530), Jean-Baptiste Onofré <[email protected]> wrote:
Hi Vikas,
I think the problem is the window/triggering you are using, as you are consuming
from an unbounded source (the Kafka topic).
Do you use FixedWindows?
Maybe, just to test if it works, you can try something like:
    .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(10))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes()
    )
just to see if HDFS is working.
I guess you are also using HADOOP_CONF_DIR (or similar) to define the location
of hdfs-site.xml?
Regards
JB
On 06/06/2017 09:27 AM, Vikas Gupta wrote:
We are facing an issue executing a Beam streaming pipeline with the SparkRunner.
This simple word count pipeline listens to a Kafka topic as its source, computes
the word counts and writes them to HDFS. The pipeline is evaluated up to the
processing step, which we verified by adding log statements and checking the
Spark driver and executor logs.
Here is the code to write to HDFS:
    options.setNumShards(10);
    wordCounts.apply(MapElements.via(new WordCount.FormatAsTextFn()))
        .apply("WriteCounts",
            TextIO.write().to(output).withWindowedWrites()
                .withFilenamePolicy(new PerWindowFiles("out1"))
                .withNumShards(1));
We also tried the approach below:
    PCollection<KV<String, Long>> wordCounts =
        windowedWords.apply(new WordCount.CountWords());
    wordCounts
        .apply(MapElements.via(new WordCount.FormatAsTextFn()))
        .apply(new WriteOneFilePerWindow(output, options.getNumShards()));
Neither of these approaches worked.
We tried another pipeline, which reads data from a test file in HDFS as its
source, counts the words and writes the result to HDFS. This worked fine.
We are using the Cloudera sandbox 5.10.
--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
We checked earlier that HDFS is working fine, because we are able to run a Beam
pipeline that counts words where the input is in HDFS and the output is also
stored in HDFS.
The problem we are facing is that messages are read from Kafka and processed
(we saw this in the executor logs), but the output is not saved to HDFS in the
streaming case.
I modified the code as suggested but still have the same problem: the windowed
output is not saved to HDFS.
Thanks
Vikas Gupta