Charan,

After reading the code more carefully, I think your observation is correct.
It is an issue whenever an exception is thrown while flushing the rotated logs.

Can you file a JIRA for this? I think the fix should be straightforward by
catching this exception and adding the unflushed logs back to the list.
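
Roughly what I have in mind, as an untested sketch rather than the actual
patch. LogChannel, flushAndClose, FlakyChannel, RotatedLogFlusher,
addRotatedLog and pendingCount are hypothetical stand-ins so that the snippet
compiles on its own (the real code uses BufferedLogChannel, channel.flush(true)
and closeFileChannel), and the value of INVALID_LID is assumed here:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class RotatedLogFlusher {
    /** Hypothetical stand-in for BufferedLogChannel. */
    interface LogChannel {
        long getLogId();
        void flushAndClose() throws IOException; // stands in for flush(true) + closeFileChannel
    }

    /** Test helper: fails the first 'failures' flush attempts, then succeeds. */
    static final class FlakyChannel implements LogChannel {
        private final long logId;
        private int failuresLeft;
        FlakyChannel(long logId, int failures) { this.logId = logId; this.failuresLeft = failures; }
        public long getLogId() { return logId; }
        public void flushAndClose() throws IOException {
            if (failuresLeft > 0) {
                failuresLeft--;
                throw new IOException("simulated flush failure for log " + logId);
            }
        }
    }

    private static final long INVALID_LID = -1L; // assumed sentinel value
    private List<LogChannel> logChannelsToFlush;
    private long leastUnflushedLogId = 0;

    synchronized void addRotatedLog(LogChannel channel) {
        if (logChannelsToFlush == null) {
            logChannelsToFlush = new ArrayList<>();
        }
        logChannelsToFlush.add(channel);
    }

    synchronized int pendingCount() {
        return logChannelsToFlush == null ? 0 : logChannelsToFlush.size();
    }

    synchronized long getLeastUnflushedLogId() { return leastUnflushedLogId; }

    void flushRotatedLogs() throws IOException {
        List<LogChannel> channels;
        synchronized (this) {
            channels = logChannelsToFlush;
            logChannelsToFlush = null;
        }
        if (channels == null) {
            return;
        }
        long flushedLogId = INVALID_LID;
        int done = 0; // number of channels flushed and closed so far
        try {
            for (LogChannel channel : channels) {
                channel.flushAndClose();
                flushedLogId = Math.max(flushedLogId, channel.getLogId());
                done++;
            }
        } catch (IOException e) {
            // Put the channels we did not get through back on the list, ahead of
            // any logs rotated in the meantime, so the next checkpoint retries them.
            List<LogChannel> unflushed = new ArrayList<>(channels.subList(done, channels.size()));
            synchronized (this) {
                if (logChannelsToFlush != null) {
                    unflushed.addAll(logChannelsToFlush);
                }
                logChannelsToFlush = unflushed;
            }
            throw e; // still let SyncThread.checkpoint see the failure
        } finally {
            // Only move the pointer past logs that were actually flushed.
            if (flushedLogId != INVALID_LID) {
                synchronized (this) {
                    leastUnflushedLogId = flushedLogId + 1;
                }
            }
        }
    }
}
```

One thing the sketch glosses over: if flush succeeds but the close fails, the
retry will call flush again on the same channel, so the real fix needs to
make sure that is safe.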

On Fri, Apr 21, 2017 at 5:22 PM, Sijie Guo <[email protected]> wrote:

> Charan, will check later today
>
> On Apr 20, 2017 10:55 PM, "Charan Reddy G" <[email protected]>
> wrote:
>
>> Hey,
>>
>> SyncThread.checkpoint(Checkpoint checkpoint) (which is called periodically
>> by SyncThread's executor, once every flushInterval) ultimately calls
>> EntryLogger.flushRotatedLogs.
>>
>> In EntryLogger.flushRotatedLogs, we first set 'logChannelsToFlush' to null
>> and then try to flush and close each file. If an IOException occurs while
>> flushing or closing a log channel, the exception is thrown as is and
>> propagates back up to SyncThread.checkpoint. There we catch the
>> IOException, log it, and return without calling checkpointComplete. But by
>> then we have lost the reference to 'logChannelsToFlush' (the rolled logs
>> that are yet to be closed), because it is set to null before we try to
>> flush and close the individual rolled logs. The next execution of
>> 'checkpoint' (after flushInterval) knows nothing about the rolled logs it
>> failed to flush and close the previous time, so it flushes only the newly
>> rolled logs. The failure to flush and close the previous rolled logs
>> therefore goes completely unnoticed.
>>
>> in EntryLogger.java
>>     void flushRotatedLogs() throws IOException {
>>         List<BufferedLogChannel> channels = null;
>>         long flushedLogId = INVALID_LID;
>>         synchronized (this) {
>>             channels = logChannelsToFlush;
>>             // <--- here we set 'logChannelsToFlush' to null before we
>>             //      try to flush/close the rolled logs
>>             logChannelsToFlush = null;
>>         }
>>         if (null == channels) {
>>             return;
>>         }
>>         for (BufferedLogChannel channel : channels) {
>>             // <--- an IOException can happen here or in the following
>>             //      closeFileChannel call
>>             channel.flush(true);
>>             // since this channel is only used for writing, after flushing
>>             // the channel, we had to close the underlying file channel.
>>             // Otherwise, we might end up leaking fds which cause the disk
>>             // spaces could not be reclaimed.
>>             closeFileChannel(channel);
>>             if (channel.getLogId() > flushedLogId) {
>>                 flushedLogId = channel.getLogId();
>>             }
>>             LOG.info("Synced entry logger {} to disk.", channel.getLogId());
>>         }
>>         // move the leastUnflushedLogId ptr
>>         leastUnflushedLogId = flushedLogId + 1;
>>     }
>>
>> in SyncThread.java
>>     public void checkpoint(Checkpoint checkpoint) {
>>         try {
>>             checkpoint = ledgerStorage.checkpoint(checkpoint);
>>         } catch (NoWritableLedgerDirException e) {
>>             LOG.error("No writeable ledger directories", e);
>>             dirsListener.allDisksFull();
>>             return;
>>         } catch (IOException e) {
>>             // <--- that IOException propagates to this method, where it
>>             //      is caught but not dealt with appropriately
>>             LOG.error("Exception flushing ledgers", e);
>>             return;
>>         }
>>
>>         try {
>>             checkpointSource.checkpointComplete(checkpoint, true);
>>         } catch (IOException e) {
>>             LOG.error("Exception marking checkpoint as complete", e);
>>             dirsListener.allDisksFull();
>>         }
>>     }
>>
>> Thanks,
>> Charan
>>
>
