Author: tabish
Date: Tue Mar 26 22:36:22 2013
New Revision: 1461355
URL: http://svn.apache.org/r1461355
Log:
https://issues.apache.org/jira/browse/AMQCPP-367
Adds ConnectionAudit for use in dup detection and some more tests.
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp
(with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h
(with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp
(with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.h
(with props)
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=1461355&r1=1461354&r2=1461355&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Tue Mar 26
22:36:22 2013
@@ -106,6 +106,7 @@ cc_sources = \
activemq/core/ActiveMQXAConnectionFactory.cpp \
activemq/core/ActiveMQXASession.cpp \
activemq/core/AdvisoryConsumer.cpp \
+ activemq/core/ConnectionAudit.cpp \
activemq/core/DispatchData.cpp \
activemq/core/Dispatcher.cpp \
activemq/core/FifoMessageDispatchChannel.cpp \
@@ -743,6 +744,7 @@ h_sources = \
activemq/core/ActiveMQXAConnectionFactory.h \
activemq/core/ActiveMQXASession.h \
activemq/core/AdvisoryConsumer.h \
+ activemq/core/ConnectionAudit.h \
activemq/core/DispatchData.h \
activemq/core/Dispatcher.h \
activemq/core/FifoMessageDispatchChannel.h \
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1461355&r1=1461354&r2=1461355&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
Tue Mar 26 22:36:22 2013
@@ -23,6 +23,7 @@
#include <activemq/core/ActiveMQConstants.h>
#include <activemq/core/ActiveMQConnectionMetaData.h>
#include <activemq/core/AdvisoryConsumer.h>
+#include <activemq/core/ConnectionAudit.h>
#include <activemq/core/kernels/ActiveMQSessionKernel.h>
#include <activemq/core/kernels/ActiveMQProducerKernel.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
@@ -195,6 +196,8 @@ namespace core{
TempDestinationMap activeTempDestinations;
+ ConnectionAudit connectionAudit;
+
ConnectionConfig(const Pointer<transport::Transport> transport,
const Pointer<decaf::util::Properties> properties) :
properties(properties),
@@ -454,6 +457,8 @@ ActiveMQConnection::ActiveMQConnection(c
configuration->connectionInfo->setManageable(true);
configuration->connectionInfo->setFaultTolerant(transport->isFaultTolerant());
+
configuration->connectionAudit.setCheckForDuplicates(transport->isFaultTolerant());
+
this->config = configuration.release();
}
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp?rev=1461355&view=auto
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp
(added)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp
Tue Mar 26 22:36:22 2013
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConnectionAudit.h"
+
+#include <decaf/util/LinkedHashMap.h>
+
+#include <activemq/core/ActiveMQMessageAudit.h>
+#include <activemq/commands/ActiveMQDestination.h>
+
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::commands;
+using namespace activemq::exceptions;
+
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace activemq {
+namespace core {
+
+    /**
+     * Private state of a ConnectionAudit, defined here so the containers do
+     * not appear in the public header.
+     */
+    class ConnectionAuditImpl {
+    public:
+
+        // Lock object; ConnectionAudit::removeDispatcher() synchronizes on it.
+        Mutex mutex;
+
+        // Audits keyed by destination; consulted when the message destination is a queue.
+        LinkedHashMap<Pointer<ActiveMQDestination>, Pointer<ActiveMQMessageAudit> > destinations;
+
+        // Audits keyed by dispatcher; consulted for every non-queue destination.
+        LinkedHashMap<Pointer<Dispatcher>, Pointer<ActiveMQMessageAudit> > dispatchers;
+
+        // Both maps are built with the argument 1000 (presumably an initial
+        // capacity - confirm against LinkedHashMap's constructor).
+        ConnectionAuditImpl() : mutex(), destinations(1000), dispatchers(1000) {}
+
+    private:
+
+        // Copying is disabled: declared private and not defined in this file.
+        ConnectionAuditImpl(const ConnectionAuditImpl&);
+        ConnectionAuditImpl& operator= (const ConnectionAuditImpl&);
+    };
+}}
+
+////////////////////////////////////////////////////////////////////////////////
+// Default construction: duplicate checking is enabled and the window depth and
+// producer limit are taken from the ActiveMQMessageAudit constants.
+ConnectionAudit::ConnectionAudit() :
+    impl(new ConnectionAuditImpl),
+    checkForDuplicates(true),
+    auditDepth(ActiveMQMessageAudit::DEFAULT_WINDOW_SIZE),
+    auditMaximumProducerNumber(ActiveMQMessageAudit::MAXIMUM_PRODUCER_COUNT) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Construction with a caller-supplied window depth and producer limit; duplicate
+// checking starts out enabled, exactly as with the default constructor.
+ConnectionAudit::ConnectionAudit(int auditDepth, int maxProducers) : impl(new ConnectionAuditImpl),
+                                                                     checkForDuplicates(true),
+                                                                     auditDepth(auditDepth),
+                                                                     auditMaximumProducerNumber(maxProducers) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Releases the private implementation object. The delete is wrapped in the
+// project's catch-all macro so that nothing propagates out of the destructor.
+ConnectionAudit::~ConnectionAudit() {
+    try {
+        delete impl;
+    }
+    AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Drops the audit entry stored for the given dispatcher, holding the
+// implementation mutex while the map is modified.
+void ConnectionAudit::removeDispatcher(Pointer<Dispatcher> dispatcher) {
+    synchronized(&impl->mutex) {
+        impl->dispatchers.remove(dispatcher);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Asks the audit responsible for this message whether its message id has
+// already been seen.
+//
+// Queue destinations share one audit per destination; every other destination
+// type uses one audit per dispatcher. A missing audit is created on demand
+// using the current auditDepth / auditMaximumProducerNumber settings.
+//
+// Returns false without touching any audit when duplicate checking is
+// disabled, when the message is NULL, or when it carries no destination.
+bool ConnectionAudit::isDuplicate(Pointer<Dispatcher> dispatcher, Pointer<commands::Message> message) {
+
+    bool duplicate = false;
+
+    if (checkForDuplicates && message != NULL) {
+        Pointer<ActiveMQDestination> destination = message->getDestination();
+        if (destination != NULL) {
+            // The maps are shared state: take the same lock removeDispatcher()
+            // uses so lookups and inserts cannot race with each other.
+            synchronized(&this->impl->mutex) {
+                Pointer<ActiveMQMessageAudit> audit;
+                if (destination->isQueue()) {
+                    try {
+                        audit = this->impl->destinations.get(destination);
+                    } catch (NoSuchElementException& ex) {
+                        // First message for this queue: start a fresh audit.
+                        audit.reset(new ActiveMQMessageAudit(auditDepth, auditMaximumProducerNumber));
+                        this->impl->destinations.put(destination, audit);
+                    }
+                } else {
+                    try {
+                        audit = this->impl->dispatchers.get(dispatcher);
+                    } catch (NoSuchElementException& ex) {
+                        // First message for this dispatcher: start a fresh audit.
+                        audit.reset(new ActiveMQMessageAudit(auditDepth, auditMaximumProducerNumber));
+                        this->impl->dispatchers.put(dispatcher, audit);
+                    }
+                }
+                duplicate = audit->isDuplicate(message->getMessageId());
+            }
+        }
+    }
+
+    return duplicate;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Rolls the message id back out of the audit that isDuplicate() would consult
+// for it: the per-destination audit for queues, the per-dispatcher audit
+// otherwise.
+//
+// Does nothing when duplicate checking is disabled, when the message is NULL
+// or has no destination, or when no audit exists yet for the lookup key.
+void ConnectionAudit::rollbackDuplicate(Pointer<Dispatcher> dispatcher, Pointer<commands::Message> message) {
+
+    if (checkForDuplicates && message != NULL) {
+        Pointer<ActiveMQDestination> destination = message->getDestination();
+        if (destination != NULL) {
+            // Same lock as removeDispatcher(): the maps are shared state.
+            synchronized(&this->impl->mutex) {
+                try {
+                    Pointer<ActiveMQMessageAudit> audit;
+                    if (destination->isQueue()) {
+                        audit = this->impl->destinations.get(destination);
+                    } else {
+                        audit = this->impl->dispatchers.get(dispatcher);
+                    }
+                    audit->rollback(message->getMessageId());
+                } catch (NoSuchElementException& ex) {
+                    // No audit was ever created for this key, nothing to undo.
+                }
+            }
+        }
+    }
+}
Propchange:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h?rev=1461355&view=auto
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h
(added)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h
Tue Mar 26 22:36:22 2013
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_CONNECTIONAUDIT_H_
+#define _ACTIVEMQ_CORE_CONNECTIONAUDIT_H_
+
+#include <activemq/util/Config.h>
+
+#include <activemq/commands/Message.h>
+#include <activemq/core/Dispatcher.h>
+
+namespace activemq {
+namespace core {
+
+ class ConnectionAuditImpl;
+
+    /**
+     * Provides the Auditing functionality used by Connections to attempt to
+     * filter out duplicate Messages.
+     *
+     * @since 3.7.0
+     */
+    class AMQCPP_API ConnectionAudit {
+    private:
+
+        ConnectionAudit(const ConnectionAudit&);
+        ConnectionAudit& operator= (const ConnectionAudit&);
+
+    private:
+
+        // Opaque implementation object, allocated in the constructors and
+        // deleted in the destructor.
+        ConnectionAuditImpl* impl;
+
+        bool checkForDuplicates;
+        int auditDepth;
+        int auditMaximumProducerNumber;
+
+    public:
+
+        /**
+         * Creates an audit with duplicate checking enabled and the default
+         * depth and producer limit of ActiveMQMessageAudit.
+         */
+        ConnectionAudit();
+
+        /**
+         * Creates an audit with duplicate checking enabled.
+         *
+         * @param auditDepth
+         *      Depth passed to each ActiveMQMessageAudit this object creates.
+         * @param maxProducers
+         *      Producer limit passed to each ActiveMQMessageAudit this object creates.
+         */
+        ConnectionAudit(int auditDepth, int maxProducers);
+
+        ~ConnectionAudit();
+
+    public:
+
+        /**
+         * Removes the audit state kept for the given dispatcher.
+         *
+         * @param dispatcher
+         *      The dispatcher whose audit entry should be dropped.
+         */
+        void removeDispatcher(Pointer<Dispatcher> dispatcher);
+
+        /**
+         * Checks the message against the audit for its destination (queues)
+         * or for the given dispatcher (all other destinations).
+         *
+         * @param dispatcher
+         *      The dispatcher the message is being delivered to.
+         * @param message
+         *      The message to check.
+         *
+         * @return true if the matching audit reports the message id as a
+         *         duplicate; false otherwise, including when checking is
+         *         disabled or the message or its destination is NULL.
+         */
+        bool isDuplicate(Pointer<Dispatcher> dispatcher, Pointer<commands::Message> message);
+
+        /**
+         * Rolls the message id back in the audit that isDuplicate() would
+         * consult for it; a no-op if that audit does not exist.
+         *
+         * @param dispatcher
+         *      The dispatcher the message was delivered to.
+         * @param message
+         *      The message whose id should be rolled back.
+         */
+        void rollbackDuplicate(Pointer<Dispatcher> dispatcher, Pointer<commands::Message> message);
+
+    public:
+
+        bool isCheckForDuplicates() const {
+            return this->checkForDuplicates;
+        }
+
+        void setCheckForDuplicates(bool checkForDuplicates) {
+            this->checkForDuplicates = checkForDuplicates;
+        }
+
+        int getAuditDepth() const {
+            return auditDepth;
+        }
+
+        /**
+         * The value is only read when a new per-destination or per-dispatcher
+         * audit is created, so it does not resize audits that already exist.
+         */
+        void setAuditDepth(int auditDepth) {
+            this->auditDepth = auditDepth;
+        }
+
+        int getAuditMaximumProducerNumber() const {
+            return auditMaximumProducerNumber;
+        }
+
+        /**
+         * The value is only read when a new per-destination or per-dispatcher
+         * audit is created, so it does not affect audits that already exist.
+         */
+        void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
+            this->auditMaximumProducerNumber = auditMaximumProducerNumber;
+        }
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_CONNECTIONAUDIT_H_ */
Propchange:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am?rev=1461355&r1=1461354&r2=1461355&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am Tue Mar 26
22:36:22 2013
@@ -38,6 +38,7 @@ cc_sources = \
activemq/core/ActiveMQConnectionTest.cpp \
activemq/core/ActiveMQMessageAuditTest.cpp \
activemq/core/ActiveMQSessionTest.cpp \
+ activemq/core/ConnectionAuditTest.cpp \
activemq/core/FifoMessageDispatchChannelTest.cpp \
activemq/core/SimplePriorityMessageDispatchChannelTest.cpp \
activemq/exceptions/ActiveMQExceptionTest.cpp \
@@ -286,6 +287,7 @@ h_sources = \
activemq/core/ActiveMQConnectionTest.h \
activemq/core/ActiveMQMessageAuditTest.h \
activemq/core/ActiveMQSessionTest.h \
+ activemq/core/ConnectionAuditTest.h \
activemq/core/FifoMessageDispatchChannelTest.h \
activemq/core/SimplePriorityMessageDispatchChannelTest.h \
activemq/exceptions/ActiveMQExceptionTest.h \
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp?rev=1461355&r1=1461354&r2=1461355&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.cpp
Tue Mar 26 22:36:22 2013
@@ -88,6 +88,58 @@ void ActiveMQMessageAuditTest::testIsDup
}
////////////////////////////////////////////////////////////////////////////////
+// Feeds generated string ids through the audit, then checks that each id in
+// the trailing window is reported as a duplicate until it is rolled back.
+void ActiveMQMessageAuditTest::testRollbackString() {
+
+    int count = 10000;
+    ActiveMQMessageAudit audit;
+    IdGenerator idGen;
+
+    ArrayList<std::string> list;
+    for (int i = 0; i < count; i++) {
+        std::string id = idGen.generateId();
+        list.add(id);
+        CPPUNIT_ASSERT(!audit.isDuplicate(id));
+    }
+
+    // Only revisit the most recent ids, starting audit-depth entries from the end.
+    int index = list.size() -1 -audit.getAuditDepth();
+    for (; index < list.size(); index++) {
+        std::string id = list.get(index);
+        CPPUNIT_ASSERT_MESSAGE("not detected as duplicate, id:" + id, audit.isDuplicate(id));
+        audit.rollback(id);
+        CPPUNIT_ASSERT_MESSAGE(std::string() + "still duplicate after rollback, msg:" + id, !audit.isDuplicate(id));
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Same scenario as the string variant, but using MessageId objects that share
+// one producer and carry increasing producer sequence ids.
+void ActiveMQMessageAuditTest::testRollbackMessageId() {
+
+    int count = 10000;
+    ActiveMQMessageAudit audit;
+    ArrayList<Pointer<MessageId> > list;
+
+    Pointer<ProducerId> pid(new ProducerId);
+    pid->setConnectionId("test");
+    pid->setSessionId(0);
+    pid->setValue(1);
+
+    for (int i = 0; i < count; i++) {
+        Pointer<MessageId> id(new MessageId);
+        id->setProducerId(pid);
+        id->setProducerSequenceId(i);
+        list.add(id);
+        CPPUNIT_ASSERT(!audit.isDuplicate(id));
+    }
+
+    // Only revisit the most recent ids, starting audit-depth entries from the end.
+    int index = list.size() -1 -audit.getAuditDepth();
+    for (; index < list.size(); index++) {
+        Pointer<MessageId> id = list.get(index);
+        CPPUNIT_ASSERT_MESSAGE(std::string() + "not detected as duplicate, msg:" + id->toString(), audit.isDuplicate(id));
+        audit.rollback(id);
+        CPPUNIT_ASSERT_MESSAGE(std::string() + "still duplicate after rollback, msg:" + id->toString(), !audit.isDuplicate(id));
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQMessageAuditTest::testIsInOrderString() {
int count = 10000;
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h?rev=1461355&r1=1461354&r2=1461355&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQMessageAuditTest.h
Tue Mar 26 22:36:22 2013
@@ -31,6 +31,8 @@ namespace core {
CPPUNIT_TEST( testIsDuplicateMessageId );
CPPUNIT_TEST( testIsInOrderString );
CPPUNIT_TEST( testIsInOrderMessageId );
+ CPPUNIT_TEST( testRollbackString );
+ CPPUNIT_TEST( testRollbackMessageId );
CPPUNIT_TEST( testGetLastSeqId );
CPPUNIT_TEST_SUITE_END();
@@ -43,6 +45,8 @@ namespace core {
void testIsDuplicateMessageId();
void testIsInOrderString();
void testIsInOrderMessageId();
+ void testRollbackString();
+ void testRollbackMessageId();
void testGetLastSeqId();
};
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp?rev=1461355&view=auto
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp
(added)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp
Tue Mar 26 22:36:22 2013
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConnectionAuditTest.h"
+
+#include <activemq/core/ConnectionAudit.h>
+#include <activemq/core/ActiveMQMessageAudit.h>
+#include <activemq/util/IdGenerator.h>
+#include <activemq/commands/Message.h>
+#include <activemq/commands/ActiveMQDestination.h>
+#include <activemq/commands/ActiveMQQueue.h>
+
+#include <decaf/util/ArrayList.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::commands;
+using namespace activemq::util;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    // Minimal Dispatcher used by these tests purely as a key to hand to
+    // ConnectionAudit; anything dispatched to it is discarded.
+    class MyDispatcher : public Dispatcher {
+    public:
+
+        virtual ~MyDispatcher() {}
+
+        // Deliberately empty, the tests never deliver through this object.
+        virtual void dispatch(const Pointer<commands::MessageDispatch>&) {}
+
+        // Every instance reports the same fixed hash code.
+        virtual int getHashCode() const { return 1; }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// The fixture holds no state, so there is nothing to set up.
+ConnectionAuditTest::ConnectionAuditTest() {}
+
+////////////////////////////////////////////////////////////////////////////////
+// The fixture holds no state, so there is nothing to tear down.
+ConnectionAuditTest::~ConnectionAuditTest() {}
+
+////////////////////////////////////////////////////////////////////////////////
+// A default-constructed audit checks for duplicates and reports the
+// ActiveMQMessageAudit default depth and producer limit.
+void ConnectionAuditTest::testConstructor1() {
+
+    ConnectionAudit defaults;
+
+    CPPUNIT_ASSERT(defaults.isCheckForDuplicates());
+    CPPUNIT_ASSERT(ActiveMQMessageAudit::DEFAULT_WINDOW_SIZE == defaults.getAuditDepth());
+    CPPUNIT_ASSERT(ActiveMQMessageAudit::MAXIMUM_PRODUCER_COUNT == defaults.getAuditMaximumProducerNumber());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// The two-argument constructor stores the supplied depth and producer limit
+// and still enables duplicate checking.
+void ConnectionAuditTest::testConstructor2() {
+
+    ConnectionAudit custom(100, 200);
+
+    CPPUNIT_ASSERT(custom.isCheckForDuplicates());
+    CPPUNIT_ASSERT(100 == custom.getAuditDepth());
+    CPPUNIT_ASSERT(200 == custom.getAuditMaximumProducerNumber());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sends a run of distinct message ids for one queue through the audit, then
+// replays the most recent ones and expects each to be flagged as a duplicate.
+void ConnectionAuditTest::testIsDuplicate() {
+
+    int count = 10000;
+    ConnectionAudit audit;
+    ArrayList<Pointer<MessageId> > list;
+    Pointer<MyDispatcher> dispatcher(new MyDispatcher);
+
+    Pointer<ProducerId> pid(new ProducerId);
+    pid->setConnectionId("test");
+    pid->setSessionId(0);
+    pid->setValue(1);
+
+    // A single Message object is reused; only its id changes per iteration.
+    Pointer<ActiveMQDestination> destination(new ActiveMQQueue("TEST.QUEUE"));
+    Pointer<Message> message(new Message());
+    message->setDestination(destination);
+
+    for (int i = 0; i < count; i++) {
+        Pointer<MessageId> id(new MessageId);
+        id->setProducerId(pid);
+        id->setProducerSequenceId(i);
+        list.add(id);
+
+        message->setMessageId(id);
+        CPPUNIT_ASSERT(!audit.isDuplicate(dispatcher, message));
+    }
+
+    // Only revisit the most recent ids, starting audit-depth entries from the end.
+    int index = list.size() -1 -audit.getAuditDepth();
+    for (; index < list.size(); index++) {
+        Pointer<MessageId> id = list.get(index);
+        message->setMessageId(id);
+        CPPUNIT_ASSERT_MESSAGE(std::string() + "not detected as duplicate, msg:" + id->toString(),
+                               audit.isDuplicate(dispatcher, message));
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Like testIsDuplicate, but after each replayed id is flagged it is rolled
+// back and must then be accepted as new again.
+void ConnectionAuditTest::testRollbackDuplicate() {
+
+    int count = 10000;
+    ConnectionAudit audit;
+    ArrayList<Pointer<MessageId> > list;
+    Pointer<MyDispatcher> dispatcher(new MyDispatcher);
+
+    Pointer<ProducerId> pid(new ProducerId);
+    pid->setConnectionId("test");
+    pid->setSessionId(0);
+    pid->setValue(1);
+
+    // A single Message object is reused; only its id changes per iteration.
+    Pointer<ActiveMQDestination> destination(new ActiveMQQueue("TEST.QUEUE"));
+    Pointer<Message> message(new Message());
+    message->setDestination(destination);
+
+    for (int i = 0; i < count; i++) {
+        Pointer<MessageId> id(new MessageId);
+        id->setProducerId(pid);
+        id->setProducerSequenceId(i);
+        list.add(id);
+
+        message->setMessageId(id);
+        CPPUNIT_ASSERT(!audit.isDuplicate(dispatcher, message));
+    }
+
+    // Only revisit the most recent ids, starting audit-depth entries from the end.
+    int index = list.size() -1 -audit.getAuditDepth();
+    for (; index < list.size(); index++) {
+        Pointer<MessageId> id = list.get(index);
+        message->setMessageId(id);
+        CPPUNIT_ASSERT_MESSAGE(std::string() + "not detected as duplicate, msg:" + id->toString(),
+                               audit.isDuplicate(dispatcher, message));
+        audit.rollbackDuplicate(dispatcher, message);
+        CPPUNIT_ASSERT_MESSAGE(std::string() + "still duplicate after rollback, msg:" + id->toString(),
+                               !audit.isDuplicate(dispatcher, message));
+    }
+}
Propchange:
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.h?rev=1461355&view=auto
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.h
(added)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.h
Tue Mar 26 22:36:22 2013
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_CONNECTIONAUDITTEST_H_
+#define _ACTIVEMQ_CORE_CONNECTIONAUDITTEST_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+namespace activemq {
+namespace core {
+
+    /**
+     * CppUnit fixture for activemq::core::ConnectionAudit: covers both
+     * constructors, duplicate detection and rollback of a detected duplicate.
+     */
+    class ConnectionAuditTest : public CppUnit::TestFixture {
+
+        CPPUNIT_TEST_SUITE( ConnectionAuditTest );
+        CPPUNIT_TEST( testConstructor1 );
+        CPPUNIT_TEST( testConstructor2 );
+        CPPUNIT_TEST( testIsDuplicate );
+        CPPUNIT_TEST( testRollbackDuplicate );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        ConnectionAuditTest();
+        virtual ~ConnectionAuditTest();
+
+        // Default constructor leaves the audit with its default settings.
+        void testConstructor1();
+
+        // Two-argument constructor stores the supplied depth and producer limit.
+        void testConstructor2();
+
+        // Replayed message ids are reported as duplicates.
+        void testIsDuplicate();
+
+        // A rolled-back id is accepted as new on the next check.
+        void testRollbackDuplicate();
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_CONNECTIONAUDITTEST_H_ */
Propchange:
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp?rev=1461355&r1=1461354&r2=1461355&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp Tue Mar
26 22:36:22 2013
@@ -90,6 +90,8 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activem
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::core::SimplePriorityMessageDispatchChannelTest );
#include <activemq/core/ActiveMQMessageAuditTest.h>
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::core::ActiveMQMessageAuditTest );
+#include <activemq/core/ConnectionAuditTest.h>
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::core::ConnectionAuditTest );
#include <activemq/state/ConnectionStateTrackerTest.h>
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::state::ConnectionStateTrackerTest );