serge       01/12/03 14:58:50

  Modified:    src/java/org/apache/james/mailrepository
                        JDBCSpoolRepository.java
  Log:
  Big overhaul to re-use an in-memory List of the messages that are available and to load 
that list no more than once per second, thereby significantly reducing the number of queries 
made while finding a message to process.
  
  Revision  Changes    Path
  1.7       +125 -72   
jakarta-james/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java
  
  Index: JDBCSpoolRepository.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-james/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- JDBCSpoolRepository.java  2001/11/05 12:19:07     1.6
  +++ JDBCSpoolRepository.java  2001/12/03 22:58:50     1.7
  @@ -20,6 +20,7 @@
   import java.util.HashSet;
   import java.util.Iterator;
   import java.util.List;
  +import java.util.LinkedList;
   import java.util.Set;
   import java.util.StringTokenizer;
   import javax.mail.internet.MimeMessage;
  @@ -58,62 +59,128 @@
    * @author  Serge Knystautas <[EMAIL PROTECTED]>
    */
   public class JDBCSpoolRepository extends JDBCMailRepository implements 
SpoolRepository {
  -    private final static int WAIT_LIMIT = 60000;
   
  +    /**
  +     * How long a thread should sleep when there are no messages to process.
  +     */
  +    private static int WAIT_LIMIT = 60000;
  +    /**
  +     * How long we have to wait before reloading the list of pending messages
  +     */
  +    private static int LOAD_TIME_MININUM = 1000;
  +    /**
  +     * A queue in memory of messages that need processing
  +     */
  +    private LinkedList pendingMessages = new LinkedList();
  +    /**
  +     * When the queue was last read
  +     */
  +    private long pendingMessagesLoadTime = 0;
  +
  +    /**
  +     * Return the key of a message to process.  This is a message in the spool that 
is not locked.
  +     */
       public String accept() {
  -        System.err.println("accept called on " + this);
           while (true) {
  -            Connection conn = null;
  -            PreparedStatement listMessages = null;
  -            ResultSet rsListMessages = null;
  +            //Loop through until we are either out of pending messages or have a 
message
  +            // that we can lock
  +            PendingMessage next = null;
  +            while ((next = getNextPendingMessage()) != null) {
  +                if (lock(next.key)) {
  +                    return next.key;
  +                }
  +            }
  +            //Nothing to do... sleep!
               try {
  -                conn = getConnection();
  -                listMessages =
  -                    
conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true));
  -                listMessages.setString(1, repositoryName);
  -                rsListMessages = listMessages.executeQuery();
  -
  -                while (rsListMessages.next()) {
  -                    String message = rsListMessages.getString(1);
  +                synchronized (this) {
  +                    //System.err.println("waiting : " + WAIT_LIMIT / 1000 + " in " 
+ repositoryName);
  +                    wait(WAIT_LIMIT);
  +                }
  +            } catch (InterruptedException ignored) {
  +            }
  +        }
  +    }
   
  -                    if (lock(message)) {
  -                        rsListMessages.close();
  -                        listMessages.close();
  -                        return message;
  +    /**
  +     * Return the key of a message that's ready to process.  If a message is of 
type "error"
  +     * then check its last updated time, and don't try it until 'delay' 
milliseconds
  +     * have passed since that update.
  +     */
  +    public synchronized String accept(long delay) {
  +        while (true) {
  +            //Loop through until we are either out of pending messages or have a 
message
  +            // that we can lock
  +            PendingMessage next = null;
  +            long sleepUntil = 0;
  +            while ((next = getNextPendingMessage()) != null) {
  +                //Check whether this is time to expire
  +                boolean shouldProcess = false;
  +                if (next.state.equals(Mail.ERROR)) {
  +                    //if it's an error message, test the time
  +                    long processingTime = delay + next.lastUpdated;
  +                    if (processingTime < System.currentTimeMillis()) {
  +                        //It's time to process
  +                        shouldProcess = true;
  +                    } else {
  +                        //We don't process this, but we want to possibly reduce the 
amount of time
  +                        //  we sleep so we wake when this message is ready.
  +                        if (sleepUntil == 0 || processingTime < sleepUntil) {
  +                            sleepUntil = processingTime;
  +                        }
                       }
  +                } else {
  +                    shouldProcess = true;
                   }
  -            } catch (Exception me) {
  -                me.printStackTrace();
  -                throw new RuntimeException("Exception while listing mail: " + 
me.getMessage());
  -            } finally {
  -                try {
  -                    rsListMessages.close();
  -                } catch (Exception e) {
  +                if (shouldProcess && lock(next.key)) {
  +                    return next.key;
                   }
  -                try {
  -                    listMessages.close();
  -                } catch (Exception e) {
  -                }
  -                try {
  -                    conn.close();
  -                } catch (Exception e) {
  -                }
               }
  +            //Nothing to do... sleep!
  +            if (sleepUntil == 0) {
  +                sleepUntil = System.currentTimeMillis() + WAIT_LIMIT;
  +            }
               try {
                   synchronized (this) {
  -                    wait(WAIT_LIMIT);
  +                    //System.err.println("waiting " + (sleepUntil - 
System.currentTimeMillis()) / 1000 + " in " + repositoryName);
  +                    wait(sleepUntil - System.currentTimeMillis());
                   }
               } catch (InterruptedException ignored) {
               }
  +
           }
       }
   
       /**
  -     * Find an available message, or an error one that hasn't been updated in a 
certain amount of time
  +     * If not empty, gets the next pending message.  Otherwise
  +     * checks the last time pending messages were loaded and reloads if
  +     * it's been more than 1 second (should be configurable).
        */
  -    public String accept(long delay) {
  -        while (true) {
  -            long next = 0;
  +    private PendingMessage getNextPendingMessage() {
  +        //System.err.println("Trying to get next message in " + repositoryName);
  +        synchronized (pendingMessages) {
  +            if (pendingMessages.size() == 0 && pendingMessagesLoadTime < 
System.currentTimeMillis()) {
  +                pendingMessagesLoadTime = LOAD_TIME_MININUM + 
System.currentTimeMillis();
  +                loadPendingMessages();
  +            }
  +
  +            if (pendingMessages.size() == 0) {
  +                return null;
  +            } else {
  +                //System.err.println("Returning a pending message in " + 
repositoryName);
  +                return (PendingMessage)pendingMessages.removeFirst();
  +            }
  +        }
  +    }
  +
  +    /**
  +     * Retrieves the pending messages that are in the database
  +     */
  +    private void loadPendingMessages() {
  +        //Loads a vector with PendingMessage objects
  +        //System.err.println("loading pending messages in " + repositoryName);
  +        synchronized (pendingMessages) {
  +            pendingMessages.clear();
  +
               Connection conn = null;
               PreparedStatement listMessages = null;
               ResultSet rsListMessages = null;
  @@ -123,33 +190,16 @@
                       
conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true));
                   listMessages.setString(1, repositoryName);
                   rsListMessages = listMessages.executeQuery();
  -
                   while (rsListMessages.next()) {
  -                    String message = rsListMessages.getString(1);
  +                    String key = rsListMessages.getString(1);
                       String state = rsListMessages.getString(2);
  -                    boolean process = false;
  -                    if (state.equals(Mail.ERROR)) {
  -                        //Test the time
  -                        long timeToProcess = delay + 
rsListMessages.getTimestamp(3).getTime();
  -                        if (System.currentTimeMillis() > timeToProcess) {
  -                            process = true;
  -                        } else {
  -                            if (next == 0 || next > timeToProcess) {
  -                                //Mark this as the next most likely possible mail 
to process
  -                                next = timeToProcess;
  -                            }
  -                        }
  -                    } else {
  -                        process = true;
  -                    }
  -
  -                    if (process && lock(message)) {
  -                        return message;
  -                    }
  +                    long lastUpdated = rsListMessages.getTimestamp(3).getTime();
  +                    pendingMessages.add(new PendingMessage(key, state, 
lastUpdated));
                   }
  -            } catch (Exception me) {
  -                me.printStackTrace();
  -                throw new RuntimeException("Exception while listing mail: " + 
me.getMessage());
  +            } catch (SQLException sqle) {
  +                //Log it and avoid reloading for a bit
  +                getLogger().error("Error retrieving pending messages", sqle);
  +                pendingMessagesLoadTime = LOAD_TIME_MININUM * 10 + 
System.currentTimeMillis();
               } finally {
                   try {
                       rsListMessages.close();
  @@ -164,18 +214,21 @@
                   } catch (Exception e) {
                   }
               }
  +        }
  +    }
   
  -            //We did not find any... let's wait for a certain amount of time
  -            try {
  -                synchronized (this) {
  -                    if (next == 0 || next - System.currentTimeMillis() > 
WAIT_LIMIT) {
  -                        wait(WAIT_LIMIT);
  -                    } else {
  -                        wait(next - System.currentTimeMillis());
  -                    }
  -                }
  -            } catch (InterruptedException ignored) {
  -            }
  +    /**
  +     * Simple class to hold basic information about a message in the spool
  +     */
  +    class PendingMessage {
  +        protected String key;
  +        protected String state;
  +        protected long lastUpdated;
  +
  +        public PendingMessage(String key, String state, long lastUpdated) {
  +            this.key = key;
  +            this.state = state;
  +            this.lastUpdated = lastUpdated;
           }
       }
   }
  
  
  

--
To unsubscribe, e-mail:   <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>

Reply via email to