serge 01/07/31 20:49:23
Modified: proposals/noparse-mimemessage/java/org/apache/james/mailrepository
MimeMessageJDBCSource.java JDBCSpoolRepository.java
JDBCMailRepository.java
Log:
Changed to use external configuration file (defined by the repository destination).
This is a properties file with JDBC connection information, SQL statements, table
name, repository name, and an optional filestore parameter. This can store the entire
message in a database, or it can store the headers in a database and the body in the
file system.
Revision Changes Path
1.3 +31 -31
jakarta-james/proposals/noparse-mimemessage/java/org/apache/james/mailrepository/MimeMessageJDBCSource.java
Index: MimeMessageJDBCSource.java
===================================================================
RCS file:
/home/cvs/jakarta-james/proposals/noparse-mimemessage/java/org/apache/james/mailrepository/MimeMessageJDBCSource.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- MimeMessageJDBCSource.java 2001/07/01 12:58:15 1.2
+++ MimeMessageJDBCSource.java 2001/08/01 03:49:23 1.3
@@ -10,6 +10,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.SequenceInputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@@ -17,24 +18,27 @@
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.james.core.MimeMessageSource;
+import org.apache.avalon.cornerstone.services.store.StreamRepository;
-public class MimeMessageJDBCSource
- extends MimeMessageSource {
+/**
+ * This class points to a specific message in a repository. This will return an
+ * InputStream to the JDBC field/record, possibly sequenced with the file stream.
+ */
+public class MimeMessageJDBCSource extends MimeMessageSource {
- private String retrieveMessageStreamSQL;
-
//Define how to get to the data
JDBCMailRepository repository = null;
String key = null;
+ StreamRepository sr = null;
- //The inputstream, if closed, is null, if open contains appropriate references
- InputStream in;
- Statement inStatement;
- ResultSet inResultSet;
- Connection conn;
+ String retrieveMessageBodySQL = null;
+ /**
+ * Construct a MimeMessageSource based on a JDBC repository, a key, and a
+ * stream repository (where we might store the message body)
+ */
public MimeMessageJDBCSource(JDBCMailRepository repository,
- String key) throws IOException {
+ String key, StreamRepository sr) throws IOException {
if (repository == null) {
throw new IOException("Repository is null");
}
@@ -43,18 +47,23 @@
}
this.repository = repository;
this.key = key;
+ this.sr = sr;
- retrieveMessageStreamSQL = "SELECT message_body FROM " +
repository.tableName
- + " WHERE message_name = ? AND repository_name = ?";
+ retrieveMessageBodySQL =
repository.sqlQueries.getProperty("retrieveMessageBodySQL");
}
+ /**
+ * Return the input stream to the database field and then the file stream.
This should
+ * be smart enough to work even if the file does not exist. This is to support
+ * a repository with the entire message in the database, which is how James 1.2
worked.
+ */
public synchronized InputStream getInputStream() throws IOException {
//System.err.println("loading data for " + key + "/" + repository);
try {
- conn = repository.getConnection();
+ Connection conn = repository.getConnection();
- PreparedStatement retrieveMessageStream =
conn.prepareStatement(retrieveMessageStreamSQL);
+ PreparedStatement retrieveMessageStream =
conn.prepareStatement(retrieveMessageBodySQL);
retrieveMessageStream.setString(1, key);
retrieveMessageStream.setString(2, repository.repositoryName);
ResultSet rsRetrieveMessageStream =
retrieveMessageStream.executeQuery();
@@ -63,29 +72,20 @@
throw new IOException("Could not find message");
}
- in = rsRetrieveMessageStream.getBinaryStream(1);
- inResultSet = rsRetrieveMessageStream;
- inStatement = retrieveMessageStream;
+ byte[] headers = rsRetrieveMessageStream.getBytes(1);
+ InputStream in = new ByteArrayInputStream(headers);
+ if (sr != null) {
+ in = new SequenceInputStream(in, sr.get(key));
+ }
return in;
} catch (SQLException sqle) {
throw new IOException(sqle.toString());
- } finally {
- //Do we really want to do this? I think not
- /*
- try {
- conn.close();
- } catch (Exception e) {
- }
- */
}
}
- /*
- public synchronized long getSize() throws IOException {
- //Would like to implement using BLOBs
-
- }
- */
+ /**
+ * Check to see whether this is the same repository and the same key
+ */
public boolean equals(Object obj) {
if (obj instanceof MimeMessageJDBCSource) {
MimeMessageJDBCSource source = (MimeMessageJDBCSource)obj;
1.3 +3 -2
jakarta-james/proposals/noparse-mimemessage/java/org/apache/james/mailrepository/JDBCSpoolRepository.java
Index: JDBCSpoolRepository.java
===================================================================
RCS file:
/home/cvs/jakarta-james/proposals/noparse-mimemessage/java/org/apache/james/mailrepository/JDBCSpoolRepository.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- JDBCSpoolRepository.java 2001/07/01 12:58:15 1.2
+++ JDBCSpoolRepository.java 2001/08/01 03:49:23 1.3
@@ -65,8 +65,9 @@
public synchronized String accept() {
while (true) {
try {
+ //System.err.println("querying db");
Connection conn = getConnection();
- PreparedStatement listMessages =
conn.prepareStatement(listMessagesSQL);
+ PreparedStatement listMessages =
conn.prepareStatement(sqlQueries.getProperty("listMessagesSQL"));
listMessages.setString(1, repositoryName);
ResultSet rsListMessages = listMessages.executeQuery();
@@ -99,7 +100,7 @@
long next = 0;
try {
Connection conn = getConnection();
- PreparedStatement listMessages =
conn.prepareStatement(listMessagesSQL);
+ PreparedStatement listMessages =
conn.prepareStatement(sqlQueries.getProperty("listMessagesSQL"));
listMessages.setString(1, repositoryName);
ResultSet rsListMessages = listMessages.executeQuery();
1.3 +116 -102
jakarta-james/proposals/noparse-mimemessage/java/org/apache/james/mailrepository/JDBCMailRepository.java
Index: JDBCMailRepository.java
===================================================================
RCS file:
/home/cvs/jakarta-james/proposals/noparse-mimemessage/java/org/apache/james/mailrepository/JDBCMailRepository.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- JDBCMailRepository.java 2001/07/01 12:58:15 1.2
+++ JDBCMailRepository.java 2001/08/01 03:49:23 1.3
@@ -13,16 +13,23 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.io.ByteArrayOutputStream;
+import java.io.FileInputStream;
import java.io.InputStream;
+import java.io.IOException;
+import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
+import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Properties;
import java.util.Set;
import java.util.StringTokenizer;
import javax.mail.internet.MimeMessage;
+import org.apache.avalon.cornerstone.services.store.Store;
+import org.apache.avalon.cornerstone.services.store.StreamRepository;
import org.apache.avalon.framework.component.Component;
import org.apache.avalon.framework.component.Composable;
import org.apache.avalon.framework.component.ComponentManager;
@@ -66,100 +73,83 @@
protected Lock lock;
protected String destination;
- protected String tableName = "EML_Spool";
+ protected String tableName;
protected String repositoryName;
+ private StreamRepository sr = null;
+
//The table where this is stored
- private String driverClassName = "com.inet.tds.TdsDriver";
- protected String jdbcURL = "jdbc:inetdae7:127.0.0.1?database=James";
- protected String jdbcUsername = "sa_james"; //optional
- protected String jdbcPassword = "blahblah"; //optional
-
-
- protected String checkMessageExistsSQL =
- "SELECT count(*) FROM " + tableName + " WHERE message_name = ? AND
repository_name = ?";
-
- protected String updateMessageSQL =
- "UPDATE " + tableName + " SET message_state = ?, error_message = ?,
sender = ?, recipients = ?, "
- + "remote_host = ?, remote_addr = ?, last_updated = ? "
- + "WHERE message_name = ? AND repository_name = ?";
-
- protected String updateMessageBodySQL =
- "UPDATE " + tableName + " SET message_body = ? WHERE message_name = ?
AND repository_name = ?";
-
- protected String insertMessageSQL =
- "INSERT INTO " + tableName + " (message_name, repository_name,
message_state, "
- + "error_message, sender, recipients, remote_host, remote_addr,
last_updated, message_body) "
- + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
-
- protected String retrieveMessageSQL =
- "SELECT message_state, error_message, sender, recipients, remote_host,
remote_addr, last_updated "
- + "FROM " + tableName + " WHERE message_name = ? AND repository_name =
?";
-
- protected String removeMessageSQL =
- "DELETE FROM " + tableName + " WHERE message_name = ? AND
repository_name = ?";
-
- protected String listMessagesSQL =
- "SELECT message_name, message_state, last_updated FROM " + tableName
- + " WHERE repository_name = ? ORDER BY last_updated ASC";
+ private String driverClassName;
+ protected String jdbcURL;
+ protected String jdbcUsername; //optional
+ protected String jdbcPassword; //optional
+
+ protected Properties sqlQueries = null;
public void configure(Configuration conf) throws ConfigurationException {
destination = conf.getAttribute("destinationURL");
- destination = destination.substring(destination.indexOf("//") + 2);
- tableName = destination.substring(0, destination.indexOf("/"));
- repositoryName = destination.substring(destination.indexOf("/") + 1);
- /*
- String checkType = conf.getAttribute("type");
- if (! (checkType.equals("MAIL") || checkType.equals("SPOOL")) ) {
- final String message =
- "Attempt to configure TownSpoolRepository as " + checkType;
- getLogger().warn( message );
- throw new ConfigurationException( message );
- }
- // ignore model
- conndefinition = conf.getChild("conn").getValue();
- tableName = conf.getChild("table").getValue();
- */
- try {
- Class.forName(driverClassName);
- } catch (ClassNotFoundException cnfe) {
- String message = "Unable to load JDBC driver: " + driverClassName;
- getLogger().error(message);
- throw new ConfigurationException(message);
- }
}
public void compose( final ComponentManager componentManager )
throws ComponentException {
try {
- //store = (Store)componentManager.
- // lookup( "org.apache.avalon.cornerstone.services.store.Store" );
+ Properties props = new Properties();
+ InputStream in = new FileInputStream(destination.substring(5));
+ props.load(in);
+ in.close();
+
+ driverClassName = props.getProperty("driver");
+ jdbcURL = props.getProperty("URL");
+ jdbcUsername = props.getProperty("username"); //optional
+ jdbcPassword = props.getProperty("password"); //optional
+
+ Class.forName(driverClassName);
+
+ tableName = props.getProperty("table");
+ repositoryName = props.getProperty("repository");
+
+ //Loop through and replace <table> with the actual table name in each
case
+ sqlQueries = new Properties();
+ for (Enumeration e = props.keys(); e.hasMoreElements(); ) {
+ String key = (String)e.nextElement();
+ if (!(key.endsWith("SQL"))) {
+ continue;
+ }
+ String query = props.getProperty(key);
+ int i = query.indexOf("<table>");
+ if (i > -1) {
+ query = query.substring(0, i) + tableName + query.substring(i +
7);
+ }
+ //System.err.println(query);
+ sqlQueries.put(key, query);
+ }
+
+
+ String filestore = props.getProperty("filestore");
- //prepare Configurations for object and stream repositories
- DefaultConfiguration objectConfiguration
- = new DefaultConfiguration( "repository",
-
"generated:JDBCMailRepository.compose()" );
-
- objectConfiguration.setAttribute("destinationURL", destination);
- objectConfiguration.setAttribute("type", "OBJECT");
- objectConfiguration.setAttribute("model", "SYNCHRONOUS");
-
- DefaultConfiguration streamConfiguration
- = new DefaultConfiguration( "repository",
-
"generated:JDBCMailRepository.compose()" );
-
- streamConfiguration.setAttribute( "destinationURL", destination );
- streamConfiguration.setAttribute( "type", "STREAM" );
- streamConfiguration.setAttribute( "model", "SYNCHRONOUS" );
+ if (filestore != null) {
- //sr = (StreamRepository) store.select(streamConfiguration);
- //or = (ObjectRepository) store.select(objectConfiguration);
+ Store store = (Store)componentManager.
+
lookup("org.apache.avalon.cornerstone.services.store.Store");
+
+ //prepare Configurations for stream repositories
+ DefaultConfiguration streamConfiguration
+ = new DefaultConfiguration( "repository",
+
"generated:JDBCMailRepository.compose()" );
+
+ streamConfiguration.setAttribute( "destinationURL", filestore );
+ streamConfiguration.setAttribute( "type", "STREAM" );
+ streamConfiguration.setAttribute( "model", "SYNCHRONOUS" );
+ sr = (StreamRepository) store.select(streamConfiguration);
+ }
+
lock = new Lock();
- getLogger().debug(this.getClass().getName() + " created in " + destination);
+ getLogger().debug(this.getClass().getName() + " created according to "
+ destination);
} catch (Exception e) {
final String message = "Failed to retrieve Store component:" +
e.getMessage();
- getLogger().error( message, e );
- throw new ComponentException( message, e );
+ getLogger().error(message, e);
+ e.printStackTrace();
+ throw new ComponentException(message, e);
}
}
@@ -189,7 +179,7 @@
//Begin a transaction
conn.setAutoCommit(false);
- PreparedStatement checkMessageExists =
conn.prepareStatement(checkMessageExistsSQL);
+ PreparedStatement checkMessageExists =
conn.prepareStatement(sqlQueries.getProperty("checkMessageExistsSQL"));
checkMessageExists.setString(1, mc.getName());
checkMessageExists.setString(2, repositoryName);
ResultSet rsExists = checkMessageExists.executeQuery();
@@ -199,7 +189,7 @@
if (exists) {
//Update the existing record
- PreparedStatement updateMessage =
conn.prepareStatement(updateMessageSQL);
+ PreparedStatement updateMessage =
conn.prepareStatement(sqlQueries.getProperty("updateMessageSQL"));
updateMessage.setString(1, mc.getState());
updateMessage.setString(2, mc.getErrorMessage());
updateMessage.setString(3, mc.getSender().toString());
@@ -231,10 +221,24 @@
}
if (saveBody) {
- updateMessage = conn.prepareStatement(updateMessageBodySQL);
- int size = (int)messageBody.getSize();
- updateMessage.setBinaryStream(1, messageBody.getInputStream(),
size);
- //updateMessage.setBinaryStream(1, new
DebugInputStream(messageBody.getInputStream()), size);
+ updateMessage =
conn.prepareStatement(sqlQueries.getProperty("updateMessageBodySQL"));
+ ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
+ OutputStream bodyOut = null;
+ if (sr == null) {
+ //If there is no filestore, use the byte array to store
headers
+ // and the body
+ bodyOut = headerOut;
+ } else {
+ //Store the body in the stream repository
+ bodyOut = sr.put(mc.getName());
+ }
+
+ //Write the message to the headerOut and bodyOut. bodyOut goes
straight to the file
+ MimeMessageWrapper.writeTo(messageBody, headerOut, bodyOut);
+ bodyOut.close();
+
+ //Store the headers in the database
+ updateMessage.setBytes(1, headerOut.toByteArray());
updateMessage.setString(2, mc.getName());
updateMessage.setString(3, repositoryName);
updateMessage.execute();
@@ -242,7 +246,7 @@
}
} else {
//Insert the record into the database
- PreparedStatement insertMessage =
conn.prepareStatement(insertMessageSQL);
+ PreparedStatement insertMessage =
conn.prepareStatement(sqlQueries.getProperty("insertMessageSQL"));
insertMessage.setString(1, mc.getName());
insertMessage.setString(2, repositoryName);
insertMessage.setString(3, mc.getState());
@@ -259,13 +263,26 @@
insertMessage.setString(7, mc.getRemoteHost());
insertMessage.setString(8, mc.getRemoteAddr());
java.sql.Date lastUpdated = new
java.sql.Date(mc.getLastUpdated().getTime());
- //System.err.println(lastUpdated);
insertMessage.setDate(9, lastUpdated);
- //insertMessage.setString(9, sqlFormat.format(mc.getLastUpdated()));
MimeMessage messageBody = mc.getMessage();
- int size = messageBody.getSize();
- insertMessage.setBinaryStream(10, messageBody.getInputStream(),
size);
- //insertMessage.setBinaryStream(10, new
DebugInputStream(messageBody.getInputStream()), size);
+
+ ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
+ OutputStream bodyOut = null;
+ if (sr == null) {
+ //If there is no sr, then use the same byte array to hold the
headers
+ // and the body
+ bodyOut = headerOut;
+ } else {
+ //Store the body in the file system.
+ bodyOut = sr.put(mc.getName());
+ }
+
+ //Write the message to the headerOut and bodyOut. bodyOut goes
straight to the file
+ MimeMessageWrapper.writeTo(messageBody, headerOut, bodyOut);
+ bodyOut.close();
+
+ //Store the headers in the database
+ insertMessage.setBytes(10, headerOut.toByteArray());
insertMessage.execute();
insertMessage.close();
}
@@ -288,7 +305,7 @@
try {
Connection conn = getConnection();
- PreparedStatement retrieveMessage =
conn.prepareStatement(retrieveMessageSQL);
+ PreparedStatement retrieveMessage =
conn.prepareStatement(sqlQueries.getProperty("retrieveMessageSQL"));
retrieveMessage.setString(1, key);
retrieveMessage.setString(2, repositoryName);
ResultSet rsMessage = retrieveMessage.executeQuery();
@@ -310,11 +327,7 @@
mc.setRemoteAddr(rsMessage.getString(6));
mc.setLastUpdated(new java.util.Date(rsMessage.getDate(7).getTime()));
- //Create a reference to a JDBCMimeMessageInputStream
- //InputStream in = new JDBCMimeMessageInputStream(this, key);
- //InputStream in = new TownMimeMessageInputStream(conndefinition,
tableName, key, repositoryName);
- //InputStream in = new
ByteArrayInputStream(message.getAsBytes("message_body"));
- MimeMessageJDBCSource source = new MimeMessageJDBCSource(this, key);
+ MimeMessageJDBCSource source = new MimeMessageJDBCSource(this, key, sr);
MimeMessageWrapper message = new MimeMessageWrapper(source);
mc.setMessage(message);
rsMessage.close();
@@ -345,12 +358,16 @@
lock(key);
Connection conn = getConnection();
- PreparedStatement removeMessage =
conn.prepareStatement(removeMessageSQL);
+ PreparedStatement removeMessage =
conn.prepareStatement(sqlQueries.getProperty("removeMessageSQL"));
removeMessage.setString(1, key);
removeMessage.setString(2, repositoryName);
removeMessage.execute();
removeMessage.close();
conn.close();
+
+ if (sr != null) {
+ sr.remove(key);
+ }
} catch (Exception me) {
throw new RuntimeException("Exception while removing mail: " +
me.getMessage());
} finally {
@@ -362,7 +379,7 @@
//System.err.println("listing messages");
try {
Connection conn = getConnection();
- PreparedStatement listMessages = conn.prepareStatement(listMessagesSQL);
+ PreparedStatement listMessages =
conn.prepareStatement(sqlQueries.getProperty("listMessagesSQL"));
listMessages.setString(1, repositoryName);
ResultSet rsListMessages = listMessages.executeQuery();
@@ -375,14 +392,11 @@
conn.close();
return messageList.iterator();
} catch (Exception me) {
- me.printStackTrace();
+ me.printStackTrace();
throw new RuntimeException("Exception while listing mail: " +
me.getMessage());
}
}
- //
- // Private methods
- //
/**
* Opens a database connection.
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]