Author: tabish
Date: Wed Apr 3 22:39:07 2013
New Revision: 1464225
URL: http://svn.apache.org/r1464225
Log:
Fix potential segfault when using async sends with callback.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp?rev=1464225&r1=1464224&r2=1464225&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp Wed Apr 3 22:39:07 2013
@@ -21,7 +21,7 @@
#include <decaf/util/ArrayList.h>
#include <decaf/util/concurrent/Mutex.h>
#include <decaf/util/concurrent/atomic/AtomicInteger.h>
-#include <map>
+#include <decaf/util/HashMap.h>
#include <activemq/commands/Response.h>
#include <activemq/commands/ExceptionResponse.h>
@@ -52,17 +52,19 @@ namespace {
Mutex* mutex;
int commandId;
- std::map<unsigned int, Pointer<FutureResponse> >* map;
+ HashMap<unsigned int, Pointer<FutureResponse> >* map;
public:
- ResponseFinalizer(Mutex* mutex, int commandId, std::map<unsigned int,
Pointer<FutureResponse> >* map) :
+ ResponseFinalizer(Mutex* mutex, int commandId, HashMap<unsigned int,
Pointer<FutureResponse> >* map) :
mutex(mutex), commandId(commandId), map(map) {
}
~ResponseFinalizer() {
synchronized(mutex){
- map->erase(commandId);
+ try {
+ map->remove(commandId);
+ } catch (...) {}
}
}
};
@@ -80,7 +82,7 @@ namespace correlator{
decaf::util::concurrent::atomic::AtomicInteger nextCommandId;
// Map of request ids to future response objects.
- std::map<unsigned int, Pointer<FutureResponse> > requestMap;
+ HashMap<unsigned int, Pointer<FutureResponse> > requestMap;
// Sync object for accessing the request map.
decaf::util::concurrent::Mutex mapMutex;
@@ -148,8 +150,7 @@ Pointer<FutureResponse> ResponseCorrelat
synchronized(&this->impl->mapMutex) {
priorError = this->impl->priorError;
if (priorError == NULL) {
- this->impl->requestMap.insert(
- make_pair((unsigned int) command->getCommandId(),
futureResponse));
+ this->impl->requestMap.put((unsigned int)
command->getCommandId(), futureResponse);
}
}
@@ -169,7 +170,7 @@ Pointer<FutureResponse> ResponseCorrelat
next->oneway(command);
} catch (Exception &ex) {
// We have to ensure this gets cleaned out otherwise we can
consume memory over time.
- this->impl->requestMap.erase(command->getCommandId());
+ this->impl->requestMap.remove(command->getCommandId());
throw;
}
@@ -199,8 +200,7 @@ Pointer<Response> ResponseCorrelator::re
synchronized(&this->impl->mapMutex) {
priorError = this->impl->priorError;
if (priorError == NULL) {
- this->impl->requestMap.insert(
- make_pair((unsigned int) command->getCommandId(),
futureResponse));
+ this->impl->requestMap.put((unsigned int)
command->getCommandId(), futureResponse);
}
}
@@ -251,8 +251,7 @@ Pointer<Response> ResponseCorrelator::re
synchronized(&this->impl->mapMutex) {
priorError = this->impl->priorError;
if (priorError == NULL) {
- this->impl->requestMap.insert(
- make_pair((unsigned int) command->getCommandId(),
futureResponse));
+ this->impl->requestMap.put((unsigned int)
command->getCommandId(), futureResponse);
}
}
@@ -301,16 +300,13 @@ void ResponseCorrelator::onCommand(const
// It is a response - let's correlate ...
synchronized(&this->impl->mapMutex) {
- // Look the future request up based on the correlation id.
- std::map<unsigned int, Pointer<FutureResponse> >::iterator iter =
- this->impl->requestMap.find(response->getCorrelationId());
- if (iter == this->impl->requestMap.end()) {
+ Pointer<FutureResponse> futureResponse;
+ try {
+ futureResponse =
this->impl->requestMap.remove(response->getCorrelationId());
+ } catch (NoSuchElementException& ex) {
return;
}
- // Get the future response (if it's in the map, it's not NULL).
- Pointer<FutureResponse> futureResponse = iter->second;
-
// Set the response property in the future response.
futureResponse->setResponse(response);
}
@@ -340,10 +336,7 @@ void ResponseCorrelator::dispose(Pointer
if (this->impl->priorError == NULL) {
this->impl->priorError = error;
requests.ensureCapacity((int)this->impl->requestMap.size());
- std::map<unsigned int, Pointer<FutureResponse> >::iterator iter =
this->impl->requestMap.begin();
- for (; iter != this->impl->requestMap.end(); ++iter) {
- requests.add(iter->second);
- }
+ requests.copy(this->impl->requestMap.values());
this->impl->requestMap.clear();
}
}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp?rev=1464225&r1=1464224&r2=1464225&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp Wed Apr 3 22:39:07 2013
@@ -55,7 +55,7 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activem
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireAsyncSenderTest );
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireClientAckTest );
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireCmsConnectionStartStopTest );
-//CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenWireCmsSendWithAsyncCallbackTest );
+CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenWireCmsSendWithAsyncCallbackTest );
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireCmsTemplateTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireDurableTest
);
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireExpirationTest );