Hey everyone,

For anyone trying to use Oracle to persist JMS messages with JBossMQ in 
jboss-3.2.5: we found a couple of problems in 
org.jboss.mq.pm.jdbc3.PersistenceManager when using 
oracle-jdbc3-service.xml.

These aren't really problems with the PersistenceManager itself, as everything 
worked fine under MySQL in the testing we did. Our requirement is to use 
Oracle, though, and with the thin driver Oracle seems to force you to do a 
"select for update" before you can write Blobs larger than 4k.  I'm not really 
sure why Oracle makes developers write extra code for this.  Suffice it to 
say... I'm calling it a bug! ;)
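
For reference, the usual shape of that extra code is roughly the sketch below: 
insert the row with EMPTY_BLOB(), select the blob back FOR UPDATE, and write the 
bytes through the locator. This is only a sketch using plain java.sql calls. It 
assumes a standalone connection where you control the transaction yourself 
(inside JBoss the container does that) and a hypothetical table with just 
MESSAGEID and MESSAGEBLOB columns. The class and method names are made up for 
illustration, the syntax is newer than the JDKs of the jboss-3.2.5 era, and 
whether Blob.setBinaryStream is supported depends on your driver version.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/** Hypothetical helper for illustration only; not part of JBossMQ. */
class OracleBlobWriteSketch {

    /** Step 1: create the row with an empty blob locator. */
    static String insertSql(String table) {
        return "INSERT INTO " + table
                + " (MESSAGEID, MESSAGEBLOB) VALUES (?, EMPTY_BLOB())";
    }

    /** Step 2: lock the row and fetch the locator for writing. */
    static String lockSql(String table) {
        return "SELECT MESSAGEBLOB FROM " + table
                + " WHERE MESSAGEID = ? FOR UPDATE";
    }

    /** Runs both steps and streams the payload into the locked blob. */
    static void write(Connection c, String table, long id, byte[] payload)
            throws SQLException, IOException {
        boolean oldAutoCommit = c.getAutoCommit();
        // the row lock is only held while the transaction is open
        c.setAutoCommit(false);
        try {
            try (PreparedStatement ins = c.prepareStatement(insertSql(table))) {
                ins.setLong(1, id);
                ins.executeUpdate();
            }
            try (PreparedStatement sel = c.prepareStatement(lockSql(table))) {
                sel.setLong(1, id);
                try (ResultSet rs = sel.executeQuery()) {
                    if (!rs.next()) {
                        throw new SQLException("Row " + id + " not found");
                    }
                    Blob blob = rs.getBlob(1);
                    // Step 3: write through the locator (JDBC 3.0 API,
                    // positions are 1-based)
                    try (OutputStream out = blob.setBinaryStream(1L)) {
                        out.write(payload);
                    }
                }
            }
            c.commit();
        } catch (SQLException | IOException e) {
            c.rollback();
            throw e;
        } finally {
            c.setAutoCommit(oldAutoCommit);
        }
    }
}
```

The FOR UPDATE lock only lasts until the transaction ends, which is why 
autocommit is switched off while the blob is written.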

(1) We had problems storing messages of type ObjectMessage that were larger 
than 4k.

Granted you want to keep your ObjectMessage sizes short and concise, but we 
have some messages where the 4k limit just doesn't cut it for us.

(2) As far as I can tell, the JMS 1.1 spec allows null ObjectMessages (Section 
3.12 Provider Implementations of JMS Message Interfaces - pg. 51).  If you 
store a null ObjectMessage on a queue, 
PersistenceManager.extractMessage(ResultSet, int) throws a 
NullPointerException when the JMS server starts up and tries to peek ahead on 
the InputStream.
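
The null case boils down to something like the sketch below: check for null 
before wrapping the stream in an ObjectInputStream, and fall back to a default 
otherwise. NullSafeReadSketch, readOrDefault and serialize are hypothetical 
names for illustration, not JBossMQ code, and plain Java serialization stands in 
here for SpyMessage.readMessage.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

/** Hypothetical helper for illustration only; not part of JBossMQ. */
class NullSafeReadSketch {

    /**
     * Deserializes one object from the stream, or returns the fallback
     * when the column came back as SQL NULL (a null stream).
     */
    static Object readOrDefault(InputStream in, Object fallback) {
        if (in == null) {
            // wrapping a null stream in ObjectInputStream would throw a
            // NullPointerException, so bail out before that
            return fallback;
        }
        try (ObjectInputStream ois = new ObjectInputStream(in)) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Serializes a value to bytes, only used to build test input. */
    static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

Calling readOrDefault(null, someDefault) hands back someDefault instead of 
blowing up, which is the same idea the class below applies with 
MessagePool.getMessage().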

That said, we wrote some code extending PersistenceManager.java to get around 
these issues. We're still testing it, but if you're experiencing something 
similar, it might help you.  So in the spirit of open-source...


  | /*
  |  * JBossMQ, the OpenSource JMS implementation
  |  *
  |  * Distributable under LGPL license.
  |  * See terms of license at gnu.org.
  |  */
  | package org.jboss.mq.pm.jdbc3;
  | 
  | import java.io.ByteArrayInputStream;
  | import java.io.InputStream;
  | import java.io.IOException;
  | import java.io.ObjectInputStream;
  | import java.sql.Blob;
  | import java.sql.Connection;
  | import java.sql.PreparedStatement;
  | import java.sql.ResultSet;
  | import java.sql.SQLException;
  | import java.sql.Types;
  | 
  | import javax.jms.JMSException;
  | 
  | import org.jboss.mq.MessagePool;
  | import org.jboss.mq.SpyMessage;
  | import org.jboss.mq.pm.Tx;
  | import org.jboss.mq.pm.jdbc3.PersistenceManager;
  | 
  | 
  | /**
  |  * This class manages all persistence related services for JDBC based
  |  * persistence. It fixes an issue in the JBoss PersistenceManager for
  |  * Oracle when inserting blobs larger than 4K, and makes it possible to
  |  * store null ObjectMessages on the queue.
  |  *
  |  * @author tradebeam.com
  |  */
  | public class OracleBlobPersistenceManager extends PersistenceManager {
  | 
  |     private static final String INSERT_EMPTY_MESSAGE =
  |             "INSERT INTO JMS_MESSAGE_LOG (MESSAGEID, DESTINATION," +
  |             " MESSAGEBLOB, TXID, TXOP, LATECLONE) VALUES" +
  |             " (?,?,EMPTY_BLOB(),?,?,?)";
  |     private static final String SELECT_BLOB_FOR_UPDATE =
  |             "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGE_LOG" +
  |             " WHERE MESSAGEID = ? FOR UPDATE";
  |     private static final String UPDATE_BLOB =
  |             "UPDATE JMS_MESSAGE_LOG SET MESSAGEBLOB = ? WHERE MESSAGEID = ?";
  |     
  |     /**
  |      * Create a new Persistence Manager.
  |      * 
  |      * @throws JMSException
  |      */
  |     public OracleBlobPersistenceManager () throws JMSException {
  |         super();
  |     }
  |     
  |     /**
  |      * Add a message.
  |      * 
  |      * @param c The connection.
  |      * @param queue The queue name.
  |      * @param message   the message.
  |      * @param txId  The transaction id.
  |      * @param mark  The mark to set for the message.
  |      * @param lateClone
  |      *  
  |      * @throws SQLException For an error in the db.
  |      * @throws IOException  For an error serializing the message.
  |      */
  |     protected void addMessage (Connection c, String queue,
  |                                SpyMessage message, Tx txId, String mark,
  |                                String lateClone)
  |                               throws SQLException, IOException {
  |         PreparedStatement stmt = null;
  |         try {
  |             stmt = c.prepareStatement(INSERT_EMPTY_MESSAGE);
  |             stmt.setLong(1, message.header.messageId);
  |             
  |             String dest = "*";
  |             if (queue != null) {
  |                dest = queue;
  |             }
  |             stmt.setString(2, dest);
  |             // for Oracle the INSERT creates the blob initially as an
  |             // EMPTY_BLOB(), so no blob parameter is bound here
  |             if (txId != null) {
  |                stmt.setLong(3, txId.longValue());
  |             } else {
  |                stmt.setNull(3, Types.BIGINT);
  |             }
  | 
  |             if (mark == null) {
  |                stmt.setNull(4, Types.VARCHAR);
  |             } else {
  |                stmt.setString(4, mark);
  |             }
  |             
  |             stmt.setString(5, lateClone);
  |             
  |             try {
  |                 stmt.executeUpdate();
  |                 // need to add the following for persisting in Oracle
  |                 stmt.close();
  |                 // the following is specifically required for blobs of size
  |                 // greater than 4k
  |                 stmt = c.prepareStatement(SELECT_BLOB_FOR_UPDATE);
  |                 stmt.setLong(1, message.header.messageId);
  |                 stmt.executeQuery();
  |                 stmt.close();
  |                 
  |                 stmt = c.prepareStatement(UPDATE_BLOB);
  |                 this.setBlob(stmt, 1, message);
  |                 stmt.setLong(2, message.header.messageId);
  |                 stmt.executeUpdate();
  |             } catch (SQLException sqle) {
  |                 if ("1".equals(lateClone)) {
  |                     log.trace("Assumed already added to message log: " + 
  |                             message.header.messageId);
  |                 } else {
  |                     throw sqle;
  |                 }
  |             }    
  |         } finally {
  |             // stmt is still null if the first prepareStatement() failed
  |             if (stmt != null) {
  |                 try {
  |                    stmt.close();
  |                 } catch (Throwable ignore) {
  |                    log.warn("Problem while closing the PreparedStatement for" +
  |                            " message: " + message.header.messageId);
  |                 }
  |             }
  |         }
  |     }
  |     
  |     /**
  |      * Extract a message from a result.  If the message is null then
  |      * return a default SpyMessage from the MessagePool.
  |      *
  |      * @param rs the result set
  |      * @param column the column number
  |      * @return the message
  |      * @throws SQLException for an error accessing the db
  |      * @throws IOException for an error extracting the message
  |      * @throws NullPointerException in the case when we receive
  |      *         unexpected data
  |      */
  |     protected SpyMessage extractMessage (ResultSet rs, int column)
  |                               throws SQLException, IOException {
  |        long messageid = 0;
  |        // this extra check should preserve the behavior of the super
  |        // class, so that a NullPointerException is thrown here if we
  |        // receive bad data
  |        if (rs.getObject(1) != null) {
  |            messageid = rs.getLong(1);
  |        } else {
  |            log.error("ResultSet messageid column was null.");
  |            throw new NullPointerException(
  |                    "ResultSet messageid column was null.");
  |        }
  |        
  |        SpyMessage message = null;
  |        
  |        if (blobType == OBJECT_BLOB) {
  |            Object o = rs.getObject(column);
  |            if (o != null) {
  |                message = (SpyMessage) o;
  |            } else {
  |                // set the message to a default SpyMessage
  |                message = MessagePool.getMessage();
  |            }
  |        } else if (blobType == BYTES_BLOB) {
  |           byte[] st = rs.getBytes(column);
  |           // check the bytes themselves: getBytes() returns null for SQL
  |           // NULL, and a freshly constructed stream is never null
  |           if (st != null) {
  |               ByteArrayInputStream baip = new ByteArrayInputStream(st);
  |               ObjectInputStream ois = new ObjectInputStream(baip);
  |               message = SpyMessage.readMessage(ois);
  |           } else {
  |               // set the message to a default SpyMessage
  |               message = MessagePool.getMessage();
  |           }
  |        } else if (blobType == BINARYSTREAM_BLOB) {
  |            InputStream in = rs.getBinaryStream(column);
  |            if (in != null) { 
  |                ObjectInputStream ois = new ObjectInputStream(in);
  |                message = SpyMessage.readMessage(ois);
  |            } else {
  |                // set the message to a default SpyMessage
  |                message = MessagePool.getMessage();
  |            }
  |        } else if (blobType == BLOB_BLOB) {
  |            Blob b = rs.getBlob(column);
  |            if (b != null) {
  |                ObjectInputStream ois =
  |                        new ObjectInputStream(b.getBinaryStream());
  |                message = SpyMessage.readMessage(ois);
  |            } else {
  |                // set the message to a default SpyMessage
  |                message = MessagePool.getMessage();
  |            }
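  |        } else {
  |            // without this, an unrecognized blobType would surface as a
  |            // NullPointerException on the assignment below
  |            throw new IllegalStateException("Unknown blobType: " + blobType);
  |        }
  |        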
  |        message.header.messageId = messageid;
  |        return message;
  |     }
  | }
  | 

Cheers,
-Jason

View the original post : 
http://www.jboss.org/index.html?module=bb&op=viewtopic&p=3860700#3860700
