Author: rajith
Date: Fri Jun 15 17:23:25 2012
New Revision: 1350710
URL: http://svn.apache.org/viewvc?rev=1350710&view=rev
Log:
QPID-4027 Tied the message implementation into the cpp/jni
implementation. A bit more cleanup and testing needs to be done.
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java?rev=1350710&r1=1350709&r2=1350710&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
Fri Jun 15 17:23:25 2012
@@ -18,6 +18,7 @@
package org.apache.qpid.messaging.cpp;
import org.apache.qpid.messaging.Connection;
+import org.apache.qpid.messaging.MessageFactory;
import org.apache.qpid.messaging.MessagingException;
import org.apache.qpid.messaging.Session;
@@ -29,6 +30,8 @@ import org.apache.qpid.messaging.Session
*/
public class CppConnection implements Connection
{
+ private static MessageFactory _MSG_FACTORY = new CppMessageFactory();
+
private org.apache.qpid.messaging.cpp.jni.Connection _cppConn;
public CppConnection(String url)
@@ -78,4 +81,10 @@ public class CppConnection implements Co
{
return _cppConn.getAuthenticatedUsername();
}
+
+ @Override
+ public MessageFactory getMessageFactory()
+ {
+ return _MSG_FACTORY;
+ }
}
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java?rev=1350710&r1=1350709&r2=1350710&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
Fri Jun 15 17:23:25 2012
@@ -26,27 +26,28 @@ public class CppReceiver implements Rece
{
private CppSession _ssn;
private org.apache.qpid.messaging.cpp.jni.Receiver _cppReceiver;
+ private CppMessageFactory _msgFactory;
public CppReceiver(CppSession ssn,
- org.apache.qpid.messaging.cpp.jni.Receiver cppReceiver)
+ org.apache.qpid.messaging.cpp.jni.Receiver cppReceiver) throws
MessagingException
{
_ssn = ssn;
_cppReceiver = cppReceiver;
+ _msgFactory =
(CppMessageFactory)ssn.getConnection().getMessageFactory();
}
@Override
public Message get(long timeout) throws MessagingException
{
org.apache.qpid.messaging.cpp.jni.Message m =
_cppReceiver.get(CppDuration.getDuration(timeout));
- return new TextMessage(m.getContent());
-
+ return _msgFactory.createMessage(m);
}
@Override
public Message fetch(long timeout) throws MessagingException
{
org.apache.qpid.messaging.cpp.jni.Message m =
_cppReceiver.fetch(CppDuration.getDuration(timeout));
- return new TextMessage(m);
+ return _msgFactory.createMessage(m);
}
@Override
@@ -104,5 +105,4 @@ public class CppReceiver implements Rece
_ssn.checkError();
return _ssn;
}
-
}
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java?rev=1350710&r1=1350709&r2=1350710&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
Fri Jun 15 17:23:25 2012
@@ -21,23 +21,45 @@ import org.apache.qpid.messaging.Message
import org.apache.qpid.messaging.MessagingException;
import org.apache.qpid.messaging.Sender;
import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.ext.MessageInternal;
public class CppSender implements Sender
{
private CppSession _ssn;
private org.apache.qpid.messaging.cpp.jni.Sender _cppSender;
+ private CppMessageFactory _msgFactory;
public CppSender(CppSession ssn,
- org.apache.qpid.messaging.cpp.jni.Sender cppSender)
+ org.apache.qpid.messaging.cpp.jni.Sender cppSender) throws
MessagingException
{
_ssn = ssn;
_cppSender = cppSender;
+ _msgFactory =
(CppMessageFactory)ssn.getConnection().getMessageFactory();
}
@Override
public void send(Message message, boolean sync) throws MessagingException
{
- _cppSender.send(((TextMessage)message).getCppMessage(),true);
+ org.apache.qpid.messaging.cpp.jni.Message m =
convertForSending(message);
+ _cppSender.send(m,true);
+ }
+
+ private org.apache.qpid.messaging.cpp.jni.Message
convertForSending(Message m) throws MessagingException
+ {
+ if((m instanceof MessageInternal) &&
+ (_msgFactory.getClass() ==
((MessageInternal)m).getMessageFactoryClass())
+ )
+ {
+ org.apache.qpid.messaging.cpp.jni.Message msg =
+
(org.apache.qpid.messaging.cpp.jni.Message)((MessageInternal)m).getFactorySpecificMessageDelegate();
+ msg.setContentAsByteBuffer(m.getContent());
+ return msg;
+ }
+ else
+ {
+ throw new MessagingException("Incompatible message
implementation." +
+ "You need to use the MessageFactory given by the
connection that owns this ");
+ }
}
@Override
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java?rev=1350710&r1=1350709&r2=1350710&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java
Fri Jun 15 17:23:25 2012
@@ -30,7 +30,7 @@ public class CppTest
{
public static void main(String[] args) throws Exception
{
- Connection con =
ConnectionFactory.get().createConnection("localhost:5672");
+ /*Connection con =
ConnectionFactory.get().createConnection("localhost:5672");
con.open();
Session ssn = con.createSession("hello");
System.out.println("Got a session object " + ssn);
@@ -64,7 +64,7 @@ public class CppTest
System.out.println("Msg toString() : " + m);
ssn.close();
- con.close();
+ con.close();*/
}
}
Modified:
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
URL:
http://svn.apache.org/viewvc/qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java?rev=1350710&r1=1350709&r2=1350710&view=diff
==============================================================================
---
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
(original)
+++
qpid/branches/address-refactor2/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
Fri Jun 15 17:23:25 2012
@@ -180,8 +180,9 @@ public class ConnectionManagementDecorat
}
@Override
- public MessageFactory getMessageFactory()
+ public MessageFactory getMessageFactory() throws MessagingException
{
+ checkClosedAndThrowException();
return _delegate.getMessageFactory();
}
@@ -254,6 +255,12 @@ public class ConnectionManagementDecorat
return _connectionLock;
}
+ @Override
+ public void recreate() throws MessagingException
+ {
+ // TODO Auto-generated method stub
+ }
+
private void checkClosedAndThrowException() throws ConnectionException
{
checkClosedAndThrowException("Connection is closed. You cannot invoke
methods on a closed connection");
@@ -290,11 +297,4 @@ public class ConnectionManagementDecorat
// TODO add local IP and pid to the beginning;
return _ssnNameGenerator.generate().toString();
}
-
- @Override
- public void recreate() throws MessagingException
- {
- // TODO Auto-generated method stub
-
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]