Modified: 
river/jtsk/modules/modularize/apache-river/river-services/mercury/mercury-service/src/main/java/org/apache/river/mercury/PersistentEventLog.java
URL: 
http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/mercury/mercury-service/src/main/java/org/apache/river/mercury/PersistentEventLog.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- 
river/jtsk/modules/modularize/apache-river/river-services/mercury/mercury-service/src/main/java/org/apache/river/mercury/PersistentEventLog.java
 (original)
+++ 
river/jtsk/modules/modularize/apache-river/river-services/mercury/mercury-service/src/main/java/org/apache/river/mercury/PersistentEventLog.java
 Sun Jul  5 11:41:39 2020
@@ -1,1020 +1,1025 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.river.mercury;
-
-import net.jini.id.Uuid;
-
-import org.apache.river.logging.Levels;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.InterruptedIOException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import java.util.NoSuchElementException;
-
-import net.jini.core.event.RemoteEvent;
-
-/**
- * Class that implements the interface for an <tt>EventLog</tt>. 
- * This class encapsulates the details of reading/writing events from/to
- * some underlying persistence mechanism.
- *
- * This class makes certain assumptions. First, the <tt>next</tt> and
- * <tt>remove</tt> methods are intended to be called in pairs. If 
- * <tt>remove</tt> is not called, then subsequent calls to <tt>next</tt> 
- * will attempt to return the same object. Calling <tt>remove</tt> 
- * essentially advances the read pointer to the next object, if any. 
- * Second, if any <tt>IOExceptions</tt> are encountered during the reading
- * or writing of an event the associated read/write pointer is advanced
- * past the offending event. This means that events can be lost if I/O 
- * errors are encountered.
- *
- * @author Sun Microsystems, Inc.
- *
- * @since 2.0
- */
-
-/*
-Implementation details:
-
-Event state information is maintained by a control log file 
-which maintains the current read and write counts/offsets.  This file
-is updated whenever next(), add() or remove() is called.  This file used to 
-initialize the state of the EventLog, if it exists, during the 
-initialization process.
-
-Events are logged into log files that maintain a fixed # of objects
-per file.  The writing process adds events to a log file until this limit
-is reached. A new log file is then created to handle the next set of events. 
-The reading process will delete a log file once all the contained events 
-have been successfully processed.  This serves as the garbage 
-collection mechanism. Note that the actual number of events per log is a 
-user-configurable environmental parameter via the 
-org.apache.river.mercury.eventsPerLog property (default = 10).
-
-This class also employs the services of a StreamPool, which limits the
-number of concurrently open file descriptors. This limit is also a user
-configurable value via the org.apache.river.mercury.streamPoolSize property 
-(default = 10).
-
-Event state is kept separate from the service's registration state in 
-order to keep the event logging implementation as flexible as possible.
-*/
-
-class PersistentEventLog implements EventLog {
-
-    //
-    // Class fields
-    //
-
-    /** <tt>Logger</tt> used for persistence-related debugging messages */
-    private static final Logger persistenceLogger = 
-       MailboxImpl.PERSISTENCE_LOGGER;
-
-    /** Size of control data file: 4 longs * 8 bytes per long */
-    private static final int CTLBLOCK_LEN = 8 * 4;
-
-    /** File suffix for the control file */
-    private static final String CTLFILE_SUFFIX = "ctl";
-
-    /** File suffix for the log files */
-    private static final String LOGFILE_SUFFIX = "log";
-
-    /** Default number of events per log file, if not overriden by the user */
-    private static final long DEFAULT_EVENTS_PER_LOGFILE = 10L;
-
-    /** 
-     * Maximum number of event objects per log file. The
-     * default is used unless overridden by <tt>eventsPerLogProperty</tt>.
-     */
-    // Should be final but the compiler complains.
-//TODO - make eventsPerLogFile & maxPoolSize configurable
-/*
- * Notes: this probably needs to remain constant over service lifetime. 
Therefore,
- * this really needs to be treated as an "initial value that persisted and 
recovered.
- * Would like to piggy back off MailboxImpl recovery logic, but 1) it needs to 
recover
- * first in order to determine "first time up" and 2) seems logically better 
to keep
- * these parameters local to EventLog. Will probably need to develop EventLog 
recovery
- * logic for this purpose.
- */
-    private static final long eventsPerLogFile = DEFAULT_EVENTS_PER_LOGFILE;
-
-    /** Default size for the stream pool, if not overriden by the user */
-    static final int DEFAULT_STREAM_POOL_SIZE = 10;
-
-    /** 
-     * Maximum limit for the number of concurrent <tt>LogStream</tt>s
-     * that this pool will concurrently manage. The
-     * default is used unless overridden by <tt>streamPoolSizeProperty</tt>.
-     */
-    private static final int maxPoolSize = DEFAULT_STREAM_POOL_SIZE; 
-
-    /** 
-     * The <tt>StreamPool</tt> reference to be used for all 
-     * instances of <tt>EventLog</tt>.
-     */
-    // Should be final but the compiler complains.
-    private static final StreamPool streamPool = 
-        new StreamPool(maxPoolSize);
-
-    //
-    // Object fields
-    //
-
-    /** The associated <tt>Uuid</tt> for this <tt>EventLog</tt>. */
-    private Uuid uuid = null;
-
-    /** The current number of written events. */
-    private long wcount = 0;
-
-    /** The current number of read events. */
-    private long rcount = 0;
-
-    /** The current write offset into the current "write" log. */
-    private long wpos = 0;
-
-    /** 
-     * The read offset into the current "read" log of the 
-     * "last read" object.
-     */
-    private long rpos = 0;
-
-    /** 
-     * The read offset of the next event. This gets updated to
-     * <tt>rpos</tt> once <tt>remove</tt> is called (indicating
-     * that the last event read was successful).
-     */
-    private long nextReadPos = 0;
-
-    /** The <tt>EventReader</tt> used to retrieve events. */
-    private EventReader eventReader;
-
-    /** The <tt>EventWriter</tt> used to store events */
-    private EventWriter eventWriter;
-
-    /** The <tt>File</tt> object of the event persistence directory */
-    private File logDir;
-
-    /** 
-     * The <tt>File</tt> object that will maintain the control data for
-     * for this <tt>EventLog</tt>.
-     */
-    private File controlFile;
-
-    /** The in memory buffer that holds the control data */
-    private byte[] ctlbuf = new byte[CTLBLOCK_LEN];
-
-    /** 
-     * Flag that is used to determine whether or not this object 
-     * has been initialized. 
-     */
-    private boolean initialized = false;
-
-    /** 
-     * Flag that is used to determine whether or not this object 
-     * has been closed. 
-     */
-    private boolean closed = false;
-
-    private static final boolean debugState = false;
-    
-    /**
-     * Simple constructor that takes a <tt>Uuid</tt> argument
-     * and a <tt>File</tt> argument.  These arguments are simply
-     * assigned to the appropriate internal fields. 
-     *
-     * @exception IllegalArgumentException if any of the arguments are null
-     */
-    PersistentEventLog(Uuid uuid, File logDir) {
-        if (logDir == null || uuid == null) 
-            throw new IllegalArgumentException("Arguments cannot be null");
-        this.uuid = uuid;
-        this.logDir = logDir;
-
-        if (persistenceLogger.isLoggable(Level.FINEST)) {
-            persistenceLogger.log(Level.FINEST, 
-               "PersistentEventLog for: {0}", uuid);
-        }
-    }
-
-    // Inherit documentation from supertype
-    public void init() throws IOException {
-
-        if (initialized)
-            throw new InternalMailboxException(
-               "Trying to re-initialize control data "
-                + "for: " + uuid);
-
-        try {
-            if (!logDir.exists()) // Create log directory if it doesn't exist
-                logDir.mkdirs();
-
-            if (!logDir.isDirectory()) // Verify that logDir is a directory
-                throw new FileNotFoundException(logDir.toString()
-                                  + " is not a directory");
-
-            controlFile = getControlFile();
-            if (controlFile.isFile()) { // Recover state from existing file
-                if (persistenceLogger.isLoggable(Level.FINEST)) {
-                    persistenceLogger.log(Level.FINEST, 
-                       "EventLog::init() recovering data for ", 
-                        uuid);
-               }
-                readControlFile();
-           } else { 
-                if (persistenceLogger.isLoggable(Level.FINEST)) {
-                    persistenceLogger.log(Level.FINEST, 
-                       "default initialization for ", uuid);
-               }
-           }
-
-           eventReader = new EventReader();
-           eventWriter = new EventWriter();
-
-       } finally {
-       }
-
-        printControlData(persistenceLogger, "After EventLog::init");
-       
-       initialized = true;
-
-       if (debugState)
-           assertInvariants();
-    }
-
-    // Inherit documentation from supertype
-    public void add(RemoteEvent event) throws IOException {
-
-        stateCheck();
-
-       if (debugState)
-           assertInvariants();
-
-       long logNum = getLogNum(wcount);
-       File log = getLogFile(logNum);
-       LogOutputStream out = null;
-        try {
-            // Get output stream
-           out = streamPool.getLogOutputStream(log, wpos);
-
-            // Write the event
-           eventWriter.write(event, out);
-
-           // Update the control data
-           wpos = out.getOffset();
-           ++wcount;
-       } catch (IOException ioe) {
-           // We'll get interrupted when asked to shutdown.
-           // In this case, we can skip the call to nextWriteLog.
-           if (ioe instanceof InterruptedIOException) {
-                if (persistenceLogger.isLoggable(Level.FINEST)) {
-                    persistenceLogger.log(Level.FINEST, 
-                       "EventLog::add() interrupted "); 
-               }
-           } else {
-                if (persistenceLogger.isLoggable(Level.FINEST)) {
-                    persistenceLogger.log(Level.FINEST, 
-                       "EventLog::add() received IOException " 
-                       + "... skipping to next write log");
-               } 
-               nextWriteLog(); 
-           }
-            if (persistenceLogger.isLoggable(Level.FINEST)) {
-                persistenceLogger.log(Level.FINEST, 
-                   "Exception: ", ioe);
-           }
-           throw (IOException)ioe.fillInStackTrace();
-       } finally {
-           // Always make sure to release any obtained resources
-           if (out != null) {
-               if (getLogNum(wcount) == logNum) {
-                   streamPool.releaseLogStream(out);
-               } else { 
-                   // Done writing to this particular log file, so
-                   // remove it.
-                   streamPool.removeLogStream(out);
-                   wpos = 0;
-               }
-           }
-           // Write control data before returning
-            // TODO (FCS) - handle potential IOException
-            writeControlFile();
-       }
-
-        printControlData(persistenceLogger, "EventLog::add");
-
-       if (debugState) {
-           assertInvariants();
-       }
-    }
-
-    // Inherit documentation from supertype
-    public RemoteEvent next() throws IOException, ClassNotFoundException {
-        boolean IOExceptionCaught = false;
-
-        stateCheck();
-
-       if (debugState) {
-           assertInvariants();
-       }
-
-        // Check if empty
-       if (isEmpty()) 
-           throw new NoSuchElementException();
-
-        long logNum = getLogNum(rcount);
-        File log = getLogFile(logNum);
-        LogInputStream in = null;
-        RemoteEvent evt = null;
-        try {
-            // get the input stream
-            in = streamPool.getLogInputStream(log, rpos);
-            // read the event
-            evt = eventReader.read(in); 
-            // update the control data
-            nextReadPos = in.getOffset();
-            // Don't increment "real" read count until event is delivered
-            // indicated by a call to remove().
-        } catch (IOException ie) {
-           // We'll get interrupted when asked to shutdown.
-           // In this case, we can skip the call to nextReadLog.
-           if (ie instanceof InterruptedIOException) {
-                if (persistenceLogger.isLoggable(Level.FINEST)) {
-                    persistenceLogger.log(Level.FINEST, 
-                       "EventLog::next() interrupted ");
-               } 
-           } else {
-                if (persistenceLogger.isLoggable(Level.FINEST)) {
-                    persistenceLogger.log(Level.FINEST, 
-                       "EventLog::next() received IOException " 
-                       + "... skipping to next read log"); 
-               }
-               nextReadLog();
-                IOExceptionCaught = true; 
-           }
-            if (persistenceLogger.isLoggable(Level.FINEST)) {
-                    persistenceLogger.log(Level.FINEST, 
-                       "Exception: ", ie);
-           }
-           throw (IOException)ie.fillInStackTrace();
-        } catch (ClassNotFoundException cnfe) {
-            // Note that the read offset is still valid since the
-            // MarshalledObject extraction should always succeed.
-            // It's just that the RemoteEvent within it could not
-            // be reconstituted. Therefore, just skip to the next
-            // MarshalledObject in the stream.
-            nextReadPos = in.getOffset();
-           throw (ClassNotFoundException)cnfe.fillInStackTrace();
-       } finally {
-           // If an IOException occurs then the eventReader is in 
-           // a bad state. Therefore, we get rid of our existing
-           // reference and create a fresh one. The underlying
-           // stream is also removed from from the pool, forcing
-           // a fresh copy to be returned on the next "get" request.
-           if (IOExceptionCaught) {
-               //TODO (FCS) add close to interface
-               //eventReader.close();
-               eventReader = new EventReader();
-               if (in != null)
-                   streamPool.removeLogStream(in);
-               // Remove associated log file since we are skipping over it
-               if (!log.delete())
-                   if (persistenceLogger.isLoggable(Levels.HANDLED)) {
-                        persistenceLogger.log(Levels.HANDLED,
-                           "Had trouble deleting {0}", log);
-                   }
-           } else {
-               if (in != null)
-                   streamPool.releaseLogStream(in);
-           }
-
-            // TODO (FCS) - handle potential IOException
-            writeControlFile();
-
-       }
-
-        printControlData(persistenceLogger, "After Event::next");
-
-       if (debugState) {
-           assertInvariants();
-       }
-
-        return evt;
-    }
-    
-    // Inherit documentation from supertype
-    public RemoteEventData[] readAhead(int maxEvents) 
-        throws IOException, ClassNotFoundException 
-    {
-        boolean IOExceptionCaught = false;
-
-        stateCheck();
-
-       if (debugState) {
-           assertInvariants();
-       }
-
-        // Check if empty
-       if (isEmpty()) 
-           throw new NoSuchElementException();    
-
-        long readCount = rcount;
-        long readPosition = rpos;
-        long logNum = getLogNum(readCount); 
-        File log = getLogFile(logNum);
-        RemoteEvent evt = null;
-        ArrayList rData = new ArrayList();
-        int i = 0;
-        LogInputStream in = null;
-        boolean done = false;
-        
-        printControlData(persistenceLogger, "Before read::readAhead");
-        if (persistenceLogger.isLoggable(Level.FINEST)) {
-            persistenceLogger.log(Level.FINEST, 
-                "EventLog::readAhead() maxEvents = {0}", 
-                new Object[] {Integer.valueOf(maxEvents)});
-        } 
-        
-        while ((readCount < wcount) && 
-               (rData.size() < maxEvents) &&
-               !done) 
-        {
-            if (logNum != getLogNum(readCount)) {
-                logNum = getLogNum(readCount);
-                log = getLogFile(logNum);
-                readPosition = 0;
-            }
-            try {
-                // get the input stream
-                in = streamPool.getLogInputStream(log, readPosition);
-                // read the event
-                evt = eventReader.read(in); 
-                // update readCount and readPosition
-                readCount++;
-                readPosition = in.getOffset(); // offset to next unread event
-                //Generate new entry
-                rData.add(new RemoteEventData(
-                   evt, 
-                   new RemoteEventDataCursor(readCount, readPosition)));
-            } catch (IOException ie) {
-                // We'll get interrupted when asked to shutdown.
-                // In this case, we can skip the call to nextReadAheadLog.
-                if (ie instanceof InterruptedIOException) {
-                    if (persistenceLogger.isLoggable(Level.FINEST)) {
-                        persistenceLogger.log(Level.FINEST, 
-                            "EventLog::readAhead() interrupted ");
-                    } 
-                    // Stop processing events
-                    done = true;
-                } else {
-                    if (persistenceLogger.isLoggable(Level.FINEST)) {
-                        persistenceLogger.log(Level.FINEST, 
-                            "EventLog::readAhead() received IOException " 
-                            + "... skipping to next read log"); 
-                    }
-                    readCount = nextReadAheadLog(readCount);
-                    // TODO - rcount = readCount;
-                    // TODO - if bump rcount you need to bump wcount too
-                    // TODO - if bump counts then you need to persist them
-                    readPosition = 0;
-                    IOExceptionCaught = true; 
-                    if (persistenceLogger.isLoggable(Level.FINEST)) {
-                        persistenceLogger.log(Level.FINEST, 
-                            "EventLog::readAhead() new readCount is {0}", 
-                            Long.valueOf(readCount)); 
-                    }                    
-                }
-                if (persistenceLogger.isLoggable(Levels.HANDLED)) {
-                        persistenceLogger.log(Levels.HANDLED, 
-                            "Exception: ", ie);
-                }
-            } catch (ClassNotFoundException cnfe) {
-                // Note that the read offset is still valid since the
-                // MarshalledObject extraction should always succeed.
-                // It's just that the RemoteEvent within it could not
-                // be reconstituted. Therefore, just skip to the next
-                // MarshalledObject in the stream.
-                readCount++;
-                readPosition = in.getOffset();
-                //Generate new entry
-                rData.add(new RemoteEventData(
-                   null, 
-                   new RemoteEventDataCursor(readCount, readPosition)));       
         
-                if (persistenceLogger.isLoggable(Levels.HANDLED)) {
-                        persistenceLogger.log(Levels.HANDLED, 
-                            "Exception: ", cnfe);
-                }            
-            } finally {
-                // If an IOException occurs then the eventReader is in 
-                // a bad state. Therefore, we get rid of our existing
-                // reference and create a fresh one. The underlying
-                // stream is also removed from from the pool, forcing
-                // a fresh copy to be returned on the next "get" request.
-                if (IOExceptionCaught) {
-                    //TODO (FCS) add close to interface
-                    //eventReader.close();
-                    eventReader = new EventReader();
-                    if (in != null) {
-                        streamPool.removeLogStream(in);
-                        in = null;
-                    }
-                } else { 
-                    if (in != null)
-                        streamPool.releaseLogStream(in);
-                    /* Shouldn't have to release since we have exclusive
-                     * access, but the next getLogInputStream assumes things
-                     * are on the freelist.
-                     */
-                }
-
-            }
-        }
-
-        printControlData(persistenceLogger, "After Event::readAhead");
-
-       if (debugState) {
-           assertInvariants();
-       }
-        
-        return (RemoteEventData[]) rData.toArray(new RemoteEventData[0]);
-    }
-    
-    // Inherit documentation from supertype
-    public boolean isEmpty() throws IOException {
-        stateCheck();
-        return !(rcount < wcount);
-    }
-
-    // Inherit documentation from supertype
-    public void remove() throws IOException {
-        stateCheck();
-
-       if (debugState) {
-           assertInvariants();
-       }
-
-       // Get current log number
-        long logNum = getLogNum(rcount);
-
-       // Update read counts
-       if (rcount < wcount) {
-           ++rcount;
-           rpos = nextReadPos;
-       } else {
-           throw new NoSuchElementException();
-       }
-
-       // Remove old log if we've advanced to a new
-       // log file (ie done reading the old log file).
-       if (getLogNum(rcount) != logNum) {
-           File log = getLogFile(logNum);
-           LogInputStream in = null;
-           try {
-               in = streamPool.getLogInputStream(log, rpos);
-               streamPool.removeLogStream(in);
-           } finally {
-               if (!log.delete())
-                   if (persistenceLogger.isLoggable(Levels.HANDLED)) {
-                        persistenceLogger.log(Levels.HANDLED, 
-                           "Had trouble deleting {0}", log);
-                   }
-               rpos = nextReadPos = 0;
-           }
-       }
-
-       // Store control data
-        writeControlFile();
-
-        // Verify that state remains intact
-       assertInvariants();
-
-
-        printControlData(persistenceLogger, "After Event::remove");
-
-    }
-
-        // Inherit documentation from supertype
-    public void moveAhead(Object cookie) throws IOException {
-        stateCheck();
-
-       if (debugState) {
-           assertInvariants();
-       }
-        
-        printControlData(persistenceLogger, "Before Event::moveAhead");
-        
-        long readCount = 0;
-        long readPosition = 0;        
-        RemoteEventDataCursor cursor = (RemoteEventDataCursor)cookie;
-        if (cursor != null) {
-            readCount = cursor.getReadCount();
-            readPosition = cursor.getReadPosition();
-        } else {
-            /* TODO - should throw NullPointerException, but we do 
-             * get called with null if client initially gets an empty set.
-             * Need to change getNextBatchDo() to skip this call 
-             * if cookie is null. Also, could just return at this point since
-             * nothing will be advanced.
-             */
-           readCount = rcount;
-           readPosition = rpos;
-       }
-
-        if (readCount > wcount) {
-            throw new NoSuchElementException();
-        }
-
-        long currentLogNum = getLogNum(rcount);
-        long nextLogNum = getLogNum(readCount);
-        if (persistenceLogger.isLoggable(Level.FINEST)) {
-            persistenceLogger.log(Level.FINEST, 
-                "EventLog::moveAhead() readCount = {0}, readPosition = {1}, " 
-                + "currentLogNum = {2}, nextLogNum = {3}",
-                new Object[] {Long.valueOf(readCount), 
Long.valueOf(readPosition),
-                              Long.valueOf(currentLogNum), 
Long.valueOf(nextLogNum)}); 
-        }        
-        File logFile = null;
-        LogInputStream in = null;
-        while (currentLogNum < nextLogNum) {
-            try {
-                logFile = getLogFile(currentLogNum);
-                in = streamPool.getLogInputStream(logFile, rpos);
-                streamPool.removeLogStream(in);
-                /*
-                 * NOTE - if file doesn't exist (corrupted) then
-                 * getLogInputStream throws an IOException and you
-                 * end up skipping the removeLogStream() call.
-                 * This "log" should eventually get cleared if once
-                 * the stream pool hits its limit.
-                 */
-
-            } catch (IOException ioe) {
-                if (persistenceLogger.isLoggable(Levels.HANDLED)) {
-                    persistenceLogger.log(Levels.HANDLED, 
-                        "Had trouble accessing old log file", ioe);
-                }
-            } finally {
-               if (logFile != null && !logFile.delete()) {
-                   if (persistenceLogger.isLoggable(Levels.HANDLED)) {
-                        persistenceLogger.log(Levels.HANDLED, 
-                           "Had trouble deleting {0}", logFile);
-                   }
-                }
-                if (persistenceLogger.isLoggable(Level.FINEST)) {
-                    persistenceLogger.log(Level.FINEST, 
-                        "Deleted {0}", logFile);
-                }
-                currentLogNum++;
-            }
-        }
-        rcount = readCount;
-        // If we started a new log file, then reset the read pointer.
-        if ((rcount % eventsPerLogFile) == 0) {
-            rpos = 0;
-        } else {
-            rpos = readPosition;
-        }
-        
-       // Store control data
-        writeControlFile();
-        
-        // Verify that state remains intact
-       assertInvariants();
-
-        printControlData(persistenceLogger, "After Event::moveAhead");
-
-    }        
-    
-    // Inherit documentation from supertype
-    public void close() throws IOException {
-        stateCheck();
-       if (debugState) {
-           assertInvariants();
-       }
-
-       // Close read log
-       long logNum = getLogNum(rcount);
-       File log = getLogFile(logNum);
-       LogStream strm = null;
-       try {
-           strm = streamPool.getLogInputStream(log, rpos);
-           streamPool.removeLogStream(strm);
-       } catch (FileNotFoundException fnfe) {
-           // Can happen if close is called just after you've finished
-           // reading a log. That is, the readCount is advanced to a new
-           // log which might not exist if a write hasn't taken place.
-           // TODO (FCS) - put a check to verify this condition and throw an
-           // exception otherwise?
-       } catch (IOException ioe) {
-           // catch, but ignore so as not to skip the following
-           // code.
-       }
-
-       // Close write log
-       logNum = getLogNum(wcount);
-       log = getLogFile(logNum);
-       try {
-           strm = streamPool.getLogOutputStream(log, wpos);
-           streamPool.removeLogStream(strm);
-       } catch (IOException ioe) {
-           // catch, but ignore so as not to skip the following
-           // code.
-       }
-       
-       // Close control log
-       try {
-           strm = streamPool.getControlLog(controlFile);
-           streamPool.removeLogStream(strm);
-       } catch (IOException ioe) {
-           // catch, but ignore so as not to skip the following
-           // code.
-       }
-
-        //TODO (FCS) - flag that an IOException was caught and rethrow to
-        // indicate that there was a problem.
-
-       closed = true;
-
-       if (persistenceLogger.isLoggable(Level.FINEST)) {
-            persistenceLogger.log(Level.FINEST, 
-               "EventLog::close for {0}", uuid);
-       }
-    }
-
-    // Inherit documentation from supertype
-    public void delete() throws IOException {
-        if (!closed)
-            throw new IOException("Cannot delete log until it is closed");
-        removeDir(logDir);
-    }
-
-    /**
-     * Attempt to delete the associated event log persistence directory.
-     */
-    private static void removeDir(File logDir) {
-        if (logDir == null || !logDir.isDirectory())
-            return;
-
-        // Get the contents of this directory
-        String[] contents = logDir.list();
-       if (contents == null)  // Can happen if there was an IO error
-           return;
-
-        File entry = null;
-       if (persistenceLogger.isLoggable(Level.FINEST)) {
-            persistenceLogger.log(Level.FINEST, 
-               "Deleting contents of: {0}", logDir);
-       }
-        for (int i=0; i < contents.length; i++) {
-            entry = new File(logDir, contents[i]);
-            if (entry.isDirectory()) {
-                removeDir(entry);
-           } else {
-                if(!entry.delete()) {
-                    if (persistenceLogger.isLoggable(Levels.HANDLED)) {
-                        persistenceLogger.log(Levels.HANDLED, 
-                           "Had trouble deleting file: {0}",
-                            entry);
-                   }
-               } else {
-                    if (persistenceLogger.isLoggable(Level.FINEST)) {
-                        persistenceLogger.log(Level.FINEST, 
-                           "Deleted file: {0}", entry);
-                   }
-               }
-           }
-       }
-
-        if (!logDir.delete()) {
-            if (persistenceLogger.isLoggable(Levels.HANDLED)) {
-                persistenceLogger.log(Levels.HANDLED, 
-                   "Had trouble deleting directory: {0}",
-                    logDir);
-           }
-       } else {
-            if (persistenceLogger.isLoggable(Level.FINEST)) {
-                persistenceLogger.log(Level.FINEST, 
-                    "Deleted directory: {0}", logDir);
-           }
-       }
-    }
-
-    /**
-     * Advance the "read" state to the next available log.
-     */
-    private void nextReadLog() {
-       if (debugState)
-           assertInvariants();
-
-        // Normalize event count in case we were called due to a problem with
-        // one of the log files
-        long remainder = rcount % eventsPerLogFile; 
-        rcount += (eventsPerLogFile - remainder);
-       rpos = 0;
-
-        // Ensure that the writeLog data doesn't get behind
-       if (verifyInvariants() == false) {
-           nextWriteLog();
-       }
-
-        // Assert that state is still valid
-       if (debugState)
-           assertInvariants();
-
-        printControlData(persistenceLogger, "EventLog::nextReadLog");
-    }
-
-    /**
-     * Advance the temporary "read" state to the next available log.
-     */
-    private long nextReadAheadLog(long readCount) {
-        // Normalize event count in case we were called due to a problem with
-        // one of the log files
-        long remainder = readCount % eventsPerLogFile; 
-        readCount += (eventsPerLogFile - remainder);
-
-        printControlData(persistenceLogger, "EventLog::nextReadAheadLog");
-        
-        return readCount;
-    }
-    
-    /**
-     * Advance the "write" state to the next available log.
-     */
-    private void nextWriteLog() {
-        // Don't check invariants at the top since we can get called
-        // from readNextLog() with invalid state.
-
-        // Normalize event count in case we were called due to a problem with
-        // one of the log files
-        long remainder = wcount % eventsPerLogFile; 
-        wcount += (eventsPerLogFile - remainder);
-       wpos = 0;
-
-        printControlData(persistenceLogger, "EventLog::nextWriteLog");
-
-       if (debugState)
-           assertInvariants();
-    }
-
-    /**
-     * Output state information to the given <tt>Logger</tt>.
-     * This is intended for debugging purposes only.
-     */
-    private void printControlData(Logger logger, String msg) {
-       if (logger.isLoggable(Level.FINEST)) {
-           logger.log(Level.FINEST, "{0}", msg);
-            logger.log(Level.FINEST, "ID: {0}", uuid);
-            logger.log(Level.FINEST, "ReadCount: {0}", 
-               Long.valueOf(rcount));
-            logger.log(Level.FINEST, "ReadPos: {0}",  
-               Long.valueOf(rpos));
-            logger.log(Level.FINEST, "NextReadPos: {0}",  
-               Long.valueOf(nextReadPos));
-            logger.log(Level.FINEST, "WriteCount: {0}",  
-               Long.valueOf(wcount));
-            logger.log(Level.FINEST, "WritePos: {0}",  
-               Long.valueOf(wpos));
-       }
-    }
-
-    /**
-     * Write state information to the underlying store.
-     */
-    private void writeControlFile() throws IOException {
-       packLong(wcount, ctlbuf, 0);
-       packLong(rcount, ctlbuf, 8);
-       packLong(wpos, ctlbuf, 16);
-       packLong(rpos, ctlbuf, 24);
-       
-       ControlLog ctl = streamPool.getControlLog(controlFile);
-       ctl.seek(0L);
-       ctl.write(ctlbuf);
-       ctl.getFD().sync();
-       streamPool.releaseLogStream(ctl);
-    }
-    
-    /**
-     * Read state information from the underlying store.
-     */
-    private void readControlFile() throws IOException {
-       ControlLog ctl = streamPool.getControlLog(controlFile);
-       ctl.seek(0L);
-       ctl.readFully(ctlbuf);
-       streamPool.releaseLogStream(ctl);
-
-       wcount = unpackLong(ctlbuf, 0);
-       rcount = unpackLong(ctlbuf, 8);
-       wpos = unpackLong(ctlbuf, 16);
-       rpos = unpackLong(ctlbuf, 24);
-    }
-    
-    /**
-     * Utility method for packing a <tt>long</tt> into a <tt>byte</tt> array.
-     */
-    private static void packLong(long val, byte[] b, int off) {
-       b[off++] = (byte) (val >>> 56);
-       b[off++] = (byte) (val >>> 48);
-       b[off++] = (byte) (val >>> 40);
-       b[off++] = (byte) (val >>> 32);
-       b[off++] = (byte) (val >>> 24);
-       b[off++] = (byte) (val >>> 16);
-       b[off++] = (byte) (val >>> 8);
-       b[off++] = (byte) (val >>> 0);
-    }
-    
-    /**
-     * Utility method for unpacking a <tt>long</tt> from a <tt>byte</tt> array.
-     */
-    private static long unpackLong(byte[] b, int off) {
-       return ((b[off + 0] & 0xFFL) << 56) +
-           ((b[off + 1] & 0xFFL) << 48) +
-           ((b[off + 2] & 0xFFL) << 40) +
-           ((b[off + 3] & 0xFFL) << 32) +
-           ((b[off + 4] & 0xFFL) << 24) +
-           ((b[off + 5] & 0xFFL) << 16) +
-           ((b[off + 6] & 0xFFL) << 8) +
-           ((b[off + 7] & 0xFFL) << 0);
-    }
-    
-    /**
-     * Utility method for returning the <tt>File</tt> associated with the
-     * given <tt>lognum</tt>.
-     */
-    private File getLogFile(long lognum) {
-       return new File(logDir, lognum + "." + 
LOGFILE_SUFFIX).getAbsoluteFile();
-    }
-    
-    /**
-     * Utility method for returning the <tt>File</tt> that contains the
-     * state information for this log.
-     */
-    private File getControlFile() {
-       return new File(logDir, "log." + CTLFILE_SUFFIX).getAbsoluteFile();
-    }
-
-    /**
-     * Utility method for returning the log file number for the given
-     * (event) <tt>count</tt>.
-     */
-    private static long getLogNum(long count) {
-       return (count / eventsPerLogFile); 
-    }
-
-
-    /**
-     * Asserts that the log is in a valid state.
-     * 
-     * @exception IOException if the log is in an invalid state
-     */
-    private void stateCheck() throws IOException {
-        if (!initialized)
-            throw new IOException("Trying to use an uninitialized "
-                + "control data object for: " + uuid);
-       if (closed)
-           throw new IOException("Attempt to access closed log file for : "
-               + uuid);
-    }
-
-    /**
-     * Utility method for checking if the object invariants are valid.
-     */
-    private boolean verifyInvariants() {
-        if ((wcount < rcount) ||
-            ((getLogNum(wcount) == getLogNum(rcount)) && (wpos < rpos))) {
-            return false;
-       }
-       return true;
-    }
-
-    /**
-     * Utility method for asserting that the object invariants are valid.
-     *
-     * @exception InternalMailboxException if the invariants aren't valid
-     */
-    private void assertInvariants() {
-        if (verifyInvariants() == false) {
-            printControlData(persistenceLogger, "Dumping invalid state for " + 
uuid);
-            throw new InternalMailboxException("Invalid state for " + uuid);
-       }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.mercury;
+
+import net.jini.id.Uuid;
+
+import org.apache.river.logging.Levels;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.InterruptedIOException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.NoSuchElementException;
+
+import net.jini.core.event.RemoteEvent;
+import org.apache.river.mercury.proxy.InternalMailboxException;
+import org.apache.river.mercury.proxy.RemoteEventData;
+import org.apache.river.mercury.proxy.RemoteEventDataCursor;
+
+
+
+/**
+ * Class that implements the interface for an <tt>EventLog</tt>. 
+ * This class encapsulates the details of reading/writing events from/to
+ * some underlying persistence mechanism.
+ *
+ * This class makes certain assumptions. First, the <tt>next</tt> and
+ * <tt>remove</tt> methods are intended to be called in pairs. If 
+ * <tt>remove</tt> is not called, then subsequent calls to <tt>next</tt> 
+ * will attempt to return the same object. Calling <tt>remove</tt> 
+ * essentially advances the read pointer to the next object, if any. 
+ * Second, if any <tt>IOExceptions</tt> are encountered during the reading
+ * or writing of an event the associated read/write pointer is advanced
+ * past the offending event. This means that events can be lost if I/O 
+ * errors are encountered.
+ *
+ * @author Sun Microsystems, Inc.
+ *
+ * @since 2.0
+ */
+
+/*
+Implementation details:
+
+Event state information is maintained by a control log file 
+which maintains the current read and write counts/offsets.  This file
+is updated whenever next(), add() or remove() is called.  This file used to 
+initialize the state of the EventLog, if it exists, during the 
+initialization process.
+
+Events are logged into log files that maintain a fixed # of objects
+per file.  The writing process adds events to a log file until this limit
+is reached. A new log file is then created to handle the next set of events. 
+The reading process will delete a log file once all the contained events 
+have been successfully processed.  This serves as the garbage 
+collection mechanism. Note that the actual number of events per log is a 
+user-configurable environmental parameter via the 
+org.apache.river.mercury.eventsPerLog property (default = 10).
+
+This class also employs the services of a StreamPool, which limits the
+number of concurrently open file descriptors. This limit is also a user
+configurable value via the org.apache.river.mercury.streamPoolSize property 
+(default = 10).
+
+Event state is kept separate from the service's registration state in 
+order to keep the event logging implementation as flexible as possible.
+*/
+
+class PersistentEventLog implements EventLog {
+
+    //
+    // Class fields
+    //
+
+    /** <tt>Logger</tt> used for persistence-related debugging messages */
+    private static final Logger persistenceLogger = 
+       MailboxImpl.PERSISTENCE_LOGGER;
+
+    /** Size of control data file: 4 longs * 8 bytes per long */
+    private static final int CTLBLOCK_LEN = 8 * 4;
+
+    /** File suffix for the control file */
+    private static final String CTLFILE_SUFFIX = "ctl";
+
+    /** File suffix for the log files */
+    private static final String LOGFILE_SUFFIX = "log";
+
+    /** Default number of events per log file, if not overriden by the user */
+    private static final long DEFAULT_EVENTS_PER_LOGFILE = 10L;
+
+    /** 
+     * Maximum number of event objects per log file. The
+     * default is used unless overridden by <tt>eventsPerLogProperty</tt>.
+     */
+    // Should be final but the compiler complains.
+//TODO - make eventsPerLogFile & maxPoolSize configurable
+/*
+ * Notes: this probably needs to remain constant over service lifetime. 
Therefore,
+ * this really needs to be treated as an "initial value that persisted and 
recovered.
+ * Would like to piggy back off MailboxImpl recovery logic, but 1) it needs to 
recover
+ * first in order to determine "first time up" and 2) seems logically better 
to keep
+ * these parameters local to EventLog. Will probably need to develop EventLog 
recovery
+ * logic for this purpose.
+ */
+    private static final long eventsPerLogFile = DEFAULT_EVENTS_PER_LOGFILE;
+
+    /** Default size for the stream pool, if not overriden by the user */
+    static final int DEFAULT_STREAM_POOL_SIZE = 10;
+
+    /** 
+     * Maximum limit for the number of concurrent <tt>LogStream</tt>s
+     * that this pool will concurrently manage. The
+     * default is used unless overridden by <tt>streamPoolSizeProperty</tt>.
+     */
+    private static final int maxPoolSize = DEFAULT_STREAM_POOL_SIZE; 
+
+    /** 
+     * The <tt>StreamPool</tt> reference to be used for all 
+     * instances of <tt>EventLog</tt>.
+     */
+    // Should be final but the compiler complains.
+    private static final StreamPool streamPool = 
+        new StreamPool(maxPoolSize);
+
+    //
+    // Object fields
+    //
+
+    /** The associated <tt>Uuid</tt> for this <tt>EventLog</tt>. */
+    private Uuid uuid = null;
+
+    /** The current number of written events. */
+    private long wcount = 0;
+
+    /** The current number of read events. */
+    private long rcount = 0;
+
+    /** The current write offset into the current "write" log. */
+    private long wpos = 0;
+
+    /** 
+     * The read offset into the current "read" log of the 
+     * "last read" object.
+     */
+    private long rpos = 0;
+
+    /** 
+     * The read offset of the next event. This gets updated to
+     * <tt>rpos</tt> once <tt>remove</tt> is called (indicating
+     * that the last event read was successful).
+     */
+    private long nextReadPos = 0;
+
+    /** The <tt>EventReader</tt> used to retrieve events. */
+    private EventReader eventReader;
+
+    /** The <tt>EventWriter</tt> used to store events */
+    private EventWriter eventWriter;
+
+    /** The <tt>File</tt> object of the event persistence directory */
+    private File logDir;
+
+    /** 
+     * The <tt>File</tt> object that will maintain the control data for
+     * for this <tt>EventLog</tt>.
+     */
+    private File controlFile;
+
+    /** The in memory buffer that holds the control data */
+    private byte[] ctlbuf = new byte[CTLBLOCK_LEN];
+
+    /** 
+     * Flag that is used to determine whether or not this object 
+     * has been initialized. 
+     */
+    private boolean initialized = false;
+
+    /** 
+     * Flag that is used to determine whether or not this object 
+     * has been closed. 
+     */
+    private boolean closed = false;
+
+    private static final boolean debugState = false;
+    
+    /**
+     * Simple constructor that takes a <tt>Uuid</tt> argument
+     * and a <tt>File</tt> argument.  These arguments are simply
+     * assigned to the appropriate internal fields. 
+     *
+     * @exception IllegalArgumentException if any of the arguments are null
+     */
+    PersistentEventLog(Uuid uuid, File logDir) {
+        if (logDir == null || uuid == null) 
+            throw new IllegalArgumentException("Arguments cannot be null");
+        this.uuid = uuid;
+        this.logDir = logDir;
+
+        if (persistenceLogger.isLoggable(Level.FINEST)) {
+            persistenceLogger.log(Level.FINEST, 
+               "PersistentEventLog for: {0}", uuid);
+        }
+    }
+
+    // Inherit documentation from supertype
+    public void init() throws IOException {
+
+        if (initialized)
+            throw new InternalMailboxException(
+               "Trying to re-initialize control data "
+                + "for: " + uuid);
+
+        try {
+            if (!logDir.exists()) // Create log directory if it doesn't exist
+                logDir.mkdirs();
+
+            if (!logDir.isDirectory()) // Verify that logDir is a directory
+                throw new FileNotFoundException(logDir.toString()
+                                  + " is not a directory");
+
+            controlFile = getControlFile();
+            if (controlFile.isFile()) { // Recover state from existing file
+                if (persistenceLogger.isLoggable(Level.FINEST)) {
+                    persistenceLogger.log(Level.FINEST, 
+                       "EventLog::init() recovering data for ", 
+                        uuid);
+               }
+                readControlFile();
+           } else { 
+                if (persistenceLogger.isLoggable(Level.FINEST)) {
+                    persistenceLogger.log(Level.FINEST, 
+                       "default initialization for ", uuid);
+               }
+           }
+
+           eventReader = new EventReader();
+           eventWriter = new EventWriter();
+
+       } finally {
+       }
+
+        printControlData(persistenceLogger, "After EventLog::init");
+       
+       initialized = true;
+
+       if (debugState)
+           assertInvariants();
+    }
+
+    // Inherit documentation from supertype
+    public void add(RemoteEvent event) throws IOException {
+
+        stateCheck();
+
+       if (debugState)
+           assertInvariants();
+
+       long logNum = getLogNum(wcount);
+       File log = getLogFile(logNum);
+       LogOutputStream out = null;
+        try {
+            // Get output stream
+           out = streamPool.getLogOutputStream(log, wpos);
+
+            // Write the event
+           eventWriter.write(event, out);
+
+           // Update the control data
+           wpos = out.getOffset();
+           ++wcount;
+       } catch (IOException ioe) {
+           // We'll get interrupted when asked to shutdown.
+           // In this case, we can skip the call to nextWriteLog.
+           if (ioe instanceof InterruptedIOException) {
+                if (persistenceLogger.isLoggable(Level.FINEST)) {
+                    persistenceLogger.log(Level.FINEST, 
+                       "EventLog::add() interrupted "); 
+               }
+           } else {
+                if (persistenceLogger.isLoggable(Level.FINEST)) {
+                    persistenceLogger.log(Level.FINEST, 
+                       "EventLog::add() received IOException " 
+                       + "... skipping to next write log");
+               } 
+               nextWriteLog(); 
+           }
+            if (persistenceLogger.isLoggable(Level.FINEST)) {
+                persistenceLogger.log(Level.FINEST, 
+                   "Exception: ", ioe);
+           }
+           throw (IOException)ioe.fillInStackTrace();
+       } finally {
+           // Always make sure to release any obtained resources
+           if (out != null) {
+               if (getLogNum(wcount) == logNum) {
+                   streamPool.releaseLogStream(out);
+               } else { 
+                   // Done writing to this particular log file, so
+                   // remove it.
+                   streamPool.removeLogStream(out);
+                   wpos = 0;
+               }
+           }
+           // Write control data before returning
+            // TODO (FCS) - handle potential IOException
+            writeControlFile();
+       }
+
+        printControlData(persistenceLogger, "EventLog::add");
+
+       if (debugState) {
+           assertInvariants();
+       }
+    }
+
+    // Inherit documentation from supertype
+    public RemoteEvent next() throws IOException, ClassNotFoundException {
+        boolean IOExceptionCaught = false;
+
+        stateCheck();
+
+       if (debugState) {
+           assertInvariants();
+       }
+
+        // Check if empty
+       if (isEmpty()) 
+           throw new NoSuchElementException();
+
+        long logNum = getLogNum(rcount);
+        File log = getLogFile(logNum);
+        LogInputStream in = null;
+        RemoteEvent evt = null;
+        try {
+            // get the input stream
+            in = streamPool.getLogInputStream(log, rpos);
+            // read the event
+            evt = eventReader.read(in); 
+            // update the control data
+            nextReadPos = in.getOffset();
+            // Don't increment "real" read count until event is delivered
+            // indicated by a call to remove().
+        } catch (IOException ie) {
+           // We'll get interrupted when asked to shutdown.
+           // In this case, we can skip the call to nextReadLog.
+           if (ie instanceof InterruptedIOException) {
+                if (persistenceLogger.isLoggable(Level.FINEST)) {
+                    persistenceLogger.log(Level.FINEST, 
+                       "EventLog::next() interrupted ");
+               } 
+           } else {
+                if (persistenceLogger.isLoggable(Level.FINEST)) {
+                    persistenceLogger.log(Level.FINEST, 
+                       "EventLog::next() received IOException " 
+                       + "... skipping to next read log"); 
+               }
+               nextReadLog();
+                IOExceptionCaught = true; 
+           }
+            if (persistenceLogger.isLoggable(Level.FINEST)) {
+                    persistenceLogger.log(Level.FINEST, 
+                       "Exception: ", ie);
+           }
+           throw (IOException)ie.fillInStackTrace();
+        } catch (ClassNotFoundException cnfe) {
+            // Note that the read offset is still valid since the
+            // MarshalledObject extraction should always succeed.
+            // It's just that the RemoteEvent within it could not
+            // be reconstituted. Therefore, just skip to the next
+            // MarshalledObject in the stream.
+            nextReadPos = in.getOffset();
+           throw (ClassNotFoundException)cnfe.fillInStackTrace();
+       } finally {
+           // If an IOException occurs then the eventReader is in 
+           // a bad state. Therefore, we get rid of our existing
+           // reference and create a fresh one. The underlying
+           // stream is also removed from from the pool, forcing
+           // a fresh copy to be returned on the next "get" request.
+           if (IOExceptionCaught) {
+               //TODO (FCS) add close to interface
+               //eventReader.close();
+               eventReader = new EventReader();
+               if (in != null)
+                   streamPool.removeLogStream(in);
+               // Remove associated log file since we are skipping over it
+               if (!log.delete())
+                   if (persistenceLogger.isLoggable(Levels.HANDLED)) {
+                        persistenceLogger.log(Levels.HANDLED,
+                           "Had trouble deleting {0}", log);
+                   }
+           } else {
+               if (in != null)
+                   streamPool.releaseLogStream(in);
+           }
+
+            // TODO (FCS) - handle potential IOException
+            writeControlFile();
+
+       }
+
+        printControlData(persistenceLogger, "After Event::next");
+
+       if (debugState) {
+           assertInvariants();
+       }
+
+        return evt;
+    }
+    
+    // Inherit documentation from supertype
+    public RemoteEventData[] readAhead(int maxEvents) 
+        throws IOException, ClassNotFoundException 
+    {
+        boolean IOExceptionCaught = false;
+
+        stateCheck();
+
+       if (debugState) {
+           assertInvariants();
+       }
+
+        // Check if empty
+       if (isEmpty()) 
+           throw new NoSuchElementException();    
+
+        long readCount = rcount;
+        long readPosition = rpos;
+        long logNum = getLogNum(readCount); 
+        File log = getLogFile(logNum);
+        RemoteEvent evt = null;
+        ArrayList rData = new ArrayList();
+        int i = 0;
+        LogInputStream in = null;
+        boolean done = false;
+        
+        printControlData(persistenceLogger, "Before read::readAhead");
+        if (persistenceLogger.isLoggable(Level.FINEST)) {
+            persistenceLogger.log(Level.FINEST, 
+                "EventLog::readAhead() maxEvents = {0}", 
+                new Object[] {Integer.valueOf(maxEvents)});
+        } 
+        
+        while ((readCount < wcount) && 
+               (rData.size() < maxEvents) &&
+               !done) 
+        {
+            if (logNum != getLogNum(readCount)) {
+                logNum = getLogNum(readCount);
+                log = getLogFile(logNum);
+                readPosition = 0;
+            }
+            try {
+                // get the input stream
+                in = streamPool.getLogInputStream(log, readPosition);
+                // read the event
+                evt = eventReader.read(in); 
+                // update readCount and readPosition
+                readCount++;
+                readPosition = in.getOffset(); // offset to next unread event
+                //Generate new entry
+                rData.add(new RemoteEventData(
+                   evt, 
+                   new RemoteEventDataCursor(readCount, readPosition)));
+            } catch (IOException ie) {
+                // We'll get interrupted when asked to shutdown.
+                // In this case, we can skip the call to nextReadAheadLog.
+                if (ie instanceof InterruptedIOException) {
+                    if (persistenceLogger.isLoggable(Level.FINEST)) {
+                        persistenceLogger.log(Level.FINEST, 
+                            "EventLog::readAhead() interrupted ");
+                    } 
+                    // Stop processing events
+                    done = true;
+                } else {
+                    if (persistenceLogger.isLoggable(Level.FINEST)) {
+                        persistenceLogger.log(Level.FINEST, 
+                            "EventLog::readAhead() received IOException " 
+                            + "... skipping to next read log"); 
+                    }
+                    readCount = nextReadAheadLog(readCount);
+                    // TODO - rcount = readCount;
+                    // TODO - if bump rcount you need to bump wcount too
+                    // TODO - if bump counts then you need to persist them
+                    readPosition = 0;
+                    IOExceptionCaught = true; 
+                    if (persistenceLogger.isLoggable(Level.FINEST)) {
+                        persistenceLogger.log(Level.FINEST, 
+                            "EventLog::readAhead() new readCount is {0}", 
+                            Long.valueOf(readCount)); 
+                    }                    
+                }
+                if (persistenceLogger.isLoggable(Levels.HANDLED)) {
+                        persistenceLogger.log(Levels.HANDLED, 
+                            "Exception: ", ie);
+                }
+            } catch (ClassNotFoundException cnfe) {
+                // Note that the read offset is still valid since the
+                // MarshalledObject extraction should always succeed.
+                // It's just that the RemoteEvent within it could not
+                // be reconstituted. Therefore, just skip to the next
+                // MarshalledObject in the stream.
+                readCount++;
+                readPosition = in.getOffset();
+                //Generate new entry
+                rData.add(new RemoteEventData(
+                   null, 
+                   new RemoteEventDataCursor(readCount, readPosition)));       
         
+                if (persistenceLogger.isLoggable(Levels.HANDLED)) {
+                        persistenceLogger.log(Levels.HANDLED, 
+                            "Exception: ", cnfe);
+                }            
+            } finally {
+                // If an IOException occurs then the eventReader is in 
+                // a bad state. Therefore, we get rid of our existing
+                // reference and create a fresh one. The underlying
+                // stream is also removed from from the pool, forcing
+                // a fresh copy to be returned on the next "get" request.
+                if (IOExceptionCaught) {
+                    //TODO (FCS) add close to interface
+                    //eventReader.close();
+                    eventReader = new EventReader();
+                    if (in != null) {
+                        streamPool.removeLogStream(in);
+                        in = null;
+                    }
+                } else { 
+                    if (in != null)
+                        streamPool.releaseLogStream(in);
+                    /* Shouldn't have to release since we have exclusive
+                     * access, but the next getLogInputStream assumes things
+                     * are on the freelist.
+                     */
+                }
+
+            }
+        }
+
+        printControlData(persistenceLogger, "After Event::readAhead");
+
+       if (debugState) {
+           assertInvariants();
+       }
+        
+        return (RemoteEventData[]) rData.toArray(new RemoteEventData[0]);
+    }
+    
+    // Inherit documentation from supertype
+    public boolean isEmpty() throws IOException {
+        stateCheck();
+        return !(rcount < wcount);
+    }
+
+    // Inherit documentation from supertype
+    public void remove() throws IOException {
+        stateCheck();
+
+       if (debugState) {
+           assertInvariants();
+       }
+
+       // Get current log number
+        long logNum = getLogNum(rcount);
+
+       // Update read counts
+       if (rcount < wcount) {
+           ++rcount;
+           rpos = nextReadPos;
+       } else {
+           throw new NoSuchElementException();
+       }
+
+       // Remove old log if we've advanced to a new
+       // log file (ie done reading the old log file).
+       if (getLogNum(rcount) != logNum) {
+           File log = getLogFile(logNum);
+           LogInputStream in = null;
+           try {
+               in = streamPool.getLogInputStream(log, rpos);
+               streamPool.removeLogStream(in);
+           } finally {
+               if (!log.delete())
+                   if (persistenceLogger.isLoggable(Levels.HANDLED)) {
+                        persistenceLogger.log(Levels.HANDLED, 
+                           "Had trouble deleting {0}", log);
+                   }
+               rpos = nextReadPos = 0;
+           }
+       }
+
+       // Store control data
+        writeControlFile();
+
+        // Verify that state remains intact
+       assertInvariants();
+
+
+        printControlData(persistenceLogger, "After Event::remove");
+
+    }
+
+        // Inherit documentation from supertype
+    public void moveAhead(Object cookie) throws IOException {
+        stateCheck();
+
+       if (debugState) {
+           assertInvariants();
+       }
+        
+        printControlData(persistenceLogger, "Before Event::moveAhead");
+        
+        long readCount = 0;
+        long readPosition = 0;        
+        RemoteEventDataCursor cursor = (RemoteEventDataCursor)cookie;
+        if (cursor != null) {
+            readCount = cursor.getReadCount();
+            readPosition = cursor.getReadPosition();
+        } else {
+            /* TODO - should throw NullPointerException, but we do 
+             * get called with null if client initially gets an empty set.
+             * Need to change getNextBatchDo() to skip this call 
+             * if cookie is null. Also, could just return at this point since
+             * nothing will be advanced.
+             */
+           readCount = rcount;
+           readPosition = rpos;
+       }
+
+        if (readCount > wcount) {
+            throw new NoSuchElementException();
+        }
+
+        long currentLogNum = getLogNum(rcount);
+        long nextLogNum = getLogNum(readCount);
+        if (persistenceLogger.isLoggable(Level.FINEST)) {
+            persistenceLogger.log(Level.FINEST, 
+                "EventLog::moveAhead() readCount = {0}, readPosition = {1}, " 
+                + "currentLogNum = {2}, nextLogNum = {3}",
+                new Object[] {Long.valueOf(readCount), 
Long.valueOf(readPosition),
+                              Long.valueOf(currentLogNum), 
Long.valueOf(nextLogNum)}); 
+        }        
+        File logFile = null;
+        LogInputStream in = null;
+        while (currentLogNum < nextLogNum) {
+            try {
+                logFile = getLogFile(currentLogNum);
+                in = streamPool.getLogInputStream(logFile, rpos);
+                streamPool.removeLogStream(in);
+                /*
+                 * NOTE - if file doesn't exist (corrupted) then
+                 * getLogInputStream throws an IOException and you
+                 * end up skipping the removeLogStream() call.
+                 * This "log" should eventually get cleared if once
+                 * the stream pool hits its limit.
+                 */
+
+            } catch (IOException ioe) {
+                if (persistenceLogger.isLoggable(Levels.HANDLED)) {
+                    persistenceLogger.log(Levels.HANDLED, 
+                        "Had trouble accessing old log file", ioe);
+                }
+            } finally {
+               if (logFile != null && !logFile.delete()) {
+                   if (persistenceLogger.isLoggable(Levels.HANDLED)) {
+                        persistenceLogger.log(Levels.HANDLED, 
+                           "Had trouble deleting {0}", logFile);
+                   }
+                }
+                if (persistenceLogger.isLoggable(Level.FINEST)) {
+                    persistenceLogger.log(Level.FINEST, 
+                        "Deleted {0}", logFile);
+                }
+                currentLogNum++;
+            }
+        }
+        rcount = readCount;
+        // If we started a new log file, then reset the read pointer.
+        if ((rcount % eventsPerLogFile) == 0) {
+            rpos = 0;
+        } else {
+            rpos = readPosition;
+        }
+        
+       // Store control data
+        writeControlFile();
+        
+        // Verify that state remains intact
+       assertInvariants();
+
+        printControlData(persistenceLogger, "After Event::moveAhead");
+
+    }        
+    
+    // Inherit documentation from supertype
+    public void close() throws IOException {
+        stateCheck();
+       if (debugState) {
+           assertInvariants();
+       }
+
+       // Close read log
+       long logNum = getLogNum(rcount);
+       File log = getLogFile(logNum);
+       LogStream strm = null;
+       try {
+           strm = streamPool.getLogInputStream(log, rpos);
+           streamPool.removeLogStream(strm);
+       } catch (FileNotFoundException fnfe) {
+           // Can happen if close is called just after you've finished
+           // reading a log. That is, the readCount is advanced to a new
+           // log which might not exist if a write hasn't taken place.
+           // TODO (FCS) - put a check to verify this condition and throw an
+           // exception otherwise?
+       } catch (IOException ioe) {
+           // catch, but ignore so as not to skip the following
+           // code.
+       }
+
+       // Close write log
+       logNum = getLogNum(wcount);
+       log = getLogFile(logNum);
+       try {
+           strm = streamPool.getLogOutputStream(log, wpos);
+           streamPool.removeLogStream(strm);
+       } catch (IOException ioe) {
+           // catch, but ignore so as not to skip the following
+           // code.
+       }
+       
+       // Close control log
+       try {
+           strm = streamPool.getControlLog(controlFile);
+           streamPool.removeLogStream(strm);
+       } catch (IOException ioe) {
+           // catch, but ignore so as not to skip the following
+           // code.
+       }
+
+        //TODO (FCS) - flag that an IOException was caught and rethrow to
+        // indicate that there was a problem.
+
+       closed = true;
+
+       if (persistenceLogger.isLoggable(Level.FINEST)) {
+            persistenceLogger.log(Level.FINEST, 
+               "EventLog::close for {0}", uuid);
+       }
+    }
+
+    // Inherit documentation from supertype
+    public void delete() throws IOException {
+        if (!closed)
+            throw new IOException("Cannot delete log until it is closed");
+        removeDir(logDir);
+    }
+
+    /**
+     * Attempt to delete the associated event log persistence directory.
+     */
+    private static void removeDir(File logDir) {
+        if (logDir == null || !logDir.isDirectory())
+            return;
+
+        // Get the contents of this directory
+        String[] contents = logDir.list();
+       if (contents == null)  // Can happen if there was an IO error
+           return;
+
+        File entry = null;
+       if (persistenceLogger.isLoggable(Level.FINEST)) {
+            persistenceLogger.log(Level.FINEST, 
+               "Deleting contents of: {0}", logDir);
+       }
+        for (int i=0; i < contents.length; i++) {
+            entry = new File(logDir, contents[i]);
+            if (entry.isDirectory()) {
+                removeDir(entry);
+           } else {
+                if(!entry.delete()) {
+                    if (persistenceLogger.isLoggable(Levels.HANDLED)) {
+                        persistenceLogger.log(Levels.HANDLED, 
+                           "Had trouble deleting file: {0}",
+                            entry);
+                   }
+               } else {
+                    if (persistenceLogger.isLoggable(Level.FINEST)) {
+                        persistenceLogger.log(Level.FINEST, 
+                           "Deleted file: {0}", entry);
+                   }
+               }
+           }
+       }
+
+        if (!logDir.delete()) {
+            if (persistenceLogger.isLoggable(Levels.HANDLED)) {
+                persistenceLogger.log(Levels.HANDLED, 
+                   "Had trouble deleting directory: {0}",
+                    logDir);
+           }
+       } else {
+            if (persistenceLogger.isLoggable(Level.FINEST)) {
+                persistenceLogger.log(Level.FINEST, 
+                    "Deleted directory: {0}", logDir);
+           }
+       }
+    }
+
+    /**
+     * Advance the "read" state to the next available log.
+     */
+    private void nextReadLog() {
+       if (debugState)
+           assertInvariants();
+
+        // Normalize event count in case we were called due to a problem with
+        // one of the log files
+        long remainder = rcount % eventsPerLogFile; 
+        rcount += (eventsPerLogFile - remainder);
+       rpos = 0;
+
+        // Ensure that the writeLog data doesn't get behind
+       if (verifyInvariants() == false) {
+           nextWriteLog();
+       }
+
+        // Assert that state is still valid
+       if (debugState)
+           assertInvariants();
+
+        printControlData(persistenceLogger, "EventLog::nextReadLog");
+    }
+
+    /**
+     * Advance the temporary "read" state to the next available log.
+     */
+    private long nextReadAheadLog(long readCount) {
+        // Normalize event count in case we were called due to a problem with
+        // one of the log files
+        long remainder = readCount % eventsPerLogFile; 
+        readCount += (eventsPerLogFile - remainder);
+
+        printControlData(persistenceLogger, "EventLog::nextReadAheadLog");
+        
+        return readCount;
+    }
+    
+    /**
+     * Advance the "write" state to the next available log.
+     */
+    private void nextWriteLog() {
+        // Don't check invariants at the top since we can get called
+        // from readNextLog() with invalid state.
+
+        // Normalize event count in case we were called due to a problem with
+        // one of the log files
+        long remainder = wcount % eventsPerLogFile; 
+        wcount += (eventsPerLogFile - remainder);
+       wpos = 0;
+
+        printControlData(persistenceLogger, "EventLog::nextWriteLog");
+
+       if (debugState)
+           assertInvariants();
+    }
+
+    /**
+     * Output state information to the given <tt>Logger</tt>.
+     * This is intended for debugging purposes only.
+     */
+    private void printControlData(Logger logger, String msg) {
+       if (logger.isLoggable(Level.FINEST)) {
+           logger.log(Level.FINEST, "{0}", msg);
+            logger.log(Level.FINEST, "ID: {0}", uuid);
+            logger.log(Level.FINEST, "ReadCount: {0}", 
+               Long.valueOf(rcount));
+            logger.log(Level.FINEST, "ReadPos: {0}",  
+               Long.valueOf(rpos));
+            logger.log(Level.FINEST, "NextReadPos: {0}",  
+               Long.valueOf(nextReadPos));
+            logger.log(Level.FINEST, "WriteCount: {0}",  
+               Long.valueOf(wcount));
+            logger.log(Level.FINEST, "WritePos: {0}",  
+               Long.valueOf(wpos));
+       }
+    }
+
+    /**
+     * Write state information to the underlying store.
+     */
+    private void writeControlFile() throws IOException {
+       packLong(wcount, ctlbuf, 0);
+       packLong(rcount, ctlbuf, 8);
+       packLong(wpos, ctlbuf, 16);
+       packLong(rpos, ctlbuf, 24);
+       
+       ControlLog ctl = streamPool.getControlLog(controlFile);
+       ctl.seek(0L);
+       ctl.write(ctlbuf);
+       ctl.getFD().sync();
+       streamPool.releaseLogStream(ctl);
+    }
+    
+    /**
+     * Read state information from the underlying store.
+     */
+    private void readControlFile() throws IOException {
+       ControlLog ctl = streamPool.getControlLog(controlFile);
+       ctl.seek(0L);
+       ctl.readFully(ctlbuf);
+       streamPool.releaseLogStream(ctl);
+
+       wcount = unpackLong(ctlbuf, 0);
+       rcount = unpackLong(ctlbuf, 8);
+       wpos = unpackLong(ctlbuf, 16);
+       rpos = unpackLong(ctlbuf, 24);
+    }
+    
+    /**
+     * Utility method for packing a <tt>long</tt> into a <tt>byte</tt> array.
+     */
+    private static void packLong(long val, byte[] b, int off) {
+       b[off++] = (byte) (val >>> 56);
+       b[off++] = (byte) (val >>> 48);
+       b[off++] = (byte) (val >>> 40);
+       b[off++] = (byte) (val >>> 32);
+       b[off++] = (byte) (val >>> 24);
+       b[off++] = (byte) (val >>> 16);
+       b[off++] = (byte) (val >>> 8);
+       b[off++] = (byte) (val >>> 0);
+    }
+    
+    /**
+     * Utility method for unpacking a <tt>long</tt> from a <tt>byte</tt> array.
+     */
+    private static long unpackLong(byte[] b, int off) {
+       return ((b[off + 0] & 0xFFL) << 56) +
+           ((b[off + 1] & 0xFFL) << 48) +
+           ((b[off + 2] & 0xFFL) << 40) +
+           ((b[off + 3] & 0xFFL) << 32) +
+           ((b[off + 4] & 0xFFL) << 24) +
+           ((b[off + 5] & 0xFFL) << 16) +
+           ((b[off + 6] & 0xFFL) << 8) +
+           ((b[off + 7] & 0xFFL) << 0);
+    }
+    
+    /**
+     * Utility method for returning the <tt>File</tt> associated with the
+     * given <tt>lognum</tt>.
+     */
+    private File getLogFile(long lognum) {
+       return new File(logDir, lognum + "." + 
LOGFILE_SUFFIX).getAbsoluteFile();
+    }
+    
+    /**
+     * Utility method for returning the <tt>File</tt> that contains the
+     * state information for this log.
+     */
+    private File getControlFile() {
+       return new File(logDir, "log." + CTLFILE_SUFFIX).getAbsoluteFile();
+    }
+
+    /**
+     * Utility method for returning the log file number for the given
+     * (event) <tt>count</tt>.
+     */
+    private static long getLogNum(long count) {
+       return (count / eventsPerLogFile); 
+    }
+
+
+    /**
+     * Asserts that the log is in a valid state.
+     * 
+     * @exception IOException if the log is in an invalid state
+     */
+    private void stateCheck() throws IOException {
+        if (!initialized)
+            throw new IOException("Trying to use an uninitialized "
+                + "control data object for: " + uuid);
+       if (closed)
+           throw new IOException("Attempt to access closed log file for : "
+               + uuid);
+    }
+
+    /**
+     * Utility method for checking if the object invariants are valid.
+     */
+    private boolean verifyInvariants() {
+        if ((wcount < rcount) ||
+            ((getLogNum(wcount) == getLogNum(rcount)) && (wpos < rpos))) {
+            return false;
+       }
+       return true;
+    }
+
+    /**
+     * Utility method for asserting that the object invariants are valid.
+     *
+     * @exception InternalMailboxException if the invariants aren't valid
+     */
+    private void assertInvariants() {
+        if (verifyInvariants() == false) {
+            printControlData(persistenceLogger, "Dumping invalid state for " + 
uuid);
+            throw new InternalMailboxException("Invalid state for " + uuid);
+       }
+    }
+}


Reply via email to