http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/clientimpl.cpp ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/lib/clientimpl.cpp b/hedwig-client/src/main/cpp/lib/clientimpl.cpp deleted file mode 100644 index 40114d6..0000000 --- a/hedwig-client/src/main/cpp/lib/clientimpl.cpp +++ /dev/null @@ -1,738 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifdef HAVE_CONFIG_H -#include <config.h> -#endif - -#include "clientimpl.h" -#include "channel.h" -#include "publisherimpl.h" -#include "subscriberimpl.h" -#include "simplesubscriberimpl.h" -#include "multiplexsubscriberimpl.h" -#include <log4cxx/logger.h> - -static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__)); - -using namespace Hedwig; - -const int DEFAULT_MESSAGE_FORCE_CONSUME_RETRY_WAIT_TIME = 5000; -const std::string DEFAULT_SERVER_DEFAULT_VAL = ""; -const bool DEFAULT_SSL_ENABLED = false; - -void SyncOperationCallback::wait() { - boost::unique_lock<boost::mutex> lock(mut); - while(response==PENDING) { - if (cond.timed_wait(lock, boost::posix_time::milliseconds(timeout)) == false) { - LOG4CXX_ERROR(logger, "Timeout waiting for operation to complete " << this); - - response = TIMEOUT; - } - } -} - -void SyncOperationCallback::operationComplete() { - if (response == TIMEOUT) { - LOG4CXX_ERROR(logger, "operationCompleted successfully after timeout " << this); - return; - } - - { - boost::lock_guard<boost::mutex> lock(mut); - response = SUCCESS; - } - cond.notify_all(); -} - -void SyncOperationCallback::operationFailed(const std::exception& exception) { - if (response == TIMEOUT) { - LOG4CXX_ERROR(logger, "operationCompleted unsuccessfully after timeout " << this); - return; - } - - { - boost::lock_guard<boost::mutex> lock(mut); - - if (typeid(exception) == typeid(ChannelConnectException)) { - response = NOCONNECT; - } else if (typeid(exception) == typeid(ServiceDownException)) { - response = SERVICEDOWN; - } else if (typeid(exception) == typeid(AlreadySubscribedException)) { - response = ALREADY_SUBSCRIBED; - } else if (typeid(exception) == typeid(NotSubscribedException)) { - response = NOT_SUBSCRIBED; - } else { - response = UNKNOWN; - } - } - cond.notify_all(); -} - -void SyncOperationCallback::throwExceptionIfNeeded() { - switch (response) { - case SUCCESS: - break; - case NOCONNECT: - throw CannotConnectException(); - break; - case SERVICEDOWN: - throw ServiceDownException(); - break; - case ALREADY_SUBSCRIBED: - throw AlreadySubscribedException(); - break; - case NOT_SUBSCRIBED: - throw NotSubscribedException(); - break; - case TIMEOUT: - throw ClientTimeoutException(); - break; - default: - throw ClientException(); - break; - } -} - -ResponseHandler::ResponseHandler(const DuplexChannelManagerPtr& channelManager) - : channelManager(channelManager) { -} - -void ResponseHandler::redirectRequest(const PubSubResponsePtr& response, - const PubSubDataPtr& data, - const DuplexChannelPtr& channel) { - HostAddress oldhost = channel->getHostAddress(); - data->addTriedServer(oldhost); - - HostAddress h; - bool redirectToDefaultHost = true; - try { - if (response->has_statusmsg()) { - try { - h = HostAddress::fromString(response->statusmsg()); - redirectToDefaultHost = false; - } catch (std::exception& e) { - h = channelManager->getDefaultHost(); - } - } else { - h = channelManager->getDefaultHost(); - } - } catch (std::exception& e) { - LOG4CXX_ERROR(logger, "Failed to retrieve redirected host of request " << *data - << " : " << e.what()); - data->getCallback()->operationFailed(InvalidRedirectException()); - return; - } - if (data->hasTriedServer(h)) { - LOG4CXX_ERROR(logger, "We've been told to try request [" << data->getTxnId() << "] with [" - << h.getAddressString()<< "] by " << oldhost.getAddressString() - << " but we've already tried that. Failing operation"); - data->getCallback()->operationFailed(InvalidRedirectException()); - return; - } - LOG4CXX_INFO(logger, "We've been told [" << data->getTopic() << "] is on [" << h.getAddressString() - << "] by [" << oldhost.getAddressString() << "]. Redirecting request " - << data->getTxnId()); - data->setShouldClaim(true); - - // submit the request again to the target host - if (redirectToDefaultHost) { - channelManager->submitOpToDefaultServer(data); - } else { - channelManager->redirectOpToHost(data, h); - } -} - -HedwigClientChannelHandler::HedwigClientChannelHandler(const DuplexChannelManagerPtr& channelManager, - ResponseHandlerMap& handlers) - : channelManager(channelManager), handlers(handlers), closed(false), disconnected(false) { -} - -void HedwigClientChannelHandler::messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m) { - LOG4CXX_DEBUG(logger, "Message received txnid(" << m->txnid() << ") status(" - << m->statuscode() << ")"); - if (m->has_message()) { - LOG4CXX_ERROR(logger, "Subscription response, ignore for now"); - return; - } - - PubSubDataPtr data = channel->retrieveTransaction(m->txnid()); - /* you now have ownership of data, don't leave this funciton without deleting it or - palming it off to someone else */ - - if (data.get() == 0) { - LOG4CXX_ERROR(logger, "No pub/sub request for txnid(" << m->txnid() << ")."); - return; - } - - // Store the topic2Host mapping if this wasn't a server redirect. - // TODO: add specific response for failure of getting topic ownership - // to distinguish SERVICE_DOWN to failure of getting topic ownership - if (m->statuscode() != NOT_RESPONSIBLE_FOR_TOPIC) { - const HostAddress& host = channel->getHostAddress(); - channelManager->setHostForTopic(data->getTopic(), host); - } - - const ResponseHandlerPtr& respHandler = handlers[data->getType()]; - if (respHandler.get()) { - respHandler->handleResponse(m, data, channel); - } else { - LOG4CXX_ERROR(logger, "Unimplemented request type " << data->getType() << " : " - << *data); - data->getCallback()->operationFailed(UnknownRequestException()); - } -} - -void HedwigClientChannelHandler::channelConnected(const DuplexChannelPtr& channel) { - // do nothing -} - -void HedwigClientChannelHandler::channelDisconnected(const DuplexChannelPtr& channel, - const std::exception& e) { - if (channelManager->isClosed()) { - return; - } - - // If this channel was closed explicitly by the client code, - // we do not need to do any of this logic. This could happen - // for redundant Publish channels created or redirected subscribe - // channels that are not used anymore or when we shutdown the - // client and manually close all of the open channels. - // Also don't do any of the disconnect logic if the client has stopped. - { - boost::lock_guard<boost::shared_mutex> lock(close_lock); - if (closed) { - return; - } - if (disconnected) { - return; - } - disconnected = true; - } - LOG4CXX_INFO(logger, "Channel " << channel.get() << " was disconnected."); - // execute logic after channel disconnected - onChannelDisconnected(channel); -} - -void HedwigClientChannelHandler::onChannelDisconnected(const DuplexChannelPtr& channel) { - // Clean up the channel from channel manager - channelManager->nonSubscriptionChannelDied(channel); -} - -void HedwigClientChannelHandler::exceptionOccurred(const DuplexChannelPtr& channel, const std::exception& e) { - LOG4CXX_ERROR(logger, "Exception occurred" << e.what()); -} - -void HedwigClientChannelHandler::close() { - { - boost::lock_guard<boost::shared_mutex> lock(close_lock); - if (closed) { - return; - } - closed = true; - } - // do close handle logic here - doClose(); -} - -void HedwigClientChannelHandler::doClose() { - // do nothing for generic client channel handler -} - -// -// Pub/Sub Request Write Callback -// -PubSubWriteCallback::PubSubWriteCallback(const DuplexChannelPtr& channel, - const PubSubDataPtr& data) - : channel(channel), data(data) { -} - -void PubSubWriteCallback::operationComplete() { - LOG4CXX_INFO(logger, "Successfully wrote pubsub request : " << *data << " to channel " - << channel.get()); -} - -void PubSubWriteCallback::operationFailed(const std::exception& exception) { - LOG4CXX_ERROR(logger, "Error writing pubsub request (" << *data << ") : " << exception.what()); - - // remove the transaction from channel if write failed - channel->retrieveTransaction(data->getTxnId()); - data->getCallback()->operationFailed(exception); -} - -// -// Default Server Connect Callback -// -DefaultServerConnectCallback::DefaultServerConnectCallback(const DuplexChannelManagerPtr& channelManager, - const DuplexChannelPtr& channel, - const PubSubDataPtr& data) - : channelManager(channelManager), channel(channel), data(data) { -} - -void DefaultServerConnectCallback::operationComplete() { - LOG4CXX_DEBUG(logger, "Channel " << channel.get() << " is connected to host " - << channel->getHostAddress() << "."); - // After connected, we got the right ip for the target host - // so we could submit the request right now - channelManager->submitOpThruChannel(data, channel); -} - -void DefaultServerConnectCallback::operationFailed(const std::exception& exception) { - LOG4CXX_ERROR(logger, "Channel " << channel.get() << " failed to connect to host " - << channel->getHostAddress() << " : " << exception.what()); - data->getCallback()->operationFailed(exception); -} - -// -// Subscription Event Emitter -// -SubscriptionEventEmitter::SubscriptionEventEmitter() {} - -void SubscriptionEventEmitter::addSubscriptionListener( - SubscriptionListenerPtr& listener) { - boost::lock_guard<boost::shared_mutex> lock(listeners_lock); - listeners.insert(listener); -} - -void SubscriptionEventEmitter::removeSubscriptionListener( - SubscriptionListenerPtr& listener) { - boost::lock_guard<boost::shared_mutex> lock(listeners_lock); - listeners.erase(listener); -} - -void SubscriptionEventEmitter::emitSubscriptionEvent( - const std::string& topic, const std::string& subscriberId, - const SubscriptionEvent event) { - boost::shared_lock<boost::shared_mutex> lock(listeners_lock); - if (0 == listeners.size()) { - return; - } - for (SubscriptionListenerSet::iterator iter = listeners.begin(); - iter != listeners.end(); ++iter) { - (*iter)->processEvent(topic, subscriberId, event); - } -} - -// -// Channel Manager Used to manage all established channels -// - -DuplexChannelManagerPtr DuplexChannelManager::create(const Configuration& conf) { - DuplexChannelManager * managerPtr; - if (conf.getBool(Configuration::SUBSCRIPTION_CHANNEL_SHARING_ENABLED, false)) { - managerPtr = new MultiplexDuplexChannelManager(conf); - } else { - managerPtr = new SimpleDuplexChannelManager(conf); - } - DuplexChannelManagerPtr manager(managerPtr); - LOG4CXX_DEBUG(logger, "Created DuplexChannelManager " << manager.get()); - return manager; -} - -DuplexChannelManager::DuplexChannelManager(const Configuration& conf) - : dispatcher(new EventDispatcher(conf)), conf(conf), closed(false), counterobj(), - defaultHostAddress(conf.get(Configuration::DEFAULT_SERVER, - DEFAULT_SERVER_DEFAULT_VAL)) { - sslEnabled = conf.getBool(Configuration::SSL_ENABLED, DEFAULT_SSL_ENABLED); - if (sslEnabled) { - sslCtxFactory = SSLContextFactoryPtr(new SSLContextFactory(conf)); - } - LOG4CXX_DEBUG(logger, "Created DuplexChannelManager " << this << " with default server " - << defaultHostAddress); -} - -DuplexChannelManager::~DuplexChannelManager() { - LOG4CXX_DEBUG(logger, "Destroyed DuplexChannelManager " << this); -} - -void DuplexChannelManager::submitTo(const PubSubDataPtr& op, const DuplexChannelPtr& channel) { - if (channel.get()) { - channel->storeTransaction(op); - OperationCallbackPtr writecb(new PubSubWriteCallback(channel, op)); - LOG4CXX_DEBUG(logger, "Submit pub/sub request " << *op << " thru channel " << channel.get()); - channel->writeRequest(op->getRequest(), writecb); - } else { - submitOpToDefaultServer(op); - } -} - -// Submit a pub/sub request -void DuplexChannelManager::submitOp(const PubSubDataPtr& op) { - DuplexChannelPtr channel; - switch (op->getType()) { - case PUBLISH: - case UNSUBSCRIBE: - try { - channel = getNonSubscriptionChannel(op->getTopic()); - } catch (std::exception& e) { - LOG4CXX_ERROR(logger, "Failed to submit request " << *op << " : " << e.what()); - op->getCallback()->operationFailed(e); - return; - } - break; - default: - TopicSubscriber ts(op->getTopic(), op->getSubscriberId()); - channel = getSubscriptionChannel(ts, op->isResubscribeRequest()); - break; - } - // write the pub/sub request - submitTo(op, channel); -} - -// Submit a pub/sub request to target host -void DuplexChannelManager::redirectOpToHost(const PubSubDataPtr& op, const HostAddress& addr) { - DuplexChannelPtr channel; - switch (op->getType()) { - case PUBLISH: - case UNSUBSCRIBE: - // check whether there is a channel existed for non-subscription requests - channel = getNonSubscriptionChannel(addr); - if (!channel.get()) { - channel = createNonSubscriptionChannel(addr); - channel = storeNonSubscriptionChannel(channel, true); - } - break; - default: - channel = getSubscriptionChannel(addr); - if (!channel.get()) { - channel = createSubscriptionChannel(addr); - channel = storeSubscriptionChannel(channel, true); - } - break; - } - // write the pub/sub request - submitTo(op, channel); -} - -// Submit a pub/sub request to established request -void DuplexChannelManager::submitOpThruChannel(const PubSubDataPtr& op, - const DuplexChannelPtr& ch) { - DuplexChannelPtr channel; - switch (op->getType()) { - case PUBLISH: - case UNSUBSCRIBE: - channel = storeNonSubscriptionChannel(ch, false); - break; - default: - channel = storeSubscriptionChannel(ch, false); - break; - } - // write the pub/sub request - submitTo(op, channel); -} - -// Submit a pub/sub request to default server -void DuplexChannelManager::submitOpToDefaultServer(const PubSubDataPtr& op) { - DuplexChannelPtr channel; - try { - switch (op->getType()) { - case PUBLISH: - case UNSUBSCRIBE: - channel = createNonSubscriptionChannel(getDefaultHost()); - break; - default: - channel = createSubscriptionChannel(getDefaultHost()); - break; - } - } catch (std::exception& e) { - LOG4CXX_ERROR(logger, "Failed to create channel to default host " << defaultHostAddress - << " for request " << op << " : " << e.what()); - op->getCallback()->operationFailed(e); - return; - } - OperationCallbackPtr connectCallback(new DefaultServerConnectCallback(shared_from_this(), - channel, op)); - // connect to default server. usually default server is a VIP, we only got the real - // IP address after connected. so before connected, we don't know the real target host. - // we only submit the request after channel is connected (ip address would be updated). - channel->connect(connectCallback); -} - -DuplexChannelPtr DuplexChannelManager::getNonSubscriptionChannel(const std::string& topic) { - HostAddress addr; - { - boost::shared_lock<boost::shared_mutex> lock(topic2host_lock); - addr = topic2host[topic]; - } - if (addr.isNullHost()) { - return DuplexChannelPtr(); - } else { - // we had known which hub server owned the topic - DuplexChannelPtr ch = getNonSubscriptionChannel(addr); - if (ch.get()) { - return ch; - } - ch = createNonSubscriptionChannel(addr); - return storeNonSubscriptionChannel(ch, true); - } -} - -DuplexChannelPtr DuplexChannelManager::getNonSubscriptionChannel(const HostAddress& addr) { - boost::shared_lock<boost::shared_mutex> lock(host2channel_lock); - return host2channel[addr]; -} - -DuplexChannelPtr DuplexChannelManager::createNonSubscriptionChannel(const HostAddress& addr) { - // Create a non-subscription channel handler - ChannelHandlerPtr handler(new HedwigClientChannelHandler(shared_from_this(), - nonSubscriptionHandlers)); - // Create a non subscription channel - return createChannel(dispatcher->getService(), addr, handler); -} - -DuplexChannelPtr DuplexChannelManager::storeNonSubscriptionChannel(const DuplexChannelPtr& ch, - bool doConnect) { - const HostAddress& host = ch->getHostAddress(); - - bool useOldCh; - DuplexChannelPtr oldCh; - { - boost::lock_guard<boost::shared_mutex> lock(host2channel_lock); - - oldCh = host2channel[host]; - if (!oldCh.get()) { - host2channel[host] = ch; - useOldCh = false; - } else { - // If we've reached here, that means we already have a Channel - // mapping for the given host. This should ideally not happen - // and it means we are creating another Channel to a server host - // to publish on when we could have used an existing one. This could - // happen due to a race condition if initially multiple concurrent - // threads are publishing on the same topic and no Channel exists - // currently to the server. We are not synchronizing this initial - // creation of Channels to a given host for performance. - // Another possible way to have redundant Channels created is if - // a new topic is being published to, we connect to the default - // server host which should be a VIP that redirects to a "real" - // server host. Since we don't know beforehand what is the full - // set of server hosts, we could be redirected to a server that - // we already have a channel connection to from a prior existing - // topic. Close these redundant channels as they won't be used. - useOldCh = true; - } - } - if (useOldCh) { - LOG4CXX_DEBUG(logger, "Channel " << oldCh.get() << " to host " << host - << " already exists so close channel " << ch.get() << "."); - ch->close(); - return oldCh; - } else { - if (doConnect) { - ch->connect(); - } - LOG4CXX_DEBUG(logger, "Storing channel " << ch.get() << " for host " << host << "."); - return ch; - } -} - -DuplexChannelPtr DuplexChannelManager::createChannel(IOServicePtr& service, - const HostAddress& addr, - const ChannelHandlerPtr& handler) { - DuplexChannelPtr channel; - if (sslEnabled) { - boost_ssl_context_ptr sslCtx = sslCtxFactory->createSSLContext(service->getService()); - channel = DuplexChannelPtr(new AsioSSLDuplexChannel(service, sslCtx, addr, handler)); - } else { - channel = DuplexChannelPtr(new AsioDuplexChannel(service, addr, handler)); - } - - boost::lock_guard<boost::shared_mutex> lock(allchannels_lock); - if (closed) { - channel->close(); - throw ShuttingDownException(); - } - allchannels.insert(channel); - LOG4CXX_DEBUG(logger, "Created a channel to " << addr << ", all channels : " << allchannels.size()); - - return channel; -} - -long DuplexChannelManager::nextTxnId() { - return counterobj.next(); -} - -void DuplexChannelManager::setHostForTopic(const std::string& topic, const HostAddress& host) { - boost::lock_guard<boost::shared_mutex> h2clock(host2topics_lock); - boost::lock_guard<boost::shared_mutex> t2hlock(topic2host_lock); - topic2host[topic] = host; - TopicSetPtr ts = host2topics[host]; - if (!ts.get()) { - ts = TopicSetPtr(new TopicSet()); - host2topics[host] = ts; - } - ts->insert(topic); - LOG4CXX_DEBUG(logger, "Set ownership of topic " << topic << " to " << host << "."); -} - -void DuplexChannelManager::clearAllTopicsForHost(const HostAddress& addr) { - // remove topic mapping - boost::lock_guard<boost::shared_mutex> h2tlock(host2topics_lock); - boost::lock_guard<boost::shared_mutex> t2hlock(topic2host_lock); - Host2TopicsMap::iterator iter = host2topics.find(addr); - if (iter != host2topics.end()) { - for (TopicSet::iterator tsIter = iter->second->begin(); - tsIter != iter->second->end(); ++tsIter) { - topic2host.erase(*tsIter); - } - host2topics.erase(iter); - } -} - -void DuplexChannelManager::clearHostForTopic(const std::string& topic, - const HostAddress& addr) { - // remove topic mapping - boost::lock_guard<boost::shared_mutex> h2tlock(host2topics_lock); - boost::lock_guard<boost::shared_mutex> t2hlock(topic2host_lock); - Host2TopicsMap::iterator iter = host2topics.find(addr); - if (iter != host2topics.end()) { - iter->second->erase(topic); - } - HostAddress existed = topic2host[topic]; - if (existed == addr) { - topic2host.erase(topic); - } -} - -const HostAddress& DuplexChannelManager::getHostForTopic(const std::string& topic) { - boost::shared_lock<boost::shared_mutex> t2hlock(topic2host_lock); - return topic2host[topic]; -} - -/** - A channel has just died. Remove it so we never give it to any other publisher or subscriber. - - This does not delete the channel. Some publishers or subscribers will still hold it and will be errored - when they try to do anything with it. -*/ -void DuplexChannelManager::nonSubscriptionChannelDied(const DuplexChannelPtr& channel) { - // get host - HostAddress addr = channel->getHostAddress(); - - // Clear the topic owner ship when a nonsubscription channel disconnected - clearAllTopicsForHost(addr); - - // remove channel mapping - { - boost::lock_guard<boost::shared_mutex> h2clock(host2channel_lock); - host2channel.erase(addr); - } - removeChannel(channel); -} - -void DuplexChannelManager::removeChannel(const DuplexChannelPtr& channel) { - { - boost::lock_guard<boost::shared_mutex> aclock(allchannels_lock); - allchannels.erase(channel); // channel should be deleted here - } - channel->close(); -} - -void DuplexChannelManager::start() { - // add non-subscription response handlers - nonSubscriptionHandlers[PUBLISH] = - ResponseHandlerPtr(new PublishResponseHandler(shared_from_this())); - nonSubscriptionHandlers[UNSUBSCRIBE] = - ResponseHandlerPtr(new UnsubscribeResponseHandler(shared_from_this())); - - // start the dispatcher - dispatcher->start(); -} - -bool DuplexChannelManager::isClosed() { - boost::shared_lock<boost::shared_mutex> lock(allchannels_lock); - return closed; -} - -void DuplexChannelManager::close() { - // stop the dispatcher - dispatcher->stop(); - { - boost::lock_guard<boost::shared_mutex> lock(allchannels_lock); - - closed = true; - for (ChannelMap::iterator iter = allchannels.begin(); iter != allchannels.end(); ++iter ) { - (*iter)->close(); - } - allchannels.clear(); - } - - // Unregistered response handlers - nonSubscriptionHandlers.clear(); - /* destruction of the maps will clean up any items they hold */ -} - -ClientImplPtr ClientImpl::Create(const Configuration& conf) { - ClientImplPtr impl(new ClientImpl(conf)); - LOG4CXX_DEBUG(logger, "Creating Clientimpl " << impl); - impl->channelManager->start(); - return impl; -} - -void ClientImpl::Destroy() { - LOG4CXX_DEBUG(logger, "destroying Clientimpl " << this); - - // close the channel manager - channelManager->close(); - - if (subscriber != NULL) { - delete subscriber; - subscriber = NULL; - } - if (publisher != NULL) { - delete publisher; - publisher = NULL; - } -} - -ClientImpl::ClientImpl(const Configuration& conf) - : conf(conf), publisher(NULL), subscriber(NULL) -{ - channelManager = DuplexChannelManager::create(conf); -} - -Subscriber& ClientImpl::getSubscriber() { - return getSubscriberImpl(); -} - -Publisher& ClientImpl::getPublisher() { - return getPublisherImpl(); -} - -SubscriberImpl& ClientImpl::getSubscriberImpl() { - if (subscriber == NULL) { - boost::lock_guard<boost::mutex> lock(subscribercreate_lock); - if (subscriber == NULL) { - subscriber = new SubscriberImpl(channelManager); - } - } - return *subscriber; -} - -PublisherImpl& ClientImpl::getPublisherImpl() { - if (publisher == NULL) { - boost::lock_guard<boost::mutex> lock(publishercreate_lock); - if (publisher == NULL) { - publisher = new PublisherImpl(channelManager); - } - } - return *publisher; -} - -ClientImpl::~ClientImpl() { - LOG4CXX_DEBUG(logger, "deleting Clientimpl " << this); -}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/clientimpl.h ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/lib/clientimpl.h b/hedwig-client/src/main/cpp/lib/clientimpl.h deleted file mode 100644 index fd7915c..0000000 --- a/hedwig-client/src/main/cpp/lib/clientimpl.h +++ /dev/null @@ -1,493 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef HEDWIG_CLIENT_IMPL_H -#define HEDWIG_CLIENT_IMPL_H - -#include <hedwig/client.h> -#include <hedwig/protocol.h> - -#include <boost/asio.hpp> -#include <boost/shared_ptr.hpp> -#include <boost/enable_shared_from_this.hpp> -#include <boost/thread/mutex.hpp> -#include <boost/thread/condition_variable.hpp> - -#ifdef USE_BOOST_TR1 -#include <boost/tr1/unordered_map.hpp> -#else -#include <tr1/unordered_map> -#endif - -#include <list> - -#include "util.h" -#include "channel.h" -#include "data.h" -#include "eventdispatcher.h" - -namespace Hedwig { - const int DEFAULT_SYNC_REQUEST_TIMEOUT = 5000; - - template<class R> - class SyncCallback : public Callback<R> { - public: - SyncCallback(int timeout) : response(PENDING), timeout(timeout) {} - virtual void operationComplete(const R& r) { - if (response == TIMEOUT) { - return; - } - - { - boost::lock_guard<boost::mutex> lock(mut); - response = SUCCESS; - result = r; - } - cond.notify_all(); - } - - virtual void operationFailed(const std::exception& exception) { - if (response == TIMEOUT) { - return; - } - - { - boost::lock_guard<boost::mutex> lock(mut); - - if (typeid(exception) == typeid(ChannelConnectException)) { - response = NOCONNECT; - } else if (typeid(exception) == typeid(ServiceDownException)) { - response = SERVICEDOWN; - } else if (typeid(exception) == typeid(AlreadySubscribedException)) { - response = ALREADY_SUBSCRIBED; - } else if (typeid(exception) == typeid(NotSubscribedException)) { - response = NOT_SUBSCRIBED; - } else { - response = UNKNOWN; - } - } - cond.notify_all(); - } - - void wait() { - boost::unique_lock<boost::mutex> lock(mut); - while(response==PENDING) { - if (cond.timed_wait(lock, boost::posix_time::milliseconds(timeout)) == false) { - response = TIMEOUT; - } - } - } - - void throwExceptionIfNeeded() { - switch (response) { - case SUCCESS: - break; - case NOCONNECT: - throw CannotConnectException(); - break; - case SERVICEDOWN: - throw ServiceDownException(); - break; - case ALREADY_SUBSCRIBED: - throw AlreadySubscribedException(); - break; - case NOT_SUBSCRIBED: - throw NotSubscribedException(); - break; - case TIMEOUT: - throw ClientTimeoutException(); - break; - default: - throw ClientException(); - break; - } - } - - R getResult() { return result; } - - private: - enum { - PENDING, - SUCCESS, - NOCONNECT, - SERVICEDOWN, - NOT_SUBSCRIBED, - ALREADY_SUBSCRIBED, - TIMEOUT, - UNKNOWN - } response; - - boost::condition_variable cond; - boost::mutex mut; - int timeout; - R result; - }; - - class SyncOperationCallback : public OperationCallback { - public: - SyncOperationCallback(int timeout) : response(PENDING), timeout(timeout) {} - virtual void operationComplete(); - virtual void operationFailed(const std::exception& exception); - - void wait(); - void throwExceptionIfNeeded(); - - private: - enum { - PENDING, - SUCCESS, - NOCONNECT, - SERVICEDOWN, - NOT_SUBSCRIBED, - ALREADY_SUBSCRIBED, - TIMEOUT, - UNKNOWN - } response; - - boost::condition_variable cond; - boost::mutex mut; - int timeout; - }; - - class DuplexChannelManager; - typedef boost::shared_ptr<DuplexChannelManager> DuplexChannelManagerPtr; - - // - // Hedwig Response Handler - // - - // Response Handler used to process response for different types of requests - class ResponseHandler { - public: - ResponseHandler(const DuplexChannelManagerPtr& channelManager); - virtual ~ResponseHandler() {}; - - virtual void handleResponse(const PubSubResponsePtr& m, const PubSubDataPtr& txn, - const DuplexChannelPtr& channel) = 0; - protected: - // common method used to redirect request - void redirectRequest(const PubSubResponsePtr& response, const PubSubDataPtr& data, - const DuplexChannelPtr& channel); - - // channel manager to manage all established channels - const DuplexChannelManagerPtr channelManager; - }; - - typedef boost::shared_ptr<ResponseHandler> ResponseHandlerPtr; - typedef std::tr1::unordered_map<OperationType, ResponseHandlerPtr, OperationTypeHash> ResponseHandlerMap; - - class PubSubWriteCallback : public OperationCallback { - public: - PubSubWriteCallback(const DuplexChannelPtr& channel, const PubSubDataPtr& data); - virtual void operationComplete(); - virtual void operationFailed(const std::exception& exception); - private: - DuplexChannelPtr channel; - PubSubDataPtr data; - }; - - class DefaultServerConnectCallback : public OperationCallback { - public: - DefaultServerConnectCallback(const DuplexChannelManagerPtr& channelManager, - const DuplexChannelPtr& channel, - const PubSubDataPtr& data); - virtual void operationComplete(); - virtual void operationFailed(const std::exception& exception); - private: - DuplexChannelManagerPtr channelManager; - DuplexChannelPtr channel; - PubSubDataPtr data; - }; - - struct SubscriptionListenerPtrHash : public std::unary_function<SubscriptionListenerPtr, size_t> { - size_t operator()(const Hedwig::SubscriptionListenerPtr& listener) const { - return reinterpret_cast<size_t>(listener.get()); - } - }; - - // Subscription Event Emitter - class SubscriptionEventEmitter { - public: - SubscriptionEventEmitter(); - - void addSubscriptionListener(SubscriptionListenerPtr& listener); - void removeSubscriptionListener(SubscriptionListenerPtr& listener); - void emitSubscriptionEvent(const std::string& topic, - const std::string& subscriberId, - const SubscriptionEvent event); - - private: - typedef std::tr1::unordered_set<SubscriptionListenerPtr, SubscriptionListenerPtrHash> SubscriptionListenerSet; - SubscriptionListenerSet listeners; - boost::shared_mutex listeners_lock; - }; - - class SubscriberClientChannelHandler; - - // - // Duplex Channel Manager to manage all established channels - // - - class DuplexChannelManager : public boost::enable_shared_from_this<DuplexChannelManager> { - public: - static DuplexChannelManagerPtr create(const Configuration& conf); - virtual ~DuplexChannelManager(); - - inline const Configuration& getConfiguration() const { - return conf; - } - - // Submit a pub/sub request - void submitOp(const PubSubDataPtr& op); - - // Submit a pub/sub request to default host - // It is called only when client doesn't have the knowledge of topic ownership - void submitOpToDefaultServer(const PubSubDataPtr& op); - - // Redirect pub/sub request to a target hosts - void redirectOpToHost(const PubSubDataPtr& op, const HostAddress& host); - - // Submit a pub/sub request thru established channel - // It is called when connecting to default server to established a channel - void submitOpThruChannel(const PubSubDataPtr& op, const DuplexChannelPtr& channel); - - // Generate next transaction id for pub/sub requests sending thru this manager - long nextTxnId(); - - // return default host - inline const HostAddress getDefaultHost() { - return HostAddress::fromString(defaultHostAddress); - } - - // set the owner host of a topic - void setHostForTopic(const std::string& topic, const HostAddress& host); - - // clear all topics that hosted by a hub server - void clearAllTopicsForHost(const HostAddress& host); - - // clear host for a given topic - void clearHostForTopic(const std::string& topic, const HostAddress& host); - - // Called when a channel is disconnected - void nonSubscriptionChannelDied(const DuplexChannelPtr& channel); - - // Remove a channel from all channel map - void removeChannel(const DuplexChannelPtr& channel); - - // Get the subscription channel handler for a given subscription - virtual boost::shared_ptr<SubscriberClientChannelHandler> - getSubscriptionChannelHandler(const TopicSubscriber& ts) = 0; - - // Close subscription for a given subscription - virtual void asyncCloseSubscription(const TopicSubscriber& ts, - const OperationCallbackPtr& callback) = 0; - - virtual void handoverDelivery(const TopicSubscriber& ts, - const MessageHandlerCallbackPtr& handler, - const ClientMessageFilterPtr& filter) = 0; - - // start the channel manager - virtual void start(); - // close the channel manager - virtual void close(); - // whether the channel manager is closed - bool isClosed(); - - // Return an available service - inline boost::asio::io_service & getService() const { - return dispatcher->getService()->getService(); - } - - // Return the event emitter - inline SubscriptionEventEmitter& getEventEmitter() { - return eventEmitter; - } - - protected: - DuplexChannelManager(const Configuration& conf); - - // Get the ownership for a given topic. - const HostAddress& getHostForTopic(const std::string& topic); - - // - // Channel Management - // - - // Non subscription channel management - - // Get a non subscription channel for a given topic - // If the topic's owner is known, retrieve a subscription channel to - // target host (if there is no channel existed, create one); - // If the topic's owner is unknown, return null - DuplexChannelPtr getNonSubscriptionChannel(const std::string& topic); - - // Get an existed non subscription channel to a given host - DuplexChannelPtr getNonSubscriptionChannel(const HostAddress& addr); - - // Create a non subscription channel to a given host - DuplexChannelPtr createNonSubscriptionChannel(const HostAddress& addr); - - // Store the established non subscription channel - DuplexChannelPtr storeNonSubscriptionChannel(const DuplexChannelPtr& ch, - bool doConnect); - - // - // Subscription Channel Management - // - - // Get a subscription channel for a given subscription. - // If there is subscription channel established before, return it. - // Otherwise, check whether the topic's owner is known. If the topic owner - // is known, retrieve a subscription channel to target host (if there is no - // channel exsited, create one); If unknown, return null - virtual DuplexChannelPtr getSubscriptionChannel(const TopicSubscriber& ts, - const bool isResubscribeRequest) = 0; - - // Get an existed subscription channel to a given host - virtual DuplexChannelPtr getSubscriptionChannel(const HostAddress& addr) = 0; - - // Create a subscription channel to a given host - // If store is true, store the channel for future usage. - // If store is false, return a newly created channel. - virtual DuplexChannelPtr createSubscriptionChannel(const HostAddress& addr) = 0; - - // Store the established subscription channel - virtual DuplexChannelPtr storeSubscriptionChannel(const DuplexChannelPtr& ch, - bool doConnect) = 0; - - // - // Raw Channel Management - // - - // Create a raw channel - DuplexChannelPtr createChannel(IOServicePtr& service, - const HostAddress& addr, const ChannelHandlerPtr& handler); - - // event dispatcher running io threads - typedef boost::shared_ptr<EventDispatcher> EventDispatcherPtr; - EventDispatcherPtr dispatcher; - - // topic2host mapping for topic ownership - std::tr1::unordered_map<std::string, HostAddress> topic2host; - boost::shared_mutex topic2host_lock; - typedef std::tr1::unordered_set<std::string> TopicSet; - typedef boost::shared_ptr<TopicSet> TopicSetPtr; - typedef std::tr1::unordered_map<HostAddress, TopicSetPtr, HostAddressHash> Host2TopicsMap; - Host2TopicsMap host2topics; - boost::shared_mutex host2topics_lock; - private: - // write the request to target channel - void submitTo(const PubSubDataPtr& op, const DuplexChannelPtr& channel); - - const Configuration& conf; - bool sslEnabled; - SSLContextFactoryPtr sslCtxFactory; - - // whether the channel manager is shutting down - bool closed; - - // counter used for generating transaction ids - ClientTxnCounter counterobj; - - std::string defaultHostAddress; - - // non-subscription channels - std::tr1::unordered_map<HostAddress, DuplexChannelPtr, HostAddressHash > host2channel; - boost::shared_mutex host2channel_lock; - - // maintain all established channels - typedef std::tr1::unordered_set<DuplexChannelPtr, DuplexChannelPtrHash > ChannelMap; - ChannelMap allchannels; - boost::shared_mutex allchannels_lock; - - // Response Handlers for non-subscription requests - ResponseHandlerMap nonSubscriptionHandlers; - - // Subscription Event Emitter - SubscriptionEventEmitter eventEmitter; - }; - - // - // Hedwig Client Channel Handler to handle responses received from the channel - // - - class HedwigClientChannelHandler : public ChannelHandler { - public: - HedwigClientChannelHandler(const DuplexChannelManagerPtr& channelManager, - ResponseHandlerMap& handlers); - virtual ~HedwigClientChannelHandler() {} - - virtual void messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m); - virtual void channelConnected(const DuplexChannelPtr& channel); - virtual void channelDisconnected(const DuplexChannelPtr& channel, const std::exception& e); - virtual void exceptionOccurred(const DuplexChannelPtr& channel, const std::exception& e); - - void close(); - protected: - // real channel disconnected logic - virtual void onChannelDisconnected(const DuplexChannelPtr& channel); - - // real close logic - virtual void doClose(); - - // channel manager to manage all established channels - const DuplexChannelManagerPtr channelManager; - ResponseHandlerMap& handlers; - - boost::shared_mutex close_lock; - // Boolean indicating if we closed the handler explicitly or not. - // If so, we do not need to do the channel disconnected logic here. - bool closed; - // whether channel is disconnected. - bool disconnected; - }; - - class PublisherImpl; - class SubscriberImpl; - - /** - Implementation of the hedwig client. This class takes care of globals such as the topic->host map and the transaction id counter. - */ - class ClientImpl : public boost::enable_shared_from_this<ClientImpl> { - public: - static ClientImplPtr Create(const Configuration& conf); - void Destroy(); - - Subscriber& getSubscriber(); - Publisher& getPublisher(); - - SubscriberImpl& getSubscriberImpl(); - PublisherImpl& getPublisherImpl(); - - ~ClientImpl(); - private: - ClientImpl(const Configuration& conf); - - const Configuration& conf; - - boost::mutex publishercreate_lock; - PublisherImpl* publisher; - - boost::mutex subscribercreate_lock; - SubscriberImpl* subscriber; - - // channel manager manage all channels for the client - DuplexChannelManagerPtr channelManager; - }; -}; -#endif http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/data.cpp ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/lib/data.cpp b/hedwig-client/src/main/cpp/lib/data.cpp deleted file mode 100644 index 24d458e..0000000 --- a/hedwig-client/src/main/cpp/lib/data.cpp +++ /dev/null @@ -1,277 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifdef HAVE_CONFIG_H -#include <config.h> -#endif - -#include <hedwig/protocol.h> -#include "data.h" - -#include <log4cxx/logger.h> -#include <iostream> -#include <boost/thread/locks.hpp> - -#define stringify( name ) #name - -static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__)); - -using namespace Hedwig; - -const char* OPERATION_TYPE_NAMES[] = { - stringify( PUBLISH ), - stringify( SUBSCRIBE ), - stringify( CONSUME ), - stringify( UNSUBSCRIBE ), - stringify( START_DELIVERY ), - stringify( STOP_DELIVERY ), - stringify( CLOSESUBSCRIPTION ) -}; - -PubSubDataPtr PubSubData::forPublishRequest(long txnid, const std::string& topic, const Message& body, - const ResponseCallbackPtr& callback) { - PubSubDataPtr ptr(new PubSubData()); - ptr->type = PUBLISH; - ptr->txnid = txnid; - ptr->topic = topic; - ptr->body.CopyFrom(body); - ptr->callback = callback; - return ptr; -} - -PubSubDataPtr PubSubData::forSubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic, - const ResponseCallbackPtr& callback, const SubscriptionOptions& options) { - PubSubDataPtr ptr(new PubSubData()); - ptr->type = SUBSCRIBE; - ptr->txnid = txnid; - ptr->subscriberid = subscriberid; - ptr->topic = topic; - ptr->callback = callback; - ptr->options = options; - return ptr; -} - -PubSubDataPtr PubSubData::forUnsubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic, - const ResponseCallbackPtr& callback) { - PubSubDataPtr ptr(new PubSubData()); - ptr->type = UNSUBSCRIBE; - ptr->txnid = txnid; - ptr->subscriberid = subscriberid; - ptr->topic = topic; - ptr->callback = callback; - return ptr; -} - -PubSubDataPtr PubSubData::forCloseSubscriptionRequest( - long txnid, const std::string& subscriberid, const std::string& topic, - const ResponseCallbackPtr& callback) { - PubSubDataPtr ptr(new PubSubData()); - ptr->type = CLOSESUBSCRIPTION; - ptr->txnid = txnid; - ptr->subscriberid = subscriberid; - ptr->topic = topic; - ptr->callback = callback; - return ptr; -} - -PubSubDataPtr PubSubData::forConsumeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const MessageSeqId msgid) { - PubSubDataPtr ptr(new PubSubData()); - ptr->type = CONSUME; - ptr->txnid = txnid; - ptr->subscriberid = subscriberid; - ptr->topic = topic; - ptr->msgid = msgid; - return ptr; -} - -PubSubData::PubSubData() : shouldClaim(false), messageBound(0) { -} - -PubSubData::~PubSubData() { -} - -OperationType PubSubData::getType() const { - return type; -} - -long PubSubData::getTxnId() const { - return txnid; -} - -const std::string& PubSubData::getTopic() const { - return topic; -} - -const Message& PubSubData::getBody() const { - return body; -} - -const MessageSeqId PubSubData::getMessageSeqId() const { - return msgid; -} - -void PubSubData::setPreferencesForSubRequest(SubscribeRequest * subreq, - const SubscriptionOptions &options) { - Hedwig::SubscriptionPreferences* preferences = subreq->mutable_preferences(); - if (options.messagebound() > 0) { - preferences->set_messagebound(options.messagebound()); - } - if (options.has_messagefilter()) { - preferences->set_messagefilter(options.messagefilter()); - } - if (options.has_options()) { - preferences->mutable_options()->CopyFrom(options.options()); - } - if (options.has_messagewindowsize()) { - preferences->set_messagewindowsize(options.messagewindowsize()); - } -} - -const PubSubRequestPtr PubSubData::getRequest() { - PubSubRequestPtr request(new Hedwig::PubSubRequest()); - request->set_protocolversion(Hedwig::VERSION_ONE); - request->set_type(type); - request->set_txnid(txnid); - if (shouldClaim) { - request->set_shouldclaim(shouldClaim); - } - request->set_topic(topic); - - if (type == PUBLISH) { - LOG4CXX_DEBUG(logger, "Creating publish request"); - - Hedwig::PublishRequest* pubreq = request->mutable_publishrequest(); - Hedwig::Message* msg = pubreq->mutable_msg(); - msg->CopyFrom(body); - } else if (type == SUBSCRIBE) { - LOG4CXX_DEBUG(logger, "Creating subscribe request"); - - Hedwig::SubscribeRequest* subreq = request->mutable_subscriberequest(); - subreq->set_subscriberid(subscriberid); - subreq->set_createorattach(options.createorattach()); - subreq->set_forceattach(options.forceattach()); - setPreferencesForSubRequest(subreq, options); - } else if (type == CONSUME) { - LOG4CXX_DEBUG(logger, "Creating consume request"); - - Hedwig::ConsumeRequest* conreq = request->mutable_consumerequest(); - conreq->set_subscriberid(subscriberid); - conreq->mutable_msgid()->CopyFrom(msgid); - } else if (type == UNSUBSCRIBE) { - LOG4CXX_DEBUG(logger, "Creating unsubscribe request"); - - Hedwig::UnsubscribeRequest* unsubreq = request->mutable_unsubscriberequest(); - unsubreq->set_subscriberid(subscriberid); - } else if (type == CLOSESUBSCRIPTION) { - LOG4CXX_DEBUG(logger, "Creating closeSubscription request"); - - Hedwig::CloseSubscriptionRequest* closesubreq = request->mutable_closesubscriptionrequest(); - closesubreq->set_subscriberid(subscriberid); - } else { - LOG4CXX_ERROR(logger, "Tried to create a request message for the wrong type [" << type << "]"); - throw UnknownRequestException(); - } - - return request; -} - -void PubSubData::setShouldClaim(bool shouldClaim) { - this->shouldClaim = shouldClaim; -} - -void PubSubData::addTriedServer(HostAddress& h) { - triedservers.insert(h); -} - -bool PubSubData::hasTriedServer(HostAddress& h) { - return triedservers.count(h) > 0; -} - -void PubSubData::clearTriedServers() { - triedservers.clear(); -} - -ResponseCallbackPtr& PubSubData::getCallback() { - return callback; -} - -void PubSubData::setCallback(const ResponseCallbackPtr& callback) { - this->callback = callback; -} - -const std::string& PubSubData::getSubscriberId() const { - return subscriberid; -} - -const SubscriptionOptions& PubSubData::getSubscriptionOptions() const { - return options; -} - -void PubSubData::setOrigChannelForResubscribe( - boost::shared_ptr<DuplexChannel>& channel) { - this->origChannel = channel; -} - -boost::shared_ptr<DuplexChannel>& PubSubData::getOrigChannelForResubscribe() { - return this->origChannel; -} - -bool PubSubData::isResubscribeRequest() { - return 0 != this->origChannel.get(); -} - -ClientTxnCounter::ClientTxnCounter() : counter(0) -{ -} - -ClientTxnCounter::~ClientTxnCounter() { -} - -/** -Increment the transaction counter and return the new value. - -@returns the next transaction id -*/ -long ClientTxnCounter::next() { // would be nice to remove lock from here, look more into it - boost::lock_guard<boost::mutex> lock(mutex); - - long next= ++counter; - - return next; -} - -std::ostream& Hedwig::operator<<(std::ostream& os, const PubSubData& data) { - OperationType type = data.getType(); - os << "[" << OPERATION_TYPE_NAMES[type] << " request (txn:" << data.getTxnId() - << ") for (topic:" << data.getTopic(); - switch (type) { - case SUBSCRIBE: - case UNSUBSCRIBE: - case CLOSESUBSCRIPTION: - os << ", subscriber:" << data.getSubscriberId() << ")"; - break; - case CONSUME: - os << ", subscriber:" << data.getSubscriberId() << ", seq:" - << data.getMessageSeqId().localcomponent() << ")"; - break; - case PUBLISH: - default: - os << ")"; - break; - } - return os; -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/data.h ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/lib/data.h b/hedwig-client/src/main/cpp/lib/data.h deleted file mode 100644 index 0639f4a..0000000 --- a/hedwig-client/src/main/cpp/lib/data.h +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef DATA_H -#define DATA_H - -#include <hedwig/protocol.h> -#include <hedwig/callback.h> - -#include <pthread.h> -#include <iostream> - -#ifdef USE_BOOST_TR1 -#include <boost/tr1/unordered_set.hpp> -#else -#include <tr1/unordered_set> -#endif - -#include "util.h" -#include <boost/shared_ptr.hpp> -#include <boost/thread/mutex.hpp> - -namespace Hedwig { - /** - Simple counter for transaction ids from the client - */ - class ClientTxnCounter { - public: - ClientTxnCounter(); - ~ClientTxnCounter(); - long next(); - - private: - long counter; - boost::mutex mutex; - }; - - typedef Callback<ResponseBody> ResponseCallback; - typedef std::tr1::shared_ptr<ResponseCallback> ResponseCallbackPtr; - - class PubSubData; - typedef boost::shared_ptr<PubSubData> PubSubDataPtr; - typedef boost::shared_ptr<PubSubRequest> PubSubRequestPtr; - typedef boost::shared_ptr<PubSubResponse> PubSubResponsePtr; - - class DuplexChannel; - - /** - Data structure to hold information about requests and build request messages. - Used to store requests which may need to be resent to another server. - */ - class PubSubData { - public: - // to be used for publish - static PubSubDataPtr forPublishRequest(long txnid, const std::string& topic, const Message& body, - const ResponseCallbackPtr& callback); - static PubSubDataPtr forSubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic, - const ResponseCallbackPtr& callback, const SubscriptionOptions& options); - static PubSubDataPtr forUnsubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic, - const ResponseCallbackPtr& callback); - static PubSubDataPtr forConsumeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const MessageSeqId msgid); - - static PubSubDataPtr forCloseSubscriptionRequest(long txnid, const std::string& subscriberid, - const std::string& topic, - const ResponseCallbackPtr& callback); - - ~PubSubData(); - - OperationType getType() const; - long getTxnId() const; - const std::string& getSubscriberId() const; - const std::string& getTopic() const; - const Message& getBody() const; - const MessageSeqId getMessageSeqId() const; - - void setShouldClaim(bool shouldClaim); - void setMessageBound(int messageBound); - - const PubSubRequestPtr getRequest(); - void setCallback(const ResponseCallbackPtr& callback); - ResponseCallbackPtr& getCallback(); - const SubscriptionOptions& getSubscriptionOptions() const; - - void addTriedServer(HostAddress& h); - bool hasTriedServer(HostAddress& h); - void clearTriedServers(); - - void setOrigChannelForResubscribe(boost::shared_ptr<DuplexChannel>& channel); - bool isResubscribeRequest(); - boost::shared_ptr<DuplexChannel>& getOrigChannelForResubscribe(); - - friend std::ostream& operator<<(std::ostream& os, const PubSubData& data); - private: - - PubSubData(); - - void setPreferencesForSubRequest(SubscribeRequest * subreq, - const SubscriptionOptions &options); - - OperationType type; - long txnid; - std::string subscriberid; - std::string topic; - Message body; - bool shouldClaim; - int messageBound; - ResponseCallbackPtr callback; - SubscriptionOptions options; - MessageSeqId msgid; - std::tr1::unordered_set<HostAddress, HostAddressHash > triedservers; - // record the origChannel for a resubscribe request - boost::shared_ptr<DuplexChannel> origChannel; - }; - -}; -#endif http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp b/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp deleted file mode 100644 index af3560c..0000000 --- a/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifdef HAVE_CONFIG_H -#include <config.h> -#endif - -#include "eventdispatcher.h" - -#include <log4cxx/logger.h> - -static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__)); - -using namespace Hedwig; - -const int DEFAULT_NUM_DISPATCH_THREADS = 1; - -IOService::IOService() { -} - -IOService::~IOService() {} - -void IOService::start() { - if (work.get()) { - return; - } - work = work_ptr(new boost::asio::io_service::work(service)); -} - -void IOService::stop() { - if (!work.get()) { - return; - } - - work = work_ptr(); - service.stop(); -} - -void IOService::run() { - while (true) { - try { - service.run(); - break; - } catch (std::exception &e) { - LOG4CXX_ERROR(logger, "Exception in IO Service " << this << " : " << e.what()); - } - } -} - -EventDispatcher::EventDispatcher(const Configuration& conf) - : conf(conf), running(false), next_io_service(0) { - num_threads = conf.getInt(Configuration::NUM_DISPATCH_THREADS, - DEFAULT_NUM_DISPATCH_THREADS); - if (0 == num_threads) { - LOG4CXX_ERROR(logger, "Number of threads in dispatcher is zero"); - throw std::runtime_error("number of threads in dispatcher is zero"); - } - for (size_t i = 0; i < num_threads; i++) { - services.push_back(IOServicePtr(new IOService())); - } - LOG4CXX_DEBUG(logger, "Created EventDispatcher " << this); -} - -void EventDispatcher::run_forever(IOServicePtr service, size_t idx) { - LOG4CXX_INFO(logger, "Starting event dispatcher " << idx); - - service->run(); - - LOG4CXX_INFO(logger, "Event dispatcher " << idx << " done"); -} - -void EventDispatcher::start() { - if (running) { - return; - } - - for (size_t i = 0; i < num_threads; i++) { - IOServicePtr service = services[i]; - service->start(); - // new thread - thread_ptr t(new boost::thread(boost::bind(&EventDispatcher::run_forever, - this, service, i))); - threads.push_back(t); - } - running = true; -} - -void EventDispatcher::stop() { - if (!running) { - return; - } - - for (size_t i = 0; i < num_threads; i++) { - services[i]->stop(); - } - - for (size_t i = 0; i < num_threads; i++) { - threads[i]->join(); - } - threads.clear(); - - running = false; -} - -EventDispatcher::~EventDispatcher() { - services.clear(); -} - -IOServicePtr& EventDispatcher::getService() { - size_t next = 0; - { - boost::lock_guard<boost::mutex> lock(next_lock); - next = next_io_service; - next_io_service = (next_io_service + 1) % num_threads; - } - return services[next]; -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/eventdispatcher.h ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/lib/eventdispatcher.h b/hedwig-client/src/main/cpp/lib/eventdispatcher.h deleted file mode 100644 index b6a7504..0000000 --- a/hedwig-client/src/main/cpp/lib/eventdispatcher.h +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef EVENTDISPATCHER_H -#define EVENTDISPATCHER_H - -#include <vector> - -#include <hedwig/client.h> - -#include <boost/asio.hpp> -#include <boost/thread.hpp> -#include <boost/shared_ptr.hpp> - -namespace Hedwig { - typedef boost::shared_ptr<boost::asio::io_service::work> work_ptr; - typedef boost::shared_ptr<boost::thread> thread_ptr; - - class IOService; - typedef boost::shared_ptr<IOService> IOServicePtr; - - class IOService { - public: - IOService(); - virtual ~IOService(); - - // start the io service - void start(); - // stop the io service - void stop(); - // run the io service - void run(); - - inline boost::asio::io_service& getService() { - return service; - } - - private: - boost::asio::io_service service; - work_ptr work; - }; - - class EventDispatcher { - public: - EventDispatcher(const Configuration& conf); - ~EventDispatcher(); - - void start(); - - void stop(); - - IOServicePtr& getService(); - - private: - void run_forever(IOServicePtr service, size_t idx); - - const Configuration& conf; - - // number of threads - size_t num_threads; - // running flag - bool running; - // pool of io_services. - std::vector<IOServicePtr> services; - // threads - std::vector<thread_ptr> threads; - // next io_service used for a connection - boost::mutex next_lock; - std::size_t next_io_service; - }; -} - -#endif http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/exceptions.cpp ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/lib/exceptions.cpp b/hedwig-client/src/main/cpp/lib/exceptions.cpp deleted file mode 100644 index 9e062dc..0000000 --- a/hedwig-client/src/main/cpp/lib/exceptions.cpp +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifdef HAVE_CONFIG_H -#include <config.h> -#endif - -#include <hedwig/exceptions.h> -#include <stdlib.h> -#include <string.h> - -using namespace Hedwig; - - - - http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/filterablemessagehandler.cpp ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/lib/filterablemessagehandler.cpp b/hedwig-client/src/main/cpp/lib/filterablemessagehandler.cpp deleted file mode 100644 index 07d884c..0000000 --- a/hedwig-client/src/main/cpp/lib/filterablemessagehandler.cpp +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifdef HAVE_CONFIG_H -#include <config.h> -#endif - -#include "filterablemessagehandler.h" - -using namespace Hedwig; - -FilterableMessageHandler::FilterableMessageHandler(const MessageHandlerCallbackPtr& msgHandler, - const ClientMessageFilterPtr& msgFilter) - : msgHandler(msgHandler), msgFilter(msgFilter) { -} - -FilterableMessageHandler::~FilterableMessageHandler() { -} - -void FilterableMessageHandler::consume(const std::string& topic, const std::string& subscriberId, - const Message& msg, OperationCallbackPtr& callback) { - bool deliver = true; - if (0 != msgFilter.get()) { - deliver = msgFilter->testMessage(msg); - } - if (deliver) { - msgHandler->consume(topic, subscriberId, msg, callback); - } else { - callback->operationComplete(); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/filterablemessagehandler.h ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/lib/filterablemessagehandler.h b/hedwig-client/src/main/cpp/lib/filterablemessagehandler.h deleted file mode 100644 index 2d24bd5..0000000 --- a/hedwig-client/src/main/cpp/lib/filterablemessagehandler.h +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef FILTERABLE_MESSAGE_HANDLER_H -#define FILTERABLE_MESSAGE_HANDLER_H - -#include <hedwig/callback.h> -#include <hedwig/protocol.h> - -#ifdef USE_BOOST_TR1 -#include <boost/tr1/memory.hpp> -#else -#include <tr1/memory> -#endif - -namespace Hedwig { - - class FilterableMessageHandler : public MessageHandlerCallback { - public: - FilterableMessageHandler(const MessageHandlerCallbackPtr& msgHandler, - const ClientMessageFilterPtr& msgFilter); - - virtual void consume(const std::string& topic, const std::string& subscriberId, - const Message& msg, OperationCallbackPtr& callback); - - virtual ~FilterableMessageHandler(); - private: - const MessageHandlerCallbackPtr msgHandler; - const ClientMessageFilterPtr msgFilter; - }; - -}; - -#endif -
