Author: tabish
Date: Fri Jan 16 06:07:12 2009
New Revision: 735020
URL: http://svn.apache.org/viewvc?rev=735020&view=rev
Log:
A new Support class for ActiveMQConnection, manages the details of extracting
Connection Properties from the URI properties and maintaining the Transport
instance for the Connection.
Added:
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.cpp
(with props)
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.h
(with props)
Added:
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.cpp?rev=735020&view=auto
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.cpp
(added)
+++
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.cpp
Fri Jan 16 06:07:12 2009
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQConnectionSupport.h"
+
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/core/ActiveMQConstants.h>
+
+#include <decaf/lang/Boolean.h>
+#include <decaf/lang/Integer.h>
+#include <decaf/lang/exceptions/IllegalArgumentException.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::exceptions;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQConnectionSupport::ActiveMQConnectionSupport( transport::Transport*
transport,
+ const
decaf::util::Properties& properties ) {
+
+ if( transport == NULL ) {
+ throw decaf::lang::exceptions::IllegalArgumentException(
+ __FILE__, __LINE__,
+ "ActiveMQConnectionSupport::ActiveMQConnectionSupport - "
+ "Required Parameter 'transport' was NULL.");
+ }
+
+ this->properties = properties;
+ this->transport = transport;
+
+ // Start the Transport
+ this->transport->start();
+
+ // Check the connection options
+ this->setAlwaysSyncSend( Boolean::parseBoolean(
+ properties.getProperty(
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::CONNECTION_ALWAYSSYNCSEND ), "false"
) ) );
+
+ this->setUseAsyncSend( Boolean::parseBoolean(
+ properties.getProperty(
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::CONNECTION_USEASYNCSEND ), "false" )
) );
+
+ this->setProducerWindowSize( decaf::lang::Integer::parseInt(
+ properties.getProperty(
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::CONNECTION_PRODUCERWINDOWSIZE ), "0"
) ) );
+
+ this->setSendTimeout( decaf::lang::Integer::parseInt(
+ properties.getProperty(
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::CONNECTION_SENDTIMEOUT ), "0" ) ) );
+
+ this->setCloseTimeout( decaf::lang::Integer::parseInt(
+ properties.getProperty(
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::CONNECTION_CLOSETIMEOUT ), "15000" )
) );
+
+ this->setClientId( properties.getProperty(
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::PARAM_CLIENTID ), "" ) );
+
+ this->setUsername( properties.getProperty(
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::PARAM_USERNAME ), "" ) );
+
+ this->setPassword( properties.getProperty(
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::PARAM_PASSWORD ), "" ) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQConnectionSupport::~ActiveMQConnectionSupport() {
+ try{
+ this->close();
+ }
+ AMQ_CATCH_NOTHROW( ActiveMQException )
+ AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionSupport::close() throw( decaf::lang::Exception ) {
+
+ bool hasException = false;
+ exceptions::ActiveMQException e;
+
+ try {
+
+ if( transport != NULL ){
+
+ try{
+ transport->close();
+ }catch( exceptions::ActiveMQException& ex ){
+ if( !hasException ){
+ hasException = true;
+ ex.setMark(__FILE__, __LINE__ );
+ e = ex;
+ }
+ }
+
+ try{
+ delete transport;
+ }catch( exceptions::ActiveMQException& ex ){
+ if( !hasException ){
+ hasException = true;
+ ex.setMark(__FILE__, __LINE__ );
+ e = ex;
+ }
+ }
+
+ transport = NULL;
+ }
+
+ // If we encountered an exception - throw the first one we encountered.
+ // This will preserve the stack trace for logging purposes.
+ if( hasException ){
+ throw e;
+ }
+ }
+ AMQ_CATCH_NOTHROW( exceptions::ActiveMQException )
+ AMQ_CATCHALL_NOTHROW( )
+}
Propchange:
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.h?rev=735020&view=auto
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.h
(added)
+++
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.h
Fri Jan 16 06:07:12 2009
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQCONNECTIONSUPPORT_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQCONNECTIONSUPPORT_H_
+
+#include <activemq/util/Config.h>
+
+#include <decaf/io/Closeable.h>
+#include <decaf/util/Properties.h>
+#include <decaf/lang/Exception.h>
+
+#include <activemq/transport/Transport.h>
+#include <activemq/util/LongSequenceGenerator.h>
+
+#include <memory>
+
+namespace activemq {
+namespace core {
+
+ class AMQCPP_API ActiveMQConnectionSupport : public decaf::io::Closeable {
+ private:
+
+ // Properties used to configure this connection.
+ decaf::util::Properties properties;
+
+ // Transport we are using
+ transport::Transport* transport;
+
+ /**
+ * Boolean indicating that we are to always send message Synchronously.
+ * This overrides the sending on non-persistent messages or transacted
+ * messages asynchronously, also fully overrides the useAsyncSend flag.
+ */
+ bool alwaysSyncSend;
+
+ /**
+ * Boolean indicating that we are to send any messages that we would
normally
+ * send synchronously using an asynchronous send. Normally we send
all the
+ * persistent messages not in a transaction synchronously and all
others are
+ * sent asynchronously. Only applied though is alwaysSyncSend is
false.
+ */
+ bool useAsyncSend;
+
+ /**
+ * Send Timeout, forces all messages to be sent Synchronously.
+ */
+ unsigned int sendTimeout;
+
+ /**
+ * Close Timeout, time to wait for a Closed message from the broker
before
+ * giving up and just shutting down the connection.
+ */
+ unsigned int closeTimeout;
+
+ /**
+ * Producer Window Size, amount of memory that can be used before the
producer
+ * blocks and waits for ProducerAck messages.
+ */
+ unsigned int producerWindowSize;
+
+ /**
+ * The configured User Name
+ */
+ std::string username;
+
+ /**
+ * The configured Password
+ */
+ std::string password;
+
+ /**
+ * The configured Client Id
+ */
+ std::string clientId;
+
+ /**
+ * Next available Producer Id
+ */
+ util::LongSequenceGenerator producerIds;
+
+ /**
+ * Next available Producer Sequence Id
+ */
+ util::LongSequenceGenerator producerSequenceIds;
+
+ /**
+ * Next available Consumer Id
+ */
+ util::LongSequenceGenerator consumerIds;
+
+ /**
+ * Next available Transaction Id
+ */
+ util::LongSequenceGenerator transactionIds;
+
+ /**
+ * Next available Session Id.
+ */
+ util::LongSequenceGenerator sessionIds;
+
+ /**
+ * Next Temporary Destination Id
+ */
+ util::LongSequenceGenerator tempDestinationIds;
+
+ public:
+
+ /**
+ * Creates an instance of the ActiveMQConnectionSupport class, the
+ * most common properties for a connection are pulled from the
+ * properties instance or are set to defaults.
+ *
+ * @param properties
+ * The URI configured properties for this connection.
+ */
+ ActiveMQConnectionSupport( transport::Transport* transport,
+ const decaf::util::Properties& properties );
+
+ virtual ~ActiveMQConnectionSupport();
+
+ /**
+ * Gets the Properties object that this Config object was initialized
with.
+ * @returns a const reference to the Connection Config.
+ */
+ const decaf::util::Properties& getProperties() const {
+ return this->properties;
+ }
+
+ /**
+ * Gets the Transport Configured for this Connection.
+ * @return the configured transport
+ */
+ transport::Transport& getTransport() const {
+ return *( this->transport );
+ }
+
+ /**
+ * Gets if the Connection should always send things Synchronously.
+ * @return true if sends should always be Synchronous.
+ */
+ bool isAlwaysSyncSend() const {
+ return this->alwaysSyncSend;
+ }
+
+ /**
+ * Sets if the Connection should always send things Synchronously.
+ * @param value
+ * true if sends should always be Synchronous.
+ */
+ void setAlwaysSyncSend( bool value ) {
+ this->alwaysSyncSend = value;
+ }
+
+ /**
+ * Gets if the useAsyncSend option is set
+ * @returns true if on false if not.
+ */
+ bool isUseAsyncSend() const {
+ return this->useAsyncSend;
+ }
+
+ /**
+ * Sets the useAsyncSend option
+ * @param value - true to activate, false to disable.
+ */
+ void setUseAsyncSend( bool value ) {
+ this->useAsyncSend = value;
+ }
+
+ /**
+ * Gets the assigned send timeout for this Connector
+ * @return the send timeout configured in the connection uri
+ */
+ unsigned int getSendTimeout() const {
+ return this->sendTimeout;
+ }
+
+ /**
+ * Sets the send timeout to use when sending Message objects, this will
+ * cause all messages to be sent using a Synchronous request is
non-zero.
+ * @param timeout - The time to wait for a response.
+ */
+ void setSendTimeout( unsigned int timeout ) {
+ this->sendTimeout = timeout;
+ }
+
+ /**
+ * Gets the assigned close timeout for this Connector
+ * @return the close timeout configured in the connection uri
+ */
+ unsigned int getCloseTimeout() const {
+ return this->closeTimeout;
+ }
+
+ /**
+ * Sets the close timeout to use when sending the disconnect request.
+ * @param timeout - The time to wait for a close message.
+ */
+ void setCloseTimeout( unsigned int timeout ) {
+ this->closeTimeout = timeout;
+ }
+
+ /**
+ * Gets the configured producer window size for Producers that are
created
+ * from this connector. This only applies if there is no send timeout
and the
+ * producer is able to send asynchronously.
+ * @return size in bytes of messages that this producer can produce
before
+ * it must block and wait for ProducerAck messages to free
resources.
+ */
+ unsigned int getProducerWindowSize() const {
+ return this->producerWindowSize;
+ }
+
+ /**
+ * Sets the size in Bytes of messages that a producer can send before
it is blocked
+ * to await a ProducerAck from the broker that frees enough memory to
allow another
+ * message to be sent.
+ * @param windowSize - The size in bytes of the Producers memory
window.
+ */
+ void setProducerWindowSize( unsigned int windowSize ) {
+ this->producerWindowSize = windowSize;
+ }
+
+ /**
+ * Gets the Configured Username.
+ * @return the username.
+ */
+ std::string getUsername() const {
+ return this->username;
+ }
+
+ /**
+ * Sets the Username.
+ * @param username - The new username value.
+ */
+ void setUsername( const std::string& username ) {
+ this->username = username;
+ }
+
+ /**
+ * Gets the Configured Password.
+ * @return the password.
+ */
+ std::string getPassword() const {
+ return this->password;
+ }
+
+ /**
+ * Sets the Password.
+ * @param password - The new password value.
+ */
+ void setPassword( const std::string& password ) {
+ this->password = password;
+ }
+
+ /**
+ * Gets the Configured Client Id.
+ * @return the clientId.
+ */
+ std::string getClientId() const {
+ return this->clientId;
+ }
+
+ /**
+ * Sets the Client Id.
+ * @param clientId - The new clientId value.
+ */
+ void setClientId( const std::string& clientId ) {
+ this->clientId = clientId;
+ }
+
+ /**
+ * Get the Next available Producer Id
+ * @return the next id in the sequence.
+ */
+ long long getNextProducerId() {
+ return this->producerIds.getNextSequenceId();
+ }
+
+ /**
+ * Get the Next available Producer Sequence Id
+ * @return the next id in the sequence.
+ */
+ long long getNextProducerSequenceId() {
+ return this->producerSequenceIds.getNextSequenceId();
+ }
+
+ /**
+ * Get the Next available Consumer Id
+ * @return the next id in the sequence.
+ */
+ long long getNextConsumerId() {
+ return this->consumerIds.getNextSequenceId();
+ }
+
+ /**
+ * Get the Next available Transaction Id
+ * @return the next id in the sequence.
+ */
+ long long getNextTransactionId() {
+ return this->transactionIds.getNextSequenceId();
+ }
+
+ /**
+ * Get the Next available Session Id.
+ * @return the next id in the sequence.
+ */
+ long long getNextSessionId() {
+ return this->sessionIds.getNextSequenceId();
+ }
+
+ /**
+ * Get the Next Temporary Destination Id
+ * @return the next id in the sequence.
+ */
+ long long getNextTempDestinationId() {
+ return this->tempDestinationIds.getNextSequenceId();
+ }
+
+ public: // decaf::io::Closeable
+
+ /**
+ * Closes this object and deallocates the appropriate resources.
+ * The object is generally no longer usable after calling close.
+ * @throws CMSException
+ */
+ virtual void close() throw( decaf::lang::Exception );
+
+ };
+
+}}
+
+#endif /*_ACTIVEMQ_CORE_ACTIVEMQCONNECTIONSUPPORT_H_*/
Propchange:
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.h
------------------------------------------------------------------------------
svn:eol-style = native