Author: tabish
Date: Mon Mar 25 21:18:46 2013
New Revision: 1460900
URL: http://svn.apache.org/r1460900
Log:
https://issues.apache.org/jira/browse/AMQCPP-367
Adds a message Audit to use for dup detection.
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.cpp
(with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.h
(with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp
(with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h
(with props)
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=1460900&r1=1460899&r2=1460900&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Mon Mar 25
21:18:46 2013
@@ -96,6 +96,7 @@ cc_sources = \
activemq/core/ActiveMQConnectionMetaData.cpp \
activemq/core/ActiveMQConstants.cpp \
activemq/core/ActiveMQConsumer.cpp \
+ activemq/core/ActiveMQMessageAudit.cpp \
activemq/core/ActiveMQProducer.cpp \
activemq/core/ActiveMQQueueBrowser.cpp \
activemq/core/ActiveMQSession.cpp \
@@ -732,6 +733,7 @@ h_sources = \
activemq/core/ActiveMQConnectionMetaData.h \
activemq/core/ActiveMQConstants.h \
activemq/core/ActiveMQConsumer.h \
+ activemq/core/ActiveMQMessageAudit.h \
activemq/core/ActiveMQProducer.h \
activemq/core/ActiveMQQueueBrowser.h \
activemq/core/ActiveMQSession.h \
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.cpp?rev=1460900&view=auto
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.cpp
(added)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.cpp
Mon Mar 25 21:18:46 2013
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQMessageAudit.h"
+
+#include <activemq/util/IdGenerator.h>
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/commands/ProducerId.h>
+
+#include <decaf/util/LRUCache.h>
+#include <decaf/util/BitSet.h>
+#include <decaf/util/concurrent/Mutex.h>
+
+#include <string>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::util;
+using namespace activemq::core;
+using namespace activemq::commands;
+using namespace activemq::exceptions;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace activemq {
+namespace core {
+
+    /**
+     * Private state holder for ActiveMQMessageAudit.
+     *
+     * Keeps the audit settings, the mutex used by the audit methods and the
+     * cache that maps a producer seed string to the BitSet recording which
+     * sequence ids were seen for that producer.
+     */
+    class MessageAuditImpl {
+    private:
+
+        // Copying is disabled: both operations are declared private and no
+        // definition appears in this file.
+        MessageAuditImpl(const MessageAuditImpl&);
+        MessageAuditImpl& operator= (const MessageAuditImpl&);
+
+    public:
+
+        // Size handed to every newly created per-producer BitSet.
+        int auditDepth;
+
+        // Upper bound on the number of producer entries kept in the cache.
+        int maximumNumberOfProducersToTrack;
+
+        // Lock taken by the audit methods while they use the map.
+        Mutex mutex;
+
+        // Producer seed -> BitSet of sequence ids seen for that producer.
+        LRUCache<std::string, Pointer<BitSet> > map;
+
+        /**
+         * Creates the state with an audit depth of 2048 and room for 64
+         * tracked producers.
+         */
+        MessageAuditImpl() : auditDepth(2048),
+                             maximumNumberOfProducersToTrack(64),
+                             mutex(),
+                             map(0, 64, 0.75f, true) {
+        }
+
+        /**
+         * Creates the state with caller supplied limits.
+         *
+         * The cache is built with maximumNumberOfProducersToTrack as its
+         * maximum size so that the limit is actually enforced; a default
+         * constructed cache would ignore the requested value.
+         * NOTE(review): LRUCache may reject a non-positive maximum size -
+         * confirm against the LRUCache documentation.
+         *
+         * @param auditDepth
+         *      Size used for each per-producer BitSet.
+         * @param maximumNumberOfProducersToTrack
+         *      Maximum number of producer entries to keep.
+         */
+        MessageAuditImpl(int auditDepth, int maximumNumberOfProducersToTrack) :
+            auditDepth(auditDepth),
+            maximumNumberOfProducersToTrack(maximumNumberOfProducersToTrack),
+            mutex(),
+            map(0, maximumNumberOfProducersToTrack, 0.75f, true) {
+        }
+
+        /**
+         * Applies a new limit for the number of tracked producers.
+         *
+         * This method does not lock the mutex itself.
+         *
+         * @param value
+         *      The new maximum number of producer entries.
+         */
+        void adjustMaxProducersToTrack(int value) {
+            // When value is smaller than current we need to move the entries
+            // to a new cache with that setting so that old ones are pruned
+            // since putAll will access the entries in the right order,
+            // this shouldn't result in wrong cache entries being removed
+            if (value < maximumNumberOfProducersToTrack) {
+                LRUCache<std::string, Pointer<BitSet> > newMap(0, value, 0.75f, true);
+                newMap.putAll(this->map);
+                this->map.clear();
+                this->map.putAll(newMap);
+            }
+            this->map.setMaxCacheSize(value);
+            this->maximumNumberOfProducersToTrack = value;
+        }
+    };
+
+}}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates an audit backed by a default constructed MessageAuditImpl
+// (audit depth 2048, 64 producers to track).
+ActiveMQMessageAudit::ActiveMQMessageAudit() :
+    impl(new MessageAuditImpl()) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates an audit whose implementation object is configured with the given
+// audit depth and maximum number of producers to track.
+ActiveMQMessageAudit::ActiveMQMessageAudit(int auditDepth, int maximumNumberOfProducersToTrack) :
+    impl(new MessageAuditImpl(auditDepth, maximumNumberOfProducersToTrack)) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Releases the implementation object owned by this audit.
+// NOTE(review): AMQ_CATCHALL_NOTHROW is defined elsewhere; presumably it
+// catches anything thrown by the try block so nothing escapes the destructor.
+ActiveMQMessageAudit::~ActiveMQMessageAudit() {
+ try {
+ delete this->impl;
+ }
+ AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the audit depth currently stored in the implementation object.
+int ActiveMQMessageAudit::getAuditDepth() const {
+    const int depth = impl->auditDepth;
+    return depth;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Stores a new audit depth. The value is read when a BitSet is created for a
+// producer that is not tracked yet; BitSets that already exist are not touched.
+void ActiveMQMessageAudit::setAuditDepth(int value) {
+    impl->auditDepth = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the producer limit currently stored in the implementation object.
+int ActiveMQMessageAudit::getMaximumNumberOfProducersToTrack() const {
+    const int limit = impl->maximumNumberOfProducersToTrack;
+    return limit;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Changes the maximum number of producers tracked by this audit.
+//
+// NOTE: despite the "get" prefix this is the setter; the name is kept as is
+// so that existing callers continue to compile.
+//
+// The producer map is rebuilt / resized while holding the audit mutex because
+// the duplicate, in-order and rollback checks in this file use the same map
+// under that lock.
+void ActiveMQMessageAudit::getMaximumNumberOfProducersToTrack(int value) {
+    synchronized(&this->impl->mutex) {
+        this->impl->adjustMaxProducersToTrack(value);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Reports whether the sequence encoded in the given id string was already
+// recorded for its seed, and records it when it was not.
+// Ids without a seed, or with a negative sequence, are never duplicates.
+bool ActiveMQMessageAudit::isDuplicate(const std::string& id) const {
+
+    const std::string producerKey = IdGenerator::getSeedFromId(id);
+    if (producerKey.empty()) {
+        return false;
+    }
+
+    bool seenBefore = false;
+
+    synchronized(&this->impl->mutex) {
+
+        // Look up the tracker for this seed, creating one on first use.
+        Pointer<BitSet> tracker;
+        try {
+            tracker = this->impl->map.get(producerKey);
+        } catch (NoSuchElementException& ex) {
+            tracker.reset(new BitSet(this->impl->auditDepth));
+            this->impl->map.put(producerKey, tracker);
+        }
+
+        const long long sequence = IdGenerator::getSequenceFromId(id);
+        if (sequence >= 0) {
+            // Sequences above the int range are shifted down by MAX_VALUE.
+            const int bitIndex = (sequence > Integer::MAX_VALUE) ?
+                (int)(sequence - Integer::MAX_VALUE) : (int) sequence;
+
+            seenBefore = tracker->get(bitIndex);
+            if (!seenBefore) {
+                tracker->set(bitIndex, true);
+            }
+        }
+    }
+
+    return seenBefore;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Reports whether the producer sequence id of the given MessageId was already
+// recorded for its producer, and records it when it was not.
+// A NULL id, a NULL producer id or an empty producer string yields false.
+bool ActiveMQMessageAudit::isDuplicate(decaf::lang::Pointer<MessageId> msgId) const {
+
+    Pointer<ProducerId> producer;
+    if (msgId != NULL) {
+        producer = msgId->getProducerId();
+    }
+
+    std::string producerKey;
+    if (producer != NULL) {
+        producerKey = producer->toString();
+    }
+
+    if (producerKey.empty()) {
+        return false;
+    }
+
+    bool seenBefore = false;
+
+    synchronized(&this->impl->mutex) {
+
+        // Look up the tracker for this producer, creating one on first use.
+        Pointer<BitSet> tracker;
+        try {
+            tracker = this->impl->map.get(producerKey);
+        } catch (NoSuchElementException& ex) {
+            tracker.reset(new BitSet(this->impl->auditDepth));
+            this->impl->map.put(producerKey, tracker);
+        }
+
+        const long long sequence = msgId->getProducerSequenceId();
+        if (sequence >= 0) {
+            // Sequences above the int range are shifted down by MAX_VALUE.
+            const int bitIndex = (sequence > Integer::MAX_VALUE) ?
+                (int)(sequence - Integer::MAX_VALUE) : (int) sequence;
+
+            seenBefore = tracker->get(bitIndex);
+            if (!seenBefore) {
+                tracker->set(bitIndex, true);
+            }
+        }
+    }
+
+    return seenBefore;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Clears the recorded bit for the sequence encoded in the given id string.
+// Nothing happens when the id has no seed, when the seed is not tracked or
+// when the sequence is negative.
+void ActiveMQMessageAudit::rollback(const std::string& msgId) {
+
+    const std::string producerKey = IdGenerator::getSeedFromId(msgId);
+    if (producerKey.empty()) {
+        return;
+    }
+
+    synchronized(&this->impl->mutex) {
+
+        // An untracked seed leaves the tracker NULL; no entry is created here.
+        Pointer<BitSet> tracker;
+        try {
+            tracker = this->impl->map.get(producerKey);
+        } catch (NoSuchElementException& ex) {
+        }
+
+        if (tracker != NULL) {
+            const long long sequence = IdGenerator::getSequenceFromId(msgId);
+            if (sequence >= 0) {
+                // Sequences above the int range are shifted down by MAX_VALUE.
+                const int bitIndex = (sequence > Integer::MAX_VALUE) ?
+                    (int)(sequence - Integer::MAX_VALUE) : (int) sequence;
+
+                tracker->set(bitIndex, false);
+            }
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Clears the recorded bit for the producer sequence id of the given MessageId.
+// Nothing happens for a NULL id, a NULL producer id, an empty producer
+// string, an untracked producer or a negative sequence.
+void ActiveMQMessageAudit::rollback(decaf::lang::Pointer<commands::MessageId> msgId) {
+
+    Pointer<ProducerId> producer;
+    if (msgId != NULL) {
+        producer = msgId->getProducerId();
+    }
+
+    std::string producerKey;
+    if (producer != NULL) {
+        producerKey = producer->toString();
+    }
+
+    if (producerKey.empty()) {
+        return;
+    }
+
+    synchronized(&this->impl->mutex) {
+
+        // An untracked producer leaves the tracker NULL; no entry is created.
+        Pointer<BitSet> tracker;
+        try {
+            tracker = this->impl->map.get(producerKey);
+        } catch (NoSuchElementException& ex) {
+        }
+
+        if (tracker != NULL) {
+            const long long sequence = msgId->getProducerSequenceId();
+            if (sequence >= 0) {
+                // Sequences above the int range are shifted down by MAX_VALUE.
+                const int bitIndex = (sequence > Integer::MAX_VALUE) ?
+                    (int)(sequence - Integer::MAX_VALUE) : (int) sequence;
+
+                tracker->set(bitIndex, false);
+            }
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Checks whether the sequence encoded in the given id string equals
+// length() - 1 of the BitSet kept for its seed.
+// An empty id, an id without a seed, or a negative sequence yields true.
+// A tracker is created for a seed that is not tracked yet.
+bool ActiveMQMessageAudit::isInOrder(const std::string& msgId) const {
+
+    if (msgId.empty()) {
+        return true;
+    }
+
+    const std::string producerKey = IdGenerator::getSeedFromId(msgId);
+    if (producerKey.empty()) {
+        return true;
+    }
+
+    bool inOrder = true;
+
+    synchronized(&this->impl->mutex) {
+
+        Pointer<BitSet> tracker;
+        try {
+            tracker = this->impl->map.get(producerKey);
+        } catch (NoSuchElementException& ex) {
+            tracker.reset(new BitSet(this->impl->auditDepth));
+            this->impl->map.put(producerKey, tracker);
+        }
+
+        const long long sequence = IdGenerator::getSequenceFromId(msgId);
+        if (sequence >= 0) {
+            // Sequences above the int range are shifted down by MAX_VALUE.
+            const int bitIndex = (sequence > Integer::MAX_VALUE) ?
+                (int)(sequence - Integer::MAX_VALUE) : (int) sequence;
+
+            inOrder = ((tracker->length() - 1) == bitIndex);
+        }
+    }
+
+    return inOrder;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Checks whether the producer sequence id of the given MessageId equals
+// length() - 1 of the BitSet kept for its producer.
+// A NULL id, a NULL producer id, an empty producer string or a negative
+// sequence yields false (note: the string overload answers true by default).
+// A tracker is created for a producer that is not tracked yet.
+bool ActiveMQMessageAudit::isInOrder(decaf::lang::Pointer<commands::MessageId> msgId) const {
+
+    Pointer<ProducerId> producer;
+    if (msgId != NULL) {
+        producer = msgId->getProducerId();
+    }
+
+    std::string producerKey;
+    if (producer != NULL) {
+        producerKey = producer->toString();
+    }
+
+    if (producerKey.empty()) {
+        return false;
+    }
+
+    bool inOrder = false;
+
+    synchronized(&this->impl->mutex) {
+
+        Pointer<BitSet> tracker;
+        try {
+            tracker = this->impl->map.get(producerKey);
+        } catch (NoSuchElementException& ex) {
+            tracker.reset(new BitSet(this->impl->auditDepth));
+            this->impl->map.put(producerKey, tracker);
+        }
+
+        const long long sequence = msgId->getProducerSequenceId();
+        if (sequence >= 0) {
+            // Sequences above the int range are shifted down by MAX_VALUE.
+            const int bitIndex = (sequence > Integer::MAX_VALUE) ?
+                (int)(sequence - Integer::MAX_VALUE) : (int) sequence;
+
+            inOrder = ((tracker->length() - 1) == bitIndex);
+        }
+    }
+
+    return inOrder;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns length() - 1 of the BitSet tracked for the given producer.
+//
+// @param id
+//      The producer to look up; may be NULL.
+//
+// @return -1 when id is NULL, when its string form is empty or when the
+//         producer is not tracked; otherwise length() - 1 of its BitSet.
+long long ActiveMQMessageAudit::getLastSeqId(decaf::lang::Pointer<commands::ProducerId> id) const {
+    // Declared with the function's return type (was a plain long).
+    long long result = -1;
+    if (id != NULL) {
+        std::string seed = id->toString();
+        if (!seed.empty()) {
+
+            synchronized(&this->impl->mutex) {
+
+                // An untracked producer leaves bits NULL; nothing is created.
+                Pointer<BitSet> bits;
+                try {
+                    bits = this->impl->map.get(seed);
+                } catch (NoSuchElementException& ex) {
+                }
+
+                if (bits != NULL) {
+                    result = bits->length() - 1;
+                }
+            }
+        }
+    }
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Removes every tracked producer from the audit.
+// The map is cleared while holding the audit mutex because the duplicate,
+// in-order and rollback checks in this file use the same map under that lock.
+void ActiveMQMessageAudit::clear() {
+    synchronized(&this->impl->mutex) {
+        this->impl->map.clear();
+    }
+}
Propchange:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.h?rev=1460900&view=auto
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.h
(added)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.h
Mon Mar 25 21:18:46 2013
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQMESSAGEAUDIT_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQMESSAGEAUDIT_H_
+
+#include <activemq/util/Config.h>
+
+#include <activemq/commands/MessageId.h>
+#include <activemq/commands/ProducerId.h>
+#include <decaf/lang/Pointer.h>
+
+namespace activemq {
+namespace core {
+
+ class MessageAuditImpl;
+
+    /**
+     * Records, per producer, which message sequence ids have been seen so
+     * that duplicate and out of order messages can be detected.
+     *
+     * An instance owns its implementation object and cannot be copied.
+     */
+    class AMQCPP_API ActiveMQMessageAudit {
+    private:
+
+        MessageAuditImpl* impl;
+
+    private:
+
+        // The implementation object is held through an owning raw pointer, so
+        // a compiler generated copy would leave two audits sharing the same
+        // MessageAuditImpl. Copying is disabled; these are intentionally not
+        // defined.
+        ActiveMQMessageAudit(const ActiveMQMessageAudit&);
+        ActiveMQMessageAudit& operator= (const ActiveMQMessageAudit&);
+
+    public:
+
+        /**
+         * Default Constructor auditDepth = 2048,
+         * maximumNumberOfProducersToTrack = 64
+         */
+        ActiveMQMessageAudit();
+
+        /**
+         * Construct a MessageAudit
+         *
+         * @param auditDepth
+         *      The range of ids to track.
+         * @param maximumNumberOfProducersToTrack
+         *      The number of producers expected in the system
+         */
+        ActiveMQMessageAudit(int auditDepth, int maximumNumberOfProducersToTrack);
+
+        ~ActiveMQMessageAudit();
+
+    public:
+
+        /**
+         * Gets the currently configured Audit Depth
+         *
+         * @returns the current audit depth setting
+         */
+        int getAuditDepth() const;
+
+        /**
+         * Sets a new Audit Depth value.
+         *
+         * @param value
+         *      The range of ids to track.
+         */
+        void setAuditDepth(int value);
+
+        /**
+         * @returns the current number of producers that will be tracked.
+         */
+        int getMaximumNumberOfProducersToTrack() const;
+
+        /**
+         * Sets the number of producers to track.
+         *
+         * Despite its "get" prefix this overload is a setter; the name is
+         * kept for compatibility with existing callers. New code should call
+         * setMaximumNumberOfProducersToTrack() instead.
+         *
+         * @param value
+         *      The number of producers expected in the system
+         */
+        void getMaximumNumberOfProducersToTrack(int value);
+
+        /**
+         * Sets the number of producers to track.
+         *
+         * Correctly named alias that forwards to the legacy
+         * getMaximumNumberOfProducersToTrack(int) setter.
+         *
+         * @param value
+         *      The number of producers expected in the system
+         */
+        void setMaximumNumberOfProducersToTrack(int value) {
+            this->getMaximumNumberOfProducersToTrack(value);
+        }
+
+        /**
+         * Checks whether this messageId has been seen before and, when it
+         * has not, records it as seen.
+         *
+         * @param msgId
+         *      The string value Message Id.
+         *
+         * @return true if the message is a duplicate.
+         */
+        bool isDuplicate(const std::string& msgId) const;
+
+        /**
+         * Checks whether this messageId has been seen before and, when it
+         * has not, records it as seen.
+         *
+         * @param msgId
+         *      The target MessageId to check.
+         *
+         * @return true if the message is a duplicate
+         */
+        bool isDuplicate(decaf::lang::Pointer<commands::MessageId> msgId) const;
+
+        /**
+         * Removes the record of this message having been seen, so that a
+         * later isDuplicate() call for the same id does not report it as a
+         * duplicate.
+         *
+         * @param msgId
+         *      The string value Message Id.
+         */
+        void rollback(const std::string& msgId);
+
+        /**
+         * Removes the record of this message having been seen, so that a
+         * later isDuplicate() call for the same id does not report it as a
+         * duplicate.
+         *
+         * @param msgId
+         *      The target MessageId to roll back.
+         */
+        void rollback(decaf::lang::Pointer<commands::MessageId> msgId);
+
+        /**
+         * Check the MessageId is in order
+         *
+         * @param msgId
+         *      The string value Message Id.
+         *
+         * @return true if the MessageId is in order.
+         */
+        bool isInOrder(const std::string& msgId) const;
+
+        /**
+         * Check the MessageId is in order
+         *
+         * @param msgId
+         *      The target MessageId to check.
+         *
+         * @return true if the MessageId is in order.
+         */
+        bool isInOrder(decaf::lang::Pointer<commands::MessageId> msgId) const;
+
+        /**
+         * @param id
+         *      The producer whose audit record is queried.
+         *
+         * @returns the last sequence Id that we've audited for the given
+         *          producer, or -1 when nothing is tracked for it.
+         */
+        long long getLastSeqId(decaf::lang::Pointer<commands::ProducerId> id) const;
+
+        /**
+         * Clears this Audit.
+         */
+        void clear();
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_ACTIVEMQMESSAGEAUDIT_H_ */
Propchange:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageAudit.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am?rev=1460900&r1=1460899&r2=1460900&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am Mon Mar 25
21:18:46 2013
@@ -36,6 +36,7 @@ cc_sources = \
activemq/commands/XATransactionIdTest.cpp \
activemq/core/ActiveMQConnectionFactoryTest.cpp \
activemq/core/ActiveMQConnectionTest.cpp \
+ activemq/core/ActiveMQMessageAuditTest.cpp \
activemq/core/ActiveMQSessionTest.cpp \
activemq/core/FifoMessageDispatchChannelTest.cpp \
activemq/core/SimplePriorityMessageDispatchChannelTest.cpp \
@@ -283,6 +284,7 @@ h_sources = \
activemq/commands/XATransactionIdTest.h \
activemq/core/ActiveMQConnectionFactoryTest.h \
activemq/core/ActiveMQConnectionTest.h \
+ activemq/core/ActiveMQMessageAuditTest.h \
activemq/core/ActiveMQSessionTest.h \
activemq/core/FifoMessageDispatchChannelTest.h \
activemq/core/SimplePriorityMessageDispatchChannelTest.h \
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp?rev=1460900&view=auto
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp
(added)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp
Mon Mar 25 21:18:46 2013
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQMessageAuditTest.h"
+
+#include <activemq/core/ActiveMQMessageAudit.h>
+#include <activemq/util/IdGenerator.h>
+
+#include <decaf/util/ArrayList.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::commands;
+using namespace activemq::util;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+
+////////////////////////////////////////////////////////////////////////////////
+// The fixture keeps no state; each test builds its own audit object.
+ActiveMQMessageAuditTest::ActiveMQMessageAuditTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Nothing to release.
+ActiveMQMessageAuditTest::~ActiveMQMessageAuditTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Feeds 10000 generated id strings through the audit, expecting none to be a
+// duplicate, then replays the most recent ones and expects all to be flagged.
+void ActiveMQMessageAuditTest::testIsDuplicateString() {
+
+    const int total = 10000;
+    ActiveMQMessageAudit audit;
+    IdGenerator generator;
+    ArrayList<std::string> seen;
+
+    // First pass: every freshly generated id must be reported as new.
+    for (int n = 0; n < total; ++n) {
+        std::string current = generator.generateId();
+        seen.add(current);
+        CPPUNIT_ASSERT(!audit.isDuplicate(current));
+    }
+
+    // Second pass: replay the last (audit depth + 1) ids.
+    for (int pos = seen.size() - 1 - audit.getAuditDepth(); pos < seen.size(); ++pos) {
+        std::string replayed = seen.get(pos);
+        CPPUNIT_ASSERT_MESSAGE("duplicate, id:" + replayed, audit.isDuplicate(replayed));
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Same scenario as the string variant, using MessageId objects that share one
+// ProducerId and carry increasing producer sequence ids.
+void ActiveMQMessageAuditTest::testIsDuplicateMessageId() {
+
+    const int total = 10000;
+    ActiveMQMessageAudit audit;
+    ArrayList<Pointer<MessageId> > seen;
+
+    Pointer<ProducerId> producer(new ProducerId);
+    producer->setConnectionId("test");
+    producer->setSessionId(0);
+    producer->setValue(1);
+
+    // First pass: every new sequence id must be reported as new.
+    for (int seq = 0; seq < total; ++seq) {
+        Pointer<MessageId> current(new MessageId);
+        current->setProducerId(producer);
+        current->setProducerSequenceId(seq);
+        seen.add(current);
+        CPPUNIT_ASSERT(!audit.isDuplicate(current));
+    }
+
+    // Second pass: replay the last (audit depth + 1) ids.
+    for (int pos = seen.size() - 1 - audit.getAuditDepth(); pos < seen.size(); ++pos) {
+        Pointer<MessageId> replayed = seen.get(pos);
+        CPPUNIT_ASSERT_MESSAGE(std::string("duplicate msg:") + replayed->toString(),
+                               audit.isDuplicate(replayed));
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Records only the first generated id, then checks that ids taken from the
+// odd positions 3, 5, 7, ... are reported as out of order and not duplicates.
+void ActiveMQMessageAuditTest::testIsInOrderString() {
+
+    const int total = 10000;
+    ActiveMQMessageAudit audit;
+    IdGenerator generator;
+    ArrayList<std::string> skipped;
+
+    for (int n = 0; n < total; ++n) {
+        std::string current = generator.generateId();
+
+        // Only the very first id is fed to the audit in this loop.
+        if (n == 0) {
+            CPPUNIT_ASSERT(!audit.isDuplicate(current));
+            CPPUNIT_ASSERT(audit.isInOrder(current));
+        }
+
+        // Keep the ids generated at odd positions above 1 for the second pass.
+        const bool oddPosition = (n % 2 != 0);
+        if (oddPosition && n > 1) {
+            skipped.add(current);
+        }
+    }
+
+    for (int pos = 0; pos < skipped.size(); ++pos) {
+        std::string candidate = skipped.get(pos);
+        CPPUNIT_ASSERT_MESSAGE(std::string("Out of order msg: ") + candidate,
+                               !audit.isInOrder(candidate));
+        CPPUNIT_ASSERT(!audit.isDuplicate(candidate));
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// MessageId variant of the in-order test: only sequence 0 is recorded, then
+// the ids with odd sequence numbers 3, 5, 7, ... must be reported as out of
+// order and not as duplicates.
+void ActiveMQMessageAuditTest::testIsInOrderMessageId() {
+
+    const int total = 10000;
+    ActiveMQMessageAudit audit;
+    ArrayList<Pointer<MessageId> > skipped;
+
+    Pointer<ProducerId> producer(new ProducerId);
+    producer->setConnectionId("test");
+    producer->setSessionId(0);
+    producer->setValue(1);
+
+    for (int seq = 0; seq < total; ++seq) {
+        Pointer<MessageId> current(new MessageId);
+        current->setProducerId(producer);
+        current->setProducerSequenceId(seq);
+
+        // Only the very first id is fed to the audit in this loop.
+        if (seq == 0) {
+            CPPUNIT_ASSERT(!audit.isDuplicate(current));
+            CPPUNIT_ASSERT(audit.isInOrder(current));
+        }
+
+        // Keep the ids with odd sequence numbers above 1 for the second pass.
+        const bool oddSequence = (seq % 2 != 0);
+        if (oddSequence && seq > 1) {
+            skipped.add(current);
+        }
+    }
+
+    for (int pos = 0; pos < skipped.size(); ++pos) {
+        Pointer<MessageId> candidate = skipped.get(pos);
+        CPPUNIT_ASSERT_MESSAGE(std::string("Out of order msg: ") + candidate->toString(),
+                               !audit.isInOrder(candidate));
+        CPPUNIT_ASSERT(!audit.isDuplicate(candidate));
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Reuses a single MessageId, bumping its producer sequence id on every
+// iteration, and checks that getLastSeqId() reports the sequence just audited.
+void ActiveMQMessageAuditTest::testGetLastSeqId() {
+
+    const int count = 10000;
+    ActiveMQMessageAudit audit;
+
+    Pointer<ProducerId> pid(new ProducerId);
+    pid->setConnectionId("test");
+    pid->setSessionId(0);
+    pid->setValue(1);
+    Pointer<MessageId> id(new MessageId);
+    id->setProducerId(pid);
+
+    // The former ArrayList local only collected copies of the same pointer
+    // and was never read, so it has been removed.
+    for (int i = 0; i < count; i++) {
+        id->setProducerSequenceId(i);
+        CPPUNIT_ASSERT(!audit.isDuplicate(id));
+        CPPUNIT_ASSERT_EQUAL((long long)i, audit.getLastSeqId(pid));
+    }
+
+}
Propchange:
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h?rev=1460900&view=auto
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h
(added)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h
Mon Mar 25 21:18:46 2013
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQMESSAGEAUDITTEST_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQMESSAGEAUDITTEST_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+namespace activemq {
+namespace core {
+
+ // CppUnit fixture for ActiveMQMessageAudit; the tests registered below
+ // cover duplicate detection, in-order checks and last sequence id lookup.
+ class ActiveMQMessageAuditTest : public CppUnit::TestFixture {
+
+ // Registers each test method declared further down with the suite.
+ CPPUNIT_TEST_SUITE( ActiveMQMessageAuditTest );
+ CPPUNIT_TEST( testIsDuplicateString );
+ CPPUNIT_TEST( testIsDuplicateMessageId );
+ CPPUNIT_TEST( testIsInOrderString );
+ CPPUNIT_TEST( testIsInOrderMessageId );
+ CPPUNIT_TEST( testGetLastSeqId );
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ ActiveMQMessageAuditTest();
+ virtual ~ActiveMQMessageAuditTest();
+
+ // Duplicate detection using string ids and MessageId objects.
+ void testIsDuplicateString();
+ void testIsDuplicateMessageId();
+ // In-order checks using string ids and MessageId objects.
+ void testIsInOrderString();
+ void testIsInOrderMessageId();
+ // Last audited sequence id for a producer.
+ void testGetLastSeqId();
+
+ };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_ACTIVEMQMESSAGEAUDITTEST_H_ */
Propchange:
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp?rev=1460900&r1=1460899&r2=1460900&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp Mon Mar
25 21:18:46 2013
@@ -88,6 +88,8 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activem
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::core::FifoMessageDispatchChannelTest );
#include <activemq/core/SimplePriorityMessageDispatchChannelTest.h>
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::core::SimplePriorityMessageDispatchChannelTest );
+#include <activemq/core/ActiveMQMessageAuditTest.h>
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::core::ActiveMQMessageAuditTest );
#include <activemq/state/ConnectionStateTrackerTest.h>
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::state::ConnectionStateTrackerTest );