This is correct. Since you didn't specify any windowing, data will only be
written at the end of the global window - which will never be reached.

If you want to stay in the GlobalWindow, you can simply specify a trigger
to determine how often to write the files.

On Tue, Jun 6, 2017 at 1:04 AM, Jean-Baptiste Onofré <[email protected]>
wrote:

> Yes, I think because there's no trigger. You could at a DoFn (with time)
> to check the watermark. I think the watermark is not yet reach, so the
> trigger doesn't happen.
>
> So, I don't think the problem is on HDFS, it's because, on the
> window/trigger.
>
> Regards
> JB
>
> On 06/06/2017 10:00 AM, Vikas Gupta wrote:
>
>>
>>
>> On 2017-06-06 13:03 (+0530), Jean-Baptiste Onofré <[email protected]>
>> wrote:
>>
>>> Hi Vikas,
>>>
>>> I think the problem is the window/triggering you are using as you are
>>> consuming
>>> from an unbounded source (the kafka topic).
>>>
>>> Do you use a FixedWindow ?
>>>
>>> Maybe, just to test if it works, you can try something like:
>>>
>>>   .apply(Window.<String>into(FixedWindows.of(Duration.standard
>>> Seconds(10)))
>>>   .triggering(Repeatedly.forever(AfterProcessingTime.pastFirst
>>> ElementInPane().plusDelayOf(Duration.standardSeconds(10))))
>>>                           .withAllowedLateness(Duration.ZERO)
>>>                           .discardingFiredPanes()
>>>                   )
>>>
>>> just to see if HDFS is working.
>>>
>>> I guess, you are also using HADOOP_CONF_DIR (or similar) to define the
>>> location
>>> of the hdfs-site.xml ?
>>>
>>> Regards
>>> JB
>>>
>>> On 06/06/2017 09:27 AM, Vikas Gupta wrote:
>>>
>>>> We are facing an issue in executing the beam streaming pipeline as
>>>> sparkRunner. This simple word count pipeline listens to kafka topic as
>>>> source process the word counts and write to HDFS. Pipeline is evaluated
>>>> till processing which we had verified by adding the log statement on spark
>>>> driver executor.
>>>>
>>>> Here is the code to write to HDFS
>>>> options.setNumShards(10);
>>>>                 wordCounts.apply(MapElements.via(new
>>>> WordCount.FormatAsTextFn()))
>>>>                                 .apply("WriteCounts",
>>>>
>>>> TextIO.write().to(output).withWindowedWrites()
>>>>
>>>> .withFilenamePolicy(new PerWindowFiles("out1"))
>>>>
>>>> .withNumShards(1));
>>>>
>>>> Also tried below approach
>>>>    PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new
>>>> WordCount.CountWords());
>>>>          wordCounts
>>>>           .apply(MapElements.via(new WordCount.FormatAsTextFn()))
>>>>           .apply(new WriteOneFilePerWindow(output,
>>>> options.getNumShards()));
>>>>
>>>> None of these approaches worked.
>>>>
>>>> We tried another pipeline which read the data from HDFS test file,
>>>> count the words and write to HDFS as source. This worked fine.
>>>>
>>>> We are using cloudera sandbox 5.10.
>>>>
>>>>
>>> --
>>> Jean-Baptiste Onofré
>>> [email protected]
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>>
>> We checked earlier HDFS working fine because we are able to run the beam
>> pipeline to count the words where input is in HDFS and output is also
>> stored in HDFS.
>>
>> Problem which we are facing is that messages are read from kafka, it is
>> processed we saw it in executor logs. It is just output is not getting
>> saved in HDFS for streams.
>>
>> I modified the code as suggested but still the same problem. Windowed
>> output is not saved into HDFS.
>>
>> Thanks
>> Vikas Gupta
>>
>>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>

Reply via email to