Charan, will check later today

On Apr 20, 2017 10:55 PM, "Charan Reddy G" <[email protected]> wrote:

> Hey,
>
> SyncThread.checkpoint(Checkpoint checkpoint) (which is called periodically
> by SyncThread's executor for every flushInterval) ultimately calls
> EntryLogger.flushRotatedLogs.
>
> In EntryLogger.flushRotatedLogs, first we set 'logChannelsToFlush' to null
> and then we try to flush and close individual file. Now, if IOException
> happens while trying to flush/close the logchannel, then exception is
> thrown as it is and it get propagates back upto SyncThread.checkpoint. Here
> we catch that IOException, log it and return without calling the
> checkpointComplete. But by now we lost reference of 'logChannelsToFlush'
> (rolled logs which are yet to be closed), because it is set to null before
> we try to flush/close individually rolledlogs. The next execution of
> 'checkpoint' (after flushinterval) wouldn't be knowing about the rolledlogs
> it failed to flush/close the previous time and it would flush the newly
> rolledlogs. So the failure of flush/close of the previous rolledlogs goes
> unnoticed completely.
>
> in EntryLogger.java
>         void flushRotatedLogs() throws IOException {
>         List<BufferedLogChannel> channels = null;
>         long flushedLogId = INVALID_LID;
>         synchronized (this) {
>             channels = logChannelsToFlush;
>             logChannelsToFlush = null;               <--------- here we set
> 'logChannelsToFlush' to null before it tries to flush/close rolledlogs
>         }
>         if (null == channels) {
>             return;
>         }
>         for (BufferedLogChannel channel : channels) {
>             channel.flush(true);
>  <------------IOEXception can happen here or in the following
> closeFileChannel call
>             // since this channel is only used for writing, after flushing
> the channel,
>             // we had to close the underlying file channel. Otherwise, we
> might end up
>             // leaking fds which cause the disk spaces could not be
> reclaimed.
>             closeFileChannel(channel);
>             if (channel.getLogId() > flushedLogId) {
>                 flushedLogId = channel.getLogId();
>             }
>             LOG.info("Synced entry logger {} to disk.",
> channel.getLogId());
>         }
>         // move the leastUnflushedLogId ptr
>         leastUnflushedLogId = flushedLogId + 1;
>     }
>
> in SyncThread.java
>     public void checkpoint(Checkpoint checkpoint) {
>         try {
>             checkpoint = ledgerStorage.checkpoint(checkpoint);
>         } catch (NoWritableLedgerDirException e) {
>             LOG.error("No writeable ledger directories", e);
>             dirsListener.allDisksFull();
>             return;
>         } catch (IOException e) {
>             LOG.error("Exception flushing ledgers", e); <-----that IOExc
> gets propagated to this method and here it is caught and not dealt
> appropriately
>             return;
>         }
>
>         try {
>             checkpointSource.checkpointComplete(checkpoint, true);
>         } catch (IOException e) {
>             LOG.error("Exception marking checkpoint as complete", e);
>             dirsListener.allDisksFull();
>         }
>     }
>
> Thanks,
> Charan
>

Reply via email to