Hi Jagadish, I resolved the problem. Samza must close the FileSystem somewhere when “shutdown” is triggered. Now I don’t get FileSystem instance from cache and everything works fine.
Thanks! ———————— Qi Shu > 在 2017年8月24日,15:06,舒琦 <sh...@eefung.com> 写道: > > Yes, in the same thread. > > File list is not empty, because after samza job shutdown, all files in > writing are not in complete state. > > ———————— > Qi Shu > >> 在 2017年8月24日,15:00,Jagadish Venkatraman <jagadish1...@gmail.com> 写道: >> >>>> Now the problem is not program hang, but the container quit >> before the “closeFiles” method executed completely. >> >> It's unlikely that the container quit without returning from *close*. Are >> you sure there were filesList is not empty? I'd suggest adding more logging. >> >> I'm assuming you are calling *closeFiles* in the same thread? >> >> On Wed, Aug 23, 2017 at 11:53 PM, 舒琦 <sh...@eefung.com> wrote: >> >>> Hi Jagadish, >>> >>> When samza job is shutdown, “Begin to stop” is printed out in the >>> log file, but not the log in closeFiles. >>> Now the problem is not program hang, but the container quit before >>> the “closeFiles” method executed completely. >>> >>> Thanks. >>> >>> ———————— >>> Qi Shu >>> >>>> 在 2017年8月24日,14:44,Jagadish Venkatraman <jagadish1...@gmail.com> 写道: >>>> >>>> Is "Begin to close file" printed? Where exactly is your application >>> stalled >>>> ? I'd suggest you take a stack dump. >>>> >>>> On Wed, Aug 23, 2017 at 11:32 PM, 舒琦 <sh...@eefung.com> wrote: >>>> >>>>> Hi Jagadish, >>>>> >>>>> Below is part of the related code, log “"Begin to stop” is >>> printed >>>>> out. >>>>> >>>>> Thank you! >>>>> >>>>> public abstract class MyProducer implements SystemProducer { >>>>> >>>>> @Override >>>>> public void stop() { >>>>> LOGGER.info("Begin to stop"); >>>>> >>>>> closeFiles(); >>>>> >>>>> LOGGER.info("End to stop"); >>>>> } >>>>> >>>>> @Override >>>>> protected void closeFiles() throws IOException { >>>>> closeFiles(statusFiles); >>>>> closeFiles(interactionFiles); >>>>> } >>>>> >>>>> private void closeFiles(Map<String, FileInWriting> files) throws >>>>> IOException { >>>>> for (FileInWriting statusFile : files.values()) { >>>>> LOGGER.info("Begin to close file: {}", >>>>> statusFile.getFilePath()); >>>>> statusFile.getWriter().hflush(); >>>>> statusFile.getWriter().close(); >>>>> renameFile(statusFile.getFilePath()); >>>>> LOGGER.info("Successfully close file: {}", >>>>> statusFile.getFilePath()); >>>>> } >>>>> >>>>> files.clear(); >>>>> } >>>>> >>>>> } >>>>> >>>>> >>>>> ———————— >>>>> Qi Shu >>>>> >>>>>> 在 2017年8月24日,14:21,Jagadish Venkatraman <jagadish1...@gmail.com> 写道: >>>>>> >>>>>> Hi Qi, >>>>>> >>>>>>>> the stop method in SystemProducer is called, but the close files >>>>>> operation(may need some time, there may be cache data to be flushed) in >>>>>> stop method is not executed completely >>>>>> >>>>>> Are you seeing the *close()* method hang? SystemProducer.*close* is a >>>>>> synchronous operation, and will block for the *close* method to finish. >>>>>> >>>>>> Best, >>>>>> Jagadish >>>>>> >>>>>> On Wed, Aug 23, 2017 at 11:17 PM, 舒琦 <sh...@eefung.com> wrote: >>>>>> >>>>>>> Hi, >>>>>>> >>>>>>> I write a SystemProducer for HDFS and everything is fine. When >>>>>>> samza job is shutdown, the stop method in SystemProducer is called, >>> but >>>>> the >>>>>>> close files operation(may need some time, there may be cache data to >>> be >>>>>>> flushed) in stop method is not executed completely. >>>>>>> >>>>>>> How can I resolve this problem? >>>>>>> >>>>>>> Thanks! >>>>>>> >>>>>>> ———————— >>>>>>> Qi Shu >>>>>>> >>>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> Jagadish V, >>>>>> Graduate Student, >>>>>> Department of Computer Science, >>>>>> Stanford University >>>>> >>>>> >>>> >>>> >>>> -- >>>> Jagadish V, >>>> Graduate Student, >>>> Department of Computer Science, >>>> Stanford University >>> >>> >> >> >> -- >> Jagadish V, >> Graduate Student, >> Department of Computer Science, >> Stanford University >