Author: rajdavies
Date: Fri Mar 2 12:00:11 2007
New Revision: 513921
URL: http://svn.apache.org/viewvc?view=rev&rev=513921
Log:
Fixed Queue cursor test case for AMQ Store
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecord.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecordMarshaller.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?view=diff&rev=513921&r1=513920&r2=513921
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
Fri Mar 2 12:00:11 2007
@@ -68,8 +68,9 @@
protected HashSet<Location> inFlightTxLocations=new HashSet<Location>();
protected final TaskRunner asyncWriteTask;
protected CountDownLatch flushLatch;
+ private final boolean debug=log.isDebugEnabled();
private final AtomicReference<Location> mark=new
AtomicReference<Location>();
-
+
public AMQMessageStore(AMQPersistenceAdapter adapter,ReferenceStore
referenceStore,ActiveMQDestination destination){
this.peristenceAdapter=adapter;
this.transactionStore=adapter.getTransactionStore();
@@ -95,7 +96,7 @@
*/
public void addMessage(ConnectionContext context,final Message message)
throws IOException{
final MessageId id=message.getMessageId();
- final boolean debug=log.isDebugEnabled();
+
final Location
location=peristenceAdapter.writeCommand(message,message.isResponseRequired());
if(!context.isInTransaction()){
if(debug)
@@ -168,7 +169,6 @@
/**
*/
public void removeMessage(ConnectionContext context,final MessageAck ack)
throws IOException{
- final boolean debug=log.isDebugEnabled();
JournalQueueAck remove=new JournalQueueAck();
remove.setDestination(destination);
remove.setMessageAck(ack);
@@ -450,6 +450,7 @@
}
public void recoverNextMessages(int maxReturned,MessageRecoveryListener
listener) throws Exception{
+ /*
RecoveryListenerAdapter recoveryListener=new
RecoveryListenerAdapter(this,listener);
if(referenceStore.supportsExternalBatchControl()){
synchronized(this){
@@ -469,6 +470,13 @@
}
}
}else{
+ flush();
+ referenceStore.recoverNextMessages(maxReturned,recoveryListener);
+ }
+ */
+ RecoveryListenerAdapter recoveryListener=new
RecoveryListenerAdapter(this,listener);
+ referenceStore.recoverNextMessages(maxReturned,recoveryListener);
+ if(recoveryListener.size()==0&&recoveryListener.hasSpace()){
flush();
referenceStore.recoverNextMessages(maxReturned,recoveryListener);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java?view=diff&rev=513921&r1=513920&r2=513921
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
Fri Mar 2 12:00:11 2007
@@ -51,9 +51,7 @@
public void recoverMessageReference(MessageId ref) throws Exception{
Message message=this.store.getMessage(ref);
if(message!=null){
- listener.recoverMessage(message);
- count++;
- lastRecovered=ref;
+ recoverMessage(message);
}else{
log.error("Message id "+ref+" could not be recovered from the data
store!");
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=diff&rev=513921&r1=513920&r2=513921
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
Fri Mar 2 12:00:11 2007
@@ -32,7 +32,7 @@
protected final ActiveMQDestination destination;
protected final MapContainer<MessageId,ReferenceRecord> messageContainer;
protected KahaReferenceStoreAdapter adapter;
- protected StoreEntry batchEntry=null;
+ private StoreEntry batchEntry=null;
public KahaReferenceStore(KahaReferenceStoreAdapter adapter,MapContainer
container,ActiveMQDestination destination) throws IOException{
this.adapter = adapter;
@@ -47,7 +47,7 @@
}
protected MessageId getMessageId(Object object){
- return new MessageId(((ReferenceRecord)object).messageId);
+ return new MessageId(((ReferenceRecord)object).getMessageId());
}
public synchronized void addMessage(ConnectionContext context,Message
message) throws IOException{
@@ -60,13 +60,13 @@
protected void recover(MessageRecoveryListener listener,Object msg) throws
Exception{
ReferenceRecord record=(ReferenceRecord)msg;
- listener.recoverMessageReference(new MessageId(record.messageId));
+ listener.recoverMessageReference(new MessageId(record.getMessageId()));
}
public synchronized void recover(MessageRecoveryListener listener) throws
Exception{
for(StoreEntry
entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
ReferenceRecord record=messageContainer.getValue(entry);
- recover(listener,new MessageId(record.messageId));
+ recover(listener,new MessageId(record.getMessageId()));
}
listener.finished();
}
@@ -78,9 +78,6 @@
}else{
entry=messageContainer.refresh(entry);
entry=messageContainer.getNext(entry);
- if (entry==null) {
- batchEntry=null;
- }
}
if(entry!=null){
int count=0;
@@ -108,7 +105,7 @@
ReferenceRecord result=messageContainer.get(identity);
if(result==null)
return null;
- return result.data;
+ return result.getData();
}
public void addReferenceFileIdsInUse(){
@@ -123,10 +120,12 @@
}
public synchronized void removeMessage(MessageId msgId) throws IOException{
- ReferenceRecord rr = messageContainer.remove(msgId);
- removeInterest(rr);
- if(messageContainer.isEmpty()){
- resetBatching();
+ ReferenceRecord rr=messageContainer.remove(msgId);
+ if(rr!=null){
+ removeInterest(rr);
+ if(messageContainer.isEmpty()){
+ resetBatching();
+ }
}
}
@@ -157,27 +156,23 @@
return true;
}
- /**
- * @param startAfter
- * @see
org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId)
- */
- public void setBatch(MessageId startAfter){
- resetBatching();
- if (startAfter != null) {
- batchEntry = messageContainer.getEntry(startAfter);
- }
-
- }
-
+
public boolean supportsExternalBatchControl(){
return true;
}
void removeInterest(ReferenceRecord rr) {
- adapter.removeInterestInRecordFile(rr.data.getFileId());
+ adapter.removeInterestInRecordFile(rr.getData().getFileId());
}
void addInterest(ReferenceRecord rr) {
- adapter.addInterestInRecordFile(rr.data.getFileId());
+ adapter.addInterestInRecordFile(rr.getData().getFileId());
+ }
+
+ /**
+ * @param startAfter
+ * @see
org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId)
+ */
+ public void setBatch(MessageId startAfter){
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=513921&r1=513920&r2=513921
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
Fri Mar 2 12:00:11 2007
@@ -53,7 +53,7 @@
}
protected MessageId getMessageId(Object object){
- return new MessageId(((ReferenceRecord)object).messageId);
+ return new MessageId(((ReferenceRecord)object).getMessageId());
}
public synchronized void addMessage(ConnectionContext context,Message
message) throws IOException{
@@ -66,7 +66,7 @@
protected void recover(MessageRecoveryListener listener,Object msg) throws
Exception{
ReferenceRecord record=(ReferenceRecord)msg;
- listener.recoverMessageReference(new MessageId(record.messageId));
+ listener.recoverMessageReference(new MessageId(record.getMessageId()));
}
public void addMessageReference(ConnectionContext context,MessageId
messageId,ReferenceData data)
@@ -94,7 +94,7 @@
ReferenceRecord result=messageContainer.get(identity);
if(result==null)
return null;
- return result.data;
+ return result.getData();
}
public void addReferenceFileIdsInUse(){
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecord.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecord.java?view=diff&rev=513921&r1=513920&r2=513921
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecord.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecord.java
Fri Mar 2 12:00:11 2007
@@ -18,8 +18,8 @@
public class ReferenceRecord{
- public String messageId;
- public ReferenceData data;
+ private String messageId;
+ private ReferenceData data;
public ReferenceRecord(){
}
@@ -27,5 +27,37 @@
public ReferenceRecord(String messageId,ReferenceData data){
this.messageId=messageId;
this.data=data;
+ }
+
+
+ /**
+ * @return the data
+ */
+ public ReferenceData getData(){
+ return this.data;
+ }
+
+
+ /**
+ * @param data the data to set
+ */
+ public void setData(ReferenceData data){
+ this.data=data;
+ }
+
+
+ /**
+ * @return the messageId
+ */
+ public String getMessageId(){
+ return this.messageId;
+ }
+
+
+ /**
+ * @param messageId the messageId to set
+ */
+ public void setMessageId(String messageId){
+ this.messageId=messageId;
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecordMarshaller.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecordMarshaller.java?view=diff&rev=513921&r1=513920&r2=513921
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecordMarshaller.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ReferenceRecordMarshaller.java
Fri Mar 2 12:00:11 2007
@@ -24,11 +24,12 @@
public ReferenceRecord readPayload(DataInput dataIn) throws IOException{
ReferenceRecord rr=new ReferenceRecord();
- rr.messageId=dataIn.readUTF();
- rr.data=new ReferenceData();
- rr.data.setFileId(dataIn.readInt());
- rr.data.setOffset(dataIn.readInt());
- rr.data.setExpiration(dataIn.readLong());
+ rr.setMessageId(dataIn.readUTF());
+ ReferenceData referenceData = new ReferenceData();
+ referenceData.setFileId(dataIn.readInt());
+ referenceData.setOffset(dataIn.readInt());
+ referenceData.setExpiration(dataIn.readLong());
+ rr.setData(referenceData);
return rr;
}
@@ -39,9 +40,9 @@
* @see org.apache.activemq.kaha.Marshaller#writePayload(java.lang.Object,
java.io.DataOutput)
*/
public void writePayload(ReferenceRecord rr,DataOutput dataOut) throws
IOException{
- dataOut.writeUTF(rr.messageId);
- dataOut.writeInt(rr.data.getFileId());
- dataOut.writeInt(rr.data.getOffset());
- dataOut.writeLong(rr.data.getExpiration());
+ dataOut.writeUTF(rr.getMessageId());
+ dataOut.writeInt(rr.getData().getFileId());
+ dataOut.writeInt(rr.getData().getOffset());
+ dataOut.writeLong(rr.getData().getExpiration());
}
}