serge 01/12/03 14:58:50
Modified: src/java/org/apache/james/mailrepository
JDBCSpoolRepository.java
Log:
Big overhaul to re-use an in-memory List of the available messages and to reload
that list no more than once per second, thereby significantly reducing the number
of queries issued while finding a message to process.
Revision Changes Path
1.7 +125 -72
jakarta-james/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java
Index: JDBCSpoolRepository.java
===================================================================
RCS file:
/home/cvs/jakarta-james/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- JDBCSpoolRepository.java 2001/11/05 12:19:07 1.6
+++ JDBCSpoolRepository.java 2001/12/03 22:58:50 1.7
@@ -20,6 +20,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.LinkedList;
import java.util.Set;
import java.util.StringTokenizer;
import javax.mail.internet.MimeMessage;
@@ -58,62 +59,128 @@
* @author Serge Knystautas <[EMAIL PROTECTED]>
*/
public class JDBCSpoolRepository extends JDBCMailRepository implements
SpoolRepository {
- private final static int WAIT_LIMIT = 60000;
+ /**
+ * How long a thread should sleep when there are no messages to process.
+ */
+ private static int WAIT_LIMIT = 60000;
+ /**
+ * How long we have to wait before reloading the list of pending messages
+ */
+ private static int LOAD_TIME_MININUM = 1000;
+ /**
+ * A queue in memory of messages that need processing
+ */
+ private LinkedList pendingMessages = new LinkedList();
+ /**
+ * When the queue was last read
+ */
+ private long pendingMessagesLoadTime = 0;
+
+ /**
+ * Return the key of a message to process. This is a message in the spool that
is not locked.
+ */
public String accept() {
- System.err.println("accept called on " + this);
while (true) {
- Connection conn = null;
- PreparedStatement listMessages = null;
- ResultSet rsListMessages = null;
+ //Loop through until we are either out of pending messages or have a
message
+ // that we can lock
+ PendingMessage next = null;
+ while ((next = getNextPendingMessage()) != null) {
+ if (lock(next.key)) {
+ return next.key;
+ }
+ }
+ //Nothing to do... sleep!
try {
- conn = getConnection();
- listMessages =
-
conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true));
- listMessages.setString(1, repositoryName);
- rsListMessages = listMessages.executeQuery();
-
- while (rsListMessages.next()) {
- String message = rsListMessages.getString(1);
+ synchronized (this) {
+ //System.err.println("waiting : " + WAIT_LIMIT / 1000 + " in "
+ repositoryName);
+ wait(WAIT_LIMIT);
+ }
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
- if (lock(message)) {
- rsListMessages.close();
- listMessages.close();
- return message;
+ /**
+ * Return the key of a message that's ready to process. If a message is of
type "error"
+ * then check the last updated time, and don't try it until the long 'delay'
parameter
+ * milliseconds has passed.
+ */
+ public synchronized String accept(long delay) {
+ while (true) {
+ //Loop through until we are either out of pending messages or have a
message
+ // that we can lock
+ PendingMessage next = null;
+ long sleepUntil = 0;
+ while ((next = getNextPendingMessage()) != null) {
+ //Check whether this is time to expire
+ boolean shouldProcess = false;
+ if (next.state.equals(Mail.ERROR)) {
+ //if it's an error message, test the time
+ long processingTime = delay + next.lastUpdated;
+ if (processingTime < System.currentTimeMillis()) {
+ //It's time to process
+ shouldProcess = true;
+ } else {
+ //We don't process this, but we want to possibly reduce the
amount of time
+ // we sleep so we wake when this message is ready.
+ if (sleepUntil == 0 || processingTime < sleepUntil) {
+ sleepUntil = processingTime;
+ }
}
+ } else {
+ shouldProcess = true;
}
- } catch (Exception me) {
- me.printStackTrace();
- throw new RuntimeException("Exception while listing mail: " +
me.getMessage());
- } finally {
- try {
- rsListMessages.close();
- } catch (Exception e) {
+ if (shouldProcess && lock(next.key)) {
+ return next.key;
}
- try {
- listMessages.close();
- } catch (Exception e) {
- }
- try {
- conn.close();
- } catch (Exception e) {
- }
}
+ //Nothing to do... sleep!
+ if (sleepUntil == 0) {
+ sleepUntil = System.currentTimeMillis() + WAIT_LIMIT;
+ }
try {
synchronized (this) {
- wait(WAIT_LIMIT);
+ //System.err.println("waiting " + (sleepUntil -
System.currentTimeMillis()) / 1000 + " in " + repositoryName);
+ wait(sleepUntil - System.currentTimeMillis());
}
} catch (InterruptedException ignored) {
}
+
}
}
/**
- * Find an available message, or an error one that hasn't been updated in a
certain amount of time
+ * If not empty, gets the next pending message. Otherwise
+ * checks the last time pending messages were loaded and reloads them if
+ * it's been more than 1 second (should be configurable).
*/
- public String accept(long delay) {
- while (true) {
- long next = 0;
+ private PendingMessage getNextPendingMessage() {
+ //System.err.println("Trying to get next message in " + repositoryName);
+ synchronized (pendingMessages) {
+ if (pendingMessages.size() == 0 && pendingMessagesLoadTime <
System.currentTimeMillis()) {
+ pendingMessagesLoadTime = LOAD_TIME_MININUM +
System.currentTimeMillis();
+ loadPendingMessages();
+ }
+
+ if (pendingMessages.size() == 0) {
+ return null;
+ } else {
+ //System.err.println("Returning a pending message in " +
repositoryName);
+ return (PendingMessage)pendingMessages.removeFirst();
+ }
+ }
+ }
+
+ /**
+ * Retrieves the pending messages that are in the database
+ */
+ private void loadPendingMessages() {
+ //Loads the pendingMessages list with PendingMessage objects
+ //System.err.println("loading pending messages in " + repositoryName);
+ synchronized (pendingMessages) {
+ pendingMessages.clear();
+
Connection conn = null;
PreparedStatement listMessages = null;
ResultSet rsListMessages = null;
@@ -123,33 +190,16 @@
conn.prepareStatement(sqlQueries.getSqlString("listMessagesSQL", true));
listMessages.setString(1, repositoryName);
rsListMessages = listMessages.executeQuery();
-
while (rsListMessages.next()) {
- String message = rsListMessages.getString(1);
+ String key = rsListMessages.getString(1);
String state = rsListMessages.getString(2);
- boolean process = false;
- if (state.equals(Mail.ERROR)) {
- //Test the time
- long timeToProcess = delay +
rsListMessages.getTimestamp(3).getTime();
- if (System.currentTimeMillis() > timeToProcess) {
- process = true;
- } else {
- if (next == 0 || next > timeToProcess) {
- //Mark this as the next most likely possible mail
to process
- next = timeToProcess;
- }
- }
- } else {
- process = true;
- }
-
- if (process && lock(message)) {
- return message;
- }
+ long lastUpdated = rsListMessages.getTimestamp(3).getTime();
+ pendingMessages.add(new PendingMessage(key, state,
lastUpdated));
}
- } catch (Exception me) {
- me.printStackTrace();
- throw new RuntimeException("Exception while listing mail: " +
me.getMessage());
+ } catch (SQLException sqle) {
+ //Log it and avoid reloading for a bit
+ getLogger().error("Error retrieving pending messages", sqle);
+ pendingMessagesLoadTime = LOAD_TIME_MININUM * 10 +
System.currentTimeMillis();
} finally {
try {
rsListMessages.close();
@@ -164,18 +214,21 @@
} catch (Exception e) {
}
}
+ }
+ }
- //We did not find any... let's wait for a certain amount of time
- try {
- synchronized (this) {
- if (next == 0 || next - System.currentTimeMillis() >
WAIT_LIMIT) {
- wait(WAIT_LIMIT);
- } else {
- wait(next - System.currentTimeMillis());
- }
- }
- } catch (InterruptedException ignored) {
- }
+ /**
+ * Simple class to hold basic information about a message in the spool
+ */
+ class PendingMessage {
+ protected String key;
+ protected String state;
+ protected long lastUpdated;
+
+ public PendingMessage(String key, String state, long lastUpdated) {
+ this.key = key;
+ this.state = state;
+ this.lastUpdated = lastUpdated;
}
}
}
--
To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>