http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/inc/hedwig/callback.h ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/inc/hedwig/callback.h b/hedwig-client/src/main/cpp/inc/hedwig/callback.h deleted file mode 100644 index 80e961b..0000000 --- a/hedwig-client/src/main/cpp/inc/hedwig/callback.h +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef HEDWIG_CALLBACK_H -#define HEDWIG_CALLBACK_H - -#include <string> -#include <hedwig/exceptions.h> -#include <hedwig/protocol.h> - -#ifdef USE_BOOST_TR1 -#include <boost/tr1/memory.hpp> -#else -#include <tr1/memory> -#endif - -namespace Hedwig { - - // - // A Listener registered for a Subscriber instance to emit events - // for those disable resubscribe subscriptions. - // - class SubscriptionListener { - public: - virtual void processEvent(const std::string &topic, const std::string &subscriberId, - const Hedwig::SubscriptionEvent event) = 0; - virtual ~SubscriptionListener() {}; - }; - typedef std::tr1::shared_ptr<SubscriptionListener> SubscriptionListenerPtr; - - template<class R> - class Callback { - public: - virtual void operationComplete(const R& result) = 0; - virtual void operationFailed(const std::exception& exception) = 0; - - virtual ~Callback() {}; - }; - - class OperationCallback { - public: - virtual void operationComplete() = 0; - virtual void operationFailed(const std::exception& exception) = 0; - - virtual ~OperationCallback() {}; - }; - typedef std::tr1::shared_ptr<OperationCallback> OperationCallbackPtr; - - class MessageHandlerCallback { - public: - virtual void consume(const std::string& topic, const std::string& subscriberId, const Message& msg, OperationCallbackPtr& callback) = 0; - - virtual ~MessageHandlerCallback() {}; - }; - typedef std::tr1::shared_ptr<MessageHandlerCallback> MessageHandlerCallbackPtr; - - typedef std::tr1::shared_ptr<SubscriptionPreferences> SubscriptionPreferencesPtr; - - class ClientMessageFilter { - public: - virtual void setSubscriptionPreferences(const std::string& topic, const std::string& subscriberId, - const SubscriptionPreferencesPtr& preferences) = 0; - virtual bool testMessage(const Message& message) = 0; - - virtual ~ClientMessageFilter() {}; - }; - typedef std::tr1::shared_ptr<ClientMessageFilter> ClientMessageFilterPtr; -} - -#endif
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/inc/hedwig/client.h ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/inc/hedwig/client.h b/hedwig-client/src/main/cpp/inc/hedwig/client.h deleted file mode 100644 index 7b914bc..0000000 --- a/hedwig-client/src/main/cpp/inc/hedwig/client.h +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef HEDWIG_CLIENT_H -#define HEDWIG_CLIENT_H - -#include <string> - -#ifdef USE_BOOST_TR1 -#include <boost/tr1/memory.hpp> -#else -#include <tr1/memory> -#endif - -#include <hedwig/subscribe.h> -#include <hedwig/publish.h> -#include <hedwig/exceptions.h> -#include <boost/noncopyable.hpp> -#include <boost/shared_ptr.hpp> - -namespace Hedwig { - - class ClientImpl; - typedef boost::shared_ptr<ClientImpl> ClientImplPtr; - - class Configuration { - public: - static const std::string DEFAULT_SERVER; - static const std::string MESSAGE_CONSUME_RETRY_WAIT_TIME; - static const std::string SUBSCRIBER_CONSUME_RETRY_WAIT_TIME; - static const std::string MAX_MESSAGE_QUEUE_SIZE; - static const std::string RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME; - static const std::string SYNC_REQUEST_TIMEOUT; - static const std::string SUBSCRIBER_AUTOCONSUME; - static const std::string NUM_DISPATCH_THREADS; - static const std::string SSL_ENABLED; - static const std::string SSL_PEM_FILE; - static const std::string SUBSCRIPTION_CHANNEL_SHARING_ENABLED; - /** - * The maximum number of messages the hub will queue for subscriptions - * created using this configuration. The hub will always queue the most - * recent messages. If there are enough publishes to the topic to hit - * the bound, then the oldest messages are dropped from the queue. - * - * A bound of 0 disabled the bound completely. - */ - static const std::string SUBSCRIPTION_MESSAGE_BOUND; - - public: - Configuration() {}; - virtual int getInt(const std::string& key, int defaultVal) const = 0; - virtual const std::string get(const std::string& key, const std::string& defaultVal) const = 0; - virtual bool getBool(const std::string& key, bool defaultVal) const = 0; - - virtual ~Configuration() {} - }; - - /** - Main Hedwig client class. This class is used to acquire an instance of the Subscriber of Publisher. - */ - class Client : private boost::noncopyable { - public: - Client(const Configuration& conf); - - /** - Retrieve the subscriber object - */ - Subscriber& getSubscriber(); - - /** - Retrieve the publisher object - */ - Publisher& getPublisher(); - - ~Client(); - - private: - ClientImplPtr clientimpl; - }; - - -}; - -#endif http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h b/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h deleted file mode 100644 index b44fed9..0000000 --- a/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef HEDWIG_EXCEPTION_H -#define HEDWIG_EXCEPTION_H - -#include <exception> - -namespace Hedwig { - - class ClientException : public std::exception { }; - - class ClientTimeoutException : public ClientException {}; - - class ServiceDownException : public ClientException {}; - class CannotConnectException : public ClientException {}; - class UnexpectedResponseException : public ClientException {}; - class OomException : public ClientException {}; - class UnknownRequestException : public ClientException {}; - class InvalidRedirectException : public ClientException {}; - class NoChannelHandlerException : public ClientException {}; - - class PublisherException : public ClientException { }; - - class SubscriberException : public ClientException { }; - class AlreadySubscribedException : public SubscriberException {}; - class NotSubscribedException : public SubscriberException {}; - class ResubscribeException : public SubscriberException {}; - class NullMessageHandlerException : public SubscriberException {}; - class NullMessageFilterException : public SubscriberException {}; - - class AlreadyStartDeliveryException : public SubscriberException {}; - class StartingDeliveryException : public SubscriberException {}; - - class ConfigurationException : public ClientException { }; - class InvalidPortException : public ConfigurationException {}; - class HostResolutionException : public ClientException {}; - - class InvalidStateException : public ClientException {}; - class ShuttingDownException : public InvalidStateException {}; -}; - -#endif http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/inc/hedwig/publish.h ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/inc/hedwig/publish.h b/hedwig-client/src/main/cpp/inc/hedwig/publish.h deleted file mode 100644 index ea08838..0000000 --- a/hedwig-client/src/main/cpp/inc/hedwig/publish.h +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef HEDWIG_PUBLISH_H -#define HEDWIG_PUBLISH_H - -#include <string> - -#include <hedwig/exceptions.h> -#include <hedwig/callback.h> -#include <hedwig/protocol.h> -#include <boost/noncopyable.hpp> - -namespace Hedwig { - - typedef std::tr1::shared_ptr<PublishResponse> PublishResponsePtr; - typedef Callback<PublishResponsePtr> PublishResponseCallback; - typedef std::tr1::shared_ptr<PublishResponseCallback> PublishResponseCallbackPtr; - - /** - Interface for publishing to a hedwig instance. - */ - class Publisher : private boost::noncopyable { - public: - /** - Publish message for topic, and block until we receive a ACK response from the hedwig server. - - @param topic Topic to publish to. - @param message Data to publish for topic. - */ - virtual PublishResponsePtr publish(const std::string& topic, const std::string& message) = 0; - - virtual PublishResponsePtr publish(const std::string& topic, const Message& message) = 0; - - /** - Asynchronously publish message for topic. - - @code - OperationCallbackPtr callback(new MyCallback()); - pub.asyncPublish(callback); - @endcode - - @param topic Topic to publish to. - @param message Data to publish to topic - @param callback Callback which will be used to report success or failure. Success is only reported once the server replies with an ACK response to the publication. - */ - virtual void asyncPublish(const std::string& topic, const std::string& message, const OperationCallbackPtr& callback) = 0; - - virtual void asyncPublish(const std::string& topic, const Message& message, const OperationCallbackPtr& callback) = 0; - - virtual void asyncPublishWithResponse(const std::string& topic, const Message& messsage, - const PublishResponseCallbackPtr& callback) = 0; - - virtual ~Publisher() {} - }; -}; - -#endif http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h b/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h deleted file mode 100644 index 4bc718c..0000000 --- a/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef HEDWIG_SUBSCRIBE_H -#define HEDWIG_SUBSCRIBE_H - -#include <string> - -#include <hedwig/exceptions.h> -#include <hedwig/callback.h> -#include <hedwig/protocol.h> -#include <boost/noncopyable.hpp> - -namespace Hedwig { - - /** - Interface for subscribing to a hedwig instance. - */ - class Subscriber : private boost::noncopyable { - public: - virtual void subscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode) = 0; - virtual void asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode, const OperationCallbackPtr& callback) = 0; - virtual void subscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options) = 0; - virtual void asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options, const OperationCallbackPtr& callback) = 0; - - virtual void unsubscribe(const std::string& topic, const std::string& subscriberId) = 0; - virtual void asyncUnsubscribe(const std::string& topic, const std::string& subscriberId, const OperationCallbackPtr& callback) = 0; - - virtual void consume(const std::string& topic, const std::string& subscriberId, const MessageSeqId& messageSeqId) = 0; - - virtual void startDelivery(const std::string& topic, const std::string& subscriberId, - const MessageHandlerCallbackPtr& callback) = 0; - virtual void startDeliveryWithFilter(const std::string& topic, - const std::string& subscriberId, - const MessageHandlerCallbackPtr& callback, - const ClientMessageFilterPtr& filter) = 0; - - virtual void stopDelivery(const std::string& topic, const std::string& subscriberId) = 0; - - virtual bool hasSubscription(const std::string& topic, const std::string& subscriberId) = 0; - virtual void closeSubscription(const std::string& topic, const std::string& subscriberId) = 0; - virtual void asyncCloseSubscription(const std::string& topic, const std::string& subscriberId, - const OperationCallbackPtr& callback) = 0; - - // - // API to register/unregister subscription listeners for receiving - // events indicating subscription changes for those disable resubscribe - // subscriptions - // - virtual void addSubscriptionListener(SubscriptionListenerPtr& listener) = 0; - virtual void removeSubscriptionListener(SubscriptionListenerPtr& listener) = 0; - - virtual ~Subscriber() {} - }; -}; - -#endif http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/Makefile.am ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/lib/Makefile.am b/hedwig-client/src/main/cpp/lib/Makefile.am deleted file mode 100644 index f19a3da..0000000 --- a/hedwig-client/src/main/cpp/lib/Makefile.am +++ /dev/null @@ -1,32 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -PROTODEF = ../../../../../hedwig-protocol/src/main/protobuf/PubSubProtocol.proto - -lib_LTLIBRARIES = libhedwig01.la -libhedwig01_la_SOURCES = protocol.cpp channel.cpp client.cpp util.cpp clientimpl.cpp publisherimpl.cpp subscriberimpl.cpp eventdispatcher.cpp data.cpp filterablemessagehandler.cpp simplesubscriberimpl.cpp multiplexsubscriberimpl.cpp -libhedwig01_la_CPPFLAGS = -I$(top_srcdir)/inc $(DEPS_CFLAGS) -libhedwig01_la_LIBADD = $(DEPS_LIBS) $(BOOST_CPPFLAGS) -libhedwig01_la_LDFLAGS = -no-undefined $(BOOST_ASIO_LIB) $(BOOST_LDFLAGS) $(BOOST_THREAD_LIB) - -protocol.cpp: $(PROTODEF) - protoc --cpp_out=. -I`dirname $(PROTODEF)` $(PROTODEF) - sed "s/PubSubProtocol.pb.h/hedwig\/protocol.h/" PubSubProtocol.pb.cc > protocol.cpp - rm PubSubProtocol.pb.cc - mv PubSubProtocol.pb.h $(top_srcdir)/inc/hedwig/protocol.h - http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/channel.cpp ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/lib/channel.cpp b/hedwig-client/src/main/cpp/lib/channel.cpp deleted file mode 100644 index b980e53..0000000 --- a/hedwig-client/src/main/cpp/lib/channel.cpp +++ /dev/null @@ -1,801 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifdef HAVE_CONFIG_H -#include <config.h> -#endif - -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <netinet/tcp.h> -#include <poll.h> -#include <iostream> - -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <unistd.h> -#include <errno.h> -#include <vector> -#include <utility> -#include <deque> -#include "channel.h" -#include "util.h" -#include "clientimpl.h" - -#include <log4cxx/logger.h> -#include <google/protobuf/io/zero_copy_stream_impl.h> - -static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__)); - -using namespace Hedwig; - -const std::string DEFAULT_SSL_PEM_FILE = ""; - -AbstractDuplexChannel::AbstractDuplexChannel(IOServicePtr& service, - const HostAddress& addr, - const ChannelHandlerPtr& handler) - : address(addr), handler(handler), service(service->getService()), - instream(&in_buf), copy_buf(NULL), copy_buf_length(0), - state(UNINITIALISED), receiving(false), reading(false), sending(false), - closed(false) -{} - -AbstractDuplexChannel::~AbstractDuplexChannel() { - free(copy_buf); - copy_buf = NULL; - copy_buf_length = 0; - - LOG4CXX_INFO(logger, "Destroying DuplexChannel(" << this << ")"); -} - -ChannelHandlerPtr AbstractDuplexChannel::getChannelHandler() { - return handler; -} - -/*static*/ void AbstractDuplexChannel::connectCallbackHandler( - AbstractDuplexChannelPtr channel, - OperationCallbackPtr callback, - const boost::system::error_code& error) { - channel->doAfterConnect(callback, error); -} - -void AbstractDuplexChannel::connect() { - connect(OperationCallbackPtr()); -} - -void AbstractDuplexChannel::connect(const OperationCallbackPtr& callback) { - setState(CONNECTING); - doConnect(callback); -} - -void AbstractDuplexChannel::doAfterConnect(const OperationCallbackPtr& callback, - const boost::system::error_code& error) { - if (error) { - LOG4CXX_ERROR(logger, "Channel " << this << " connect error : " << error.message().c_str()); - channelConnectFailed(ChannelConnectException(), callback); - return; - } - - // set no delay option - boost::system::error_code ec; - setSocketOption(ec); - if (ec) { - LOG4CXX_WARN(logger, "Channel " << this << " set up socket error : " << ec.message().c_str()); - channelConnectFailed(ChannelSetupException(), callback); - return; - } - - boost::asio::ip::tcp::endpoint localEp; - boost::asio::ip::tcp::endpoint remoteEp; - localEp = getLocalAddress(ec); - remoteEp = getRemoteAddress(ec); - - if (!ec) { - LOG4CXX_INFO(logger, "Channel " << this << " connected :" - << localEp.address().to_string() << ":" << localEp.port() << "=>" - << remoteEp.address().to_string() << ":" << remoteEp.port()); - // update ip address since if might connect to VIP - address.updateIP(remoteEp.address().to_v4().to_ulong()); - } - // the channel is connected - channelConnected(callback); -} - -void AbstractDuplexChannel::channelConnectFailed(const std::exception& e, - const OperationCallbackPtr& callback) { - channelDisconnected(e); - setState(DEAD); - if (callback.get()) { - callback->operationFailed(e); - } -} - -void AbstractDuplexChannel::channelConnected(const OperationCallbackPtr& callback) { - // for normal channel, we have done here - setState(CONNECTED); - if (callback.get()) { - callback->operationComplete(); - } - - // enable sending & receiving - startSending(); - startReceiving(); -} - -/*static*/ void AbstractDuplexChannel::messageReadCallbackHandler( - AbstractDuplexChannelPtr channel, - std::size_t message_size, - const boost::system::error_code& error, - std::size_t bytes_transferred) { - LOG4CXX_DEBUG(logger, "DuplexChannel::messageReadCallbackHandler " << error << ", " - << bytes_transferred << " channel(" << channel.get() << ")"); - - if (error) { - if (!channel->isClosed()) { - LOG4CXX_INFO(logger, "Invalid read error (" << error << ") bytes_transferred (" - << bytes_transferred << ") channel(" << channel.get() << ")"); - } - channel->channelDisconnected(ChannelReadException()); - return; - } - - if (channel->copy_buf_length < message_size) { - channel->copy_buf_length = message_size; - channel->copy_buf = (char*)realloc(channel->copy_buf, channel->copy_buf_length); - if (channel->copy_buf == NULL) { - LOG4CXX_ERROR(logger, "Error allocating buffer. channel(" << channel.get() << ")"); - // if failed to realloc memory, we should disconnect the channel. - // then it would enter disconnect logic, which would close channel and release - // its resources includes the copy_buf memory. - channel->channelDisconnected(ChannelOutOfMemoryException()); - return; - } - } - - channel->instream.read(channel->copy_buf, message_size); - PubSubResponsePtr response(new PubSubResponse()); - bool err = response->ParseFromArray(channel->copy_buf, message_size); - - if (!err) { - LOG4CXX_ERROR(logger, "Error parsing message. channel(" << channel.get() << ")"); - channel->channelDisconnected(ChannelReadException()); - return; - } else { - LOG4CXX_DEBUG(logger, "channel(" << channel.get() << ") : " << channel->in_buf.size() - << " bytes left in buffer"); - } - - ChannelHandlerPtr h; - { - boost::shared_lock<boost::shared_mutex> lock(channel->destruction_lock); - if (channel->handler.get()) { - h = channel->handler; - } - } - - // channel did stopReceiving, we should not call #messageReceived - // store this response in outstanding_response variable and did stop receiving - // when we startReceiving again, we can process this last response. - { - boost::lock_guard<boost::mutex> lock(channel->receiving_lock); - if (!channel->isReceiving()) { - // queue the response - channel->outstanding_response = response; - channel->reading = false; - return; - } - } - - // channel is still in receiving status - if (h.get()) { - h->messageReceived(channel, response); - } - - AbstractDuplexChannel::readSize(channel); -} - -/*static*/ void AbstractDuplexChannel::sizeReadCallbackHandler( - AbstractDuplexChannelPtr channel, - const boost::system::error_code& error, - std::size_t bytes_transferred) { - LOG4CXX_DEBUG(logger, "DuplexChannel::sizeReadCallbackHandler " << error << ", " - << bytes_transferred << " channel(" << channel.get() << ")"); - - if (error) { - if (!channel->isClosed()) { - LOG4CXX_INFO(logger, "Invalid read error (" << error << ") bytes_transferred (" - << bytes_transferred << ") channel(" << channel.get() << ")"); - } - channel->channelDisconnected(ChannelReadException()); - return; - } - - if (channel->in_buf.size() < sizeof(uint32_t)) { - LOG4CXX_ERROR(logger, "Not enough data in stream. Must have been an error reading. " - << " Closing channel(" << channel.get() << ")"); - channel->channelDisconnected(ChannelReadException()); - return; - } - - uint32_t size; - std::istream is(&channel->in_buf); - is.read((char*)&size, sizeof(uint32_t)); - size = ntohl(size); - - int toread = size - channel->in_buf.size(); - LOG4CXX_DEBUG(logger, " size of incoming message " << size << ", currently in buffer " - << channel->in_buf.size() << " channel(" << channel.get() << ")"); - if (toread <= 0) { - AbstractDuplexChannel::messageReadCallbackHandler(channel, size, error, 0); - } else { - channel->readMsgBody(channel->in_buf, toread, size); - } -} - -/*static*/ void AbstractDuplexChannel::readSize(AbstractDuplexChannelPtr channel) { - int toread = sizeof(uint32_t) - channel->in_buf.size(); - LOG4CXX_DEBUG(logger, " size of incoming message " << sizeof(uint32_t) - << ", currently in buffer " << channel->in_buf.size() - << " channel(" << channel.get() << ")"); - - if (toread < 0) { - AbstractDuplexChannel::sizeReadCallbackHandler(channel, boost::system::error_code(), 0); - } else { - channel->readMsgSize(channel->in_buf); - } -} - -void AbstractDuplexChannel::startReceiving() { - LOG4CXX_DEBUG(logger, "DuplexChannel::startReceiving channel(" << this - << ") currently receiving = " << receiving); - - PubSubResponsePtr response; - bool inReadingState; - { - boost::lock_guard<boost::mutex> lock(receiving_lock); - // receiving before just return - if (receiving) { - return; - } - receiving = true; - - // if we have last response collected in previous startReceiving - // we need to process it, but we should process it under receiving_lock - // otherwise we enter dead lock - // subscriber#startDelivery(subscriber#queue_lock) => - // channel#startReceiving(channel#receiving_lock) => - // sbuscriber#messageReceived(subscriber#queue_lock) - if (outstanding_response.get()) { - response = outstanding_response; - outstanding_response = PubSubResponsePtr(); - } - - // if channel is in reading status wait data from remote server - // we don't need to insert another readSize op - inReadingState = reading; - if (!reading) { - reading = true; - } - } - - // consume message buffered in receiving queue - // there is at most one message buffered when we - // stopReceiving between #readSize and #readMsgBody - if (response.get()) { - ChannelHandlerPtr h; - { - boost::shared_lock<boost::shared_mutex> lock(this->destruction_lock); - if (this->handler.get()) { - h = this->handler; - } - } - if (h.get()) { - h->messageReceived(shared_from_this(), response); - } - } - - // if channel is not in reading state, #readSize - if (!inReadingState) { - AbstractDuplexChannel::readSize(shared_from_this()); - } -} - -bool AbstractDuplexChannel::isReceiving() { - return receiving; -} - -bool AbstractDuplexChannel::isClosed() { - return closed; -} - -void AbstractDuplexChannel::stopReceiving() { - LOG4CXX_DEBUG(logger, "DuplexChannel::stopReceiving channel(" << this << ")"); - - boost::lock_guard<boost::mutex> lock(receiving_lock); - receiving = false; -} - -void AbstractDuplexChannel::startSending() { - { - boost::shared_lock<boost::shared_mutex> lock(state_lock); - if (state != CONNECTED) { - return; - } - } - - boost::lock_guard<boost::mutex> lock(sending_lock); - if (sending) { - return; - } - LOG4CXX_DEBUG(logger, "AbstractDuplexChannel::startSending channel(" << this << ")"); - - WriteRequest w; - { - boost::lock_guard<boost::mutex> lock(write_lock); - if (write_queue.empty()) { - return; - } - w = write_queue.front(); - write_queue.pop_front(); - } - - sending = true; - - std::ostream os(&out_buf); - uint32_t size = htonl(w.first->ByteSize()); - os.write((char*)&size, sizeof(uint32_t)); - - bool err = w.first->SerializeToOstream(&os); - if (!err) { - w.second->operationFailed(ChannelWriteException()); - channelDisconnected(ChannelWriteException()); - return; - } - - writeBuffer(out_buf, w.second); -} - -const HostAddress& AbstractDuplexChannel::getHostAddress() const { - return address; -} - -void AbstractDuplexChannel::channelDisconnected(const std::exception& e) { - setState(DEAD); - - { - boost::lock_guard<boost::mutex> lock(write_lock); - while (!write_queue.empty()) { - WriteRequest w = write_queue.front(); - write_queue.pop_front(); - w.second->operationFailed(e); - } - } - - ChannelHandlerPtr h; - { - boost::shared_lock<boost::shared_mutex> lock(destruction_lock); - if (handler.get()) { - h = handler; - } - } - if (h.get()) { - h->channelDisconnected(shared_from_this(), e); - } -} - -void AbstractDuplexChannel::close() { - { - boost::shared_lock<boost::shared_mutex> statelock(state_lock); - state = DEAD; - } - - { - boost::lock_guard<boost::shared_mutex> lock(destruction_lock); - if (closed) { - // some one has closed the socket. - return; - } - closed = true; - handler = ChannelHandlerPtr(); // clear the handler in case it ever referenced the channel*/ - } - - LOG4CXX_INFO(logger, "Killing duplex channel (" << this << ")"); - - // If we are going away, fail all transactions that haven't been completed - failAllTransactions(); - closeSocket(); -} - -/*static*/ void AbstractDuplexChannel::writeCallbackHandler( - AbstractDuplexChannelPtr channel, - OperationCallbackPtr callback, - const boost::system::error_code& error, - std::size_t bytes_transferred) { - if (error) { - if (!channel->isClosed()) { - LOG4CXX_DEBUG(logger, "AbstractDuplexChannel::writeCallbackHandler " << error << ", " - << bytes_transferred << " channel(" << channel.get() << ")"); - } - callback->operationFailed(ChannelWriteException()); - channel->channelDisconnected(ChannelWriteException()); - return; - } - - callback->operationComplete(); - - channel->out_buf.consume(bytes_transferred); - - { - boost::lock_guard<boost::mutex> lock(channel->sending_lock); - channel->sending = false; - } - - channel->startSending(); -} - -void AbstractDuplexChannel::writeRequest(const PubSubRequestPtr& m, - const OperationCallbackPtr& callback) { - { - boost::shared_lock<boost::shared_mutex> lock(state_lock); - if (state != CONNECTED && state != CONNECTING) { - LOG4CXX_ERROR(logger,"Tried to write transaction [" << m->txnid() << "] to a channel [" - << this << "] which is " << (state == DEAD ? "DEAD" : "UNINITIALISED")); - callback->operationFailed(UninitialisedChannelException()); - return; - } - } - - { - boost::lock_guard<boost::mutex> lock(write_lock); - WriteRequest w(m, callback); - write_queue.push_back(w); - } - - startSending(); -} - -// -// Transaction operations -// - -/** - Store the transaction data for a request. -*/ -void AbstractDuplexChannel::storeTransaction(const PubSubDataPtr& data) { - LOG4CXX_DEBUG(logger, "Storing txnid(" << data->getTxnId() << ") for channel(" << this << ")"); - - boost::lock_guard<boost::mutex> lock(txnid2data_lock); - txnid2data[data->getTxnId()] = data; -} - -/** - Give the transaction back to the caller. -*/ -PubSubDataPtr AbstractDuplexChannel::retrieveTransaction(long txnid) { - boost::lock_guard<boost::mutex> lock(txnid2data_lock); - - PubSubDataPtr data = txnid2data[txnid]; - txnid2data.erase(txnid); - if (data == NULL) { - LOG4CXX_ERROR(logger, "Transaction txnid(" << txnid - << ") doesn't exist in channel (" << this << ")"); - } - - return data; -} - -void AbstractDuplexChannel::failAllTransactions() { - boost::lock_guard<boost::mutex> lock(txnid2data_lock); - for (TransactionMap::iterator iter = txnid2data.begin(); iter != txnid2data.end(); ++iter) { - PubSubDataPtr& data = (*iter).second; - data->getCallback()->operationFailed(ChannelDiedException()); - } - txnid2data.clear(); -} - -// Set state for the channel -void AbstractDuplexChannel::setState(State s) { - boost::lock_guard<boost::shared_mutex> lock(state_lock); - state = s; -} - -// -// Basic Asio Channel Implementation -// - -AsioDuplexChannel::AsioDuplexChannel(IOServicePtr& service, - const HostAddress& addr, - const ChannelHandlerPtr& handler) - : AbstractDuplexChannel(service, addr, handler) { - this->socket = boost_socket_ptr(new boost_socket(getService())); - LOG4CXX_DEBUG(logger, "Creating DuplexChannel(" << this << ")"); -} - -AsioDuplexChannel::~AsioDuplexChannel() { -} - -void AsioDuplexChannel::doConnect(const OperationCallbackPtr& callback) { - boost::system::error_code error = boost::asio::error::host_not_found; - uint32_t ip2conn = address.ip(); - uint16_t port2conn = address.port(); - boost::asio::ip::tcp::endpoint endp(boost::asio::ip::address_v4(ip2conn), port2conn); - - socket->async_connect(endp, boost::bind(&AbstractDuplexChannel::connectCallbackHandler, - shared_from_this(), callback, - boost::asio::placeholders::error)); - LOG4CXX_INFO(logger, "Channel (" << this << ") fire connect operation to ip (" - << ip2conn << ") port (" << port2conn << ")"); -} - -void AsioDuplexChannel::setSocketOption(boost::system::error_code& ec) { - boost::asio::ip::tcp::no_delay option(true); - socket->set_option(option, ec); -} - -boost::asio::ip::tcp::endpoint AsioDuplexChannel::getLocalAddress( - boost::system::error_code& ec) { - return socket->local_endpoint(ec); -} - -boost::asio::ip::tcp::endpoint AsioDuplexChannel::getRemoteAddress( - boost::system::error_code& ec) { - return socket->remote_endpoint(ec); -} - -void AsioDuplexChannel::writeBuffer(boost::asio::streambuf& buffer, - const OperationCallbackPtr& callback) { - boost::asio::async_write(*socket, buffer, - boost::bind(&AbstractDuplexChannel::writeCallbackHandler, - shared_from_this(), callback, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred) - ); -} - -void AsioDuplexChannel::readMsgSize(boost::asio::streambuf& buffer) { - boost::asio::async_read(*socket, buffer, boost::asio::transfer_at_least(sizeof(uint32_t)), - boost::bind(&AbstractDuplexChannel::sizeReadCallbackHandler, - shared_from_this(), - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); -} - -void AsioDuplexChannel::readMsgBody(boost::asio::streambuf& buffer, - int toread, uint32_t msgSize) { - boost::asio::async_read(*socket, buffer, boost::asio::transfer_at_least(toread), - boost::bind(&AbstractDuplexChannel::messageReadCallbackHandler, - shared_from_this(), msgSize, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); -} - -void AsioDuplexChannel::closeSocket() { - boost::system::error_code ec; - - socket->cancel(ec); - if (ec) { - LOG4CXX_WARN(logger, "Channel " << this << " canceling io error : " << ec.message().c_str()); - } - - socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); - if (ec) { - LOG4CXX_WARN(logger, "Channel " << this << " shutdown error : " << ec.message().c_str()); - } - - socket->close(ec); - if (ec) { - LOG4CXX_WARN(logger, "Channel " << this << " close error : " << ec.message().c_str()); - } - LOG4CXX_DEBUG(logger, "Closed socket for channel " << this << "."); -} - -// SSL Context Factory - -SSLContextFactory::SSLContextFactory(const Configuration& conf) - : conf(conf), - sslPemFile(conf.get(Configuration::SSL_PEM_FILE, - DEFAULT_SSL_PEM_FILE)) { -} - -SSLContextFactory::~SSLContextFactory() {} - -boost_ssl_context_ptr SSLContextFactory::createSSLContext(boost::asio::io_service& service) { - boost_ssl_context_ptr sslCtx(new boost_ssl_context(service, - boost::asio::ssl::context::sslv23_client)); - sslCtx->set_verify_mode(boost::asio::ssl::context::verify_none); - if (!sslPemFile.empty()) { - boost::system::error_code err; - sslCtx->load_verify_file(sslPemFile, err); - - if (err) { - LOG4CXX_ERROR(logger, "Failed to load verify ssl pem file : " - << sslPemFile); - throw InvalidSSLPermFileException(); - } - } - return sslCtx; -} - -// -// SSL Channl Implementation -// - -#ifndef __APPLE__ -AsioSSLDuplexChannel::AsioSSLDuplexChannel(IOServicePtr& service, - const boost_ssl_context_ptr& sslCtx, - const HostAddress& addr, - const ChannelHandlerPtr& handler) - : AbstractDuplexChannel(service, addr, handler), ssl_ctx(sslCtx), - sslclosed(false) { -#else -AsioSSLDuplexChannel::AsioSSLDuplexChannel(IOServicePtr& service, - const boost_ssl_context_ptr& sslCtx, - const HostAddress& addr, - const ChannelHandlerPtr& handler) - : AbstractDuplexChannel(service, addr, handler), ssl_ctx(sslCtx) { -#endif - ssl_socket = boost_ssl_socket_ptr(new boost_ssl_socket(getService(), *ssl_ctx)); - LOG4CXX_DEBUG(logger, "Created SSL DuplexChannel(" << this << ")"); -} - -AsioSSLDuplexChannel::~AsioSSLDuplexChannel() { -} - -void AsioSSLDuplexChannel::doConnect(const OperationCallbackPtr& callback) { - boost::system::error_code error = boost::asio::error::host_not_found; - uint32_t ip2conn = address.ip(); - uint16_t port2conn = address.sslPort(); - boost::asio::ip::tcp::endpoint endp(boost::asio::ip::address_v4(ip2conn), port2conn); - - ssl_socket->lowest_layer().async_connect(endp, - boost::bind(&AbstractDuplexChannel::connectCallbackHandler, - shared_from_this(), callback, - boost::asio::placeholders::error)); - LOG4CXX_INFO(logger, "SSL Channel (" << this << ") fire connect operation to ip (" - << ip2conn << ") port (" << port2conn << ")"); -} - -void AsioSSLDuplexChannel::setSocketOption(boost::system::error_code& ec) { - boost::asio::ip::tcp::no_delay option(true); - ssl_socket->lowest_layer().set_option(option, ec); -} - -boost::asio::ip::tcp::endpoint AsioSSLDuplexChannel::getLocalAddress( - boost::system::error_code& ec) { - return ssl_socket->lowest_layer().local_endpoint(ec); -} - -boost::asio::ip::tcp::endpoint AsioSSLDuplexChannel::getRemoteAddress( - boost::system::error_code& ec) { - return ssl_socket->lowest_layer().remote_endpoint(ec); -} - -void AsioSSLDuplexChannel::channelConnected(const OperationCallbackPtr& callback) { - // for SSL channel, we had to do SSL hand shake - startHandShake(callback); - LOG4CXX_INFO(logger, "SSL Channel " << this << " fire hand shake operation"); -} - -void AsioSSLDuplexChannel::sslChannelConnected(const OperationCallbackPtr& callback) { - LOG4CXX_INFO(logger, "SSL Channel " << this << " hand shake finish!!"); - AbstractDuplexChannel::channelConnected(callback); -} - -void AsioSSLDuplexChannel::startHandShake(const OperationCallbackPtr& callback) { - ssl_socket->async_handshake(boost::asio::ssl::stream_base::client, - boost::bind(&AsioSSLDuplexChannel::handleHandshake, - boost::dynamic_pointer_cast<AsioSSLDuplexChannel>(shared_from_this()), - callback, boost::asio::placeholders::error)); -} - -void AsioSSLDuplexChannel::handleHandshake(AsioSSLDuplexChannelPtr channel, - OperationCallbackPtr callback, - const boost::system::error_code& error) { - if (error) { - LOG4CXX_ERROR(logger, "SSL Channel " << channel.get() << " hand shake error : " - << error.message().c_str()); - channel->channelConnectFailed(ChannelConnectException(), callback); - return; - } - channel->sslChannelConnected(callback); -} - -void AsioSSLDuplexChannel::writeBuffer(boost::asio::streambuf& buffer, - const OperationCallbackPtr& callback) { - boost::asio::async_write(*ssl_socket, buffer, - boost::bind(&AbstractDuplexChannel::writeCallbackHandler, - shared_from_this(), callback, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred) - ); -} - -void AsioSSLDuplexChannel::readMsgSize(boost::asio::streambuf& buffer) { - boost::asio::async_read(*ssl_socket, buffer, boost::asio::transfer_at_least(sizeof(uint32_t)), - boost::bind(&AbstractDuplexChannel::sizeReadCallbackHandler, - shared_from_this(), - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); -} - -void AsioSSLDuplexChannel::readMsgBody(boost::asio::streambuf& buffer, - int toread, uint32_t msgSize) { - boost::asio::async_read(*ssl_socket, buffer, boost::asio::transfer_at_least(toread), - boost::bind(&AbstractDuplexChannel::messageReadCallbackHandler, - shared_from_this(), msgSize, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); -} - -#ifndef __APPLE__ -// boost asio doesn't provide time out mechanism to shutdown ssl -void AsioSSLDuplexChannel::sslShutdown() { - ssl_socket->async_shutdown(boost::bind(&AsioSSLDuplexChannel::handleSSLShutdown, - boost::shared_dynamic_cast<AsioSSLDuplexChannel>(shared_from_this()), - boost::asio::placeholders::error)); -} - -void AsioSSLDuplexChannel::handleSSLShutdown(const boost::system::error_code& error) { - if (error) { - LOG4CXX_ERROR(logger, "SSL Channel " << this << " shutdown error : " - << error.message().c_str()); - } - { - boost::lock_guard<boost::mutex> lock(sslclosed_lock); - sslclosed = true; - } - sslclosed_cond.notify_all(); -} -#endif - -void AsioSSLDuplexChannel::closeSocket() { -#ifndef __APPLE__ - // Shutdown ssl - sslShutdown(); - // time wait - { - boost::mutex::scoped_lock lock(sslclosed_lock); - if (!sslclosed) { - sslclosed_cond.timed_wait(lock, boost::posix_time::milliseconds(1000)); - } - } -#endif - closeLowestLayer(); -} - -void AsioSSLDuplexChannel::closeLowestLayer() { - boost::system::error_code ec; - - ssl_socket->lowest_layer().cancel(ec); - if (ec) { - LOG4CXX_WARN(logger, "Channel " << this << " canceling io error : " << ec.message().c_str()); - } - - ssl_socket->lowest_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); - if (ec) { - LOG4CXX_WARN(logger, "Channel " << this << " shutdown error : " << ec.message().c_str()); - } - - ssl_socket->lowest_layer().close(ec); - if (ec) { - LOG4CXX_WARN(logger, "Channel " << this << " close error : " << ec.message().c_str()); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/channel.h ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/lib/channel.h b/hedwig-client/src/main/cpp/lib/channel.h deleted file mode 100644 index c9ef289..0000000 --- a/hedwig-client/src/main/cpp/lib/channel.h +++ /dev/null @@ -1,438 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef HEDWIG_CHANNEL_H -#define HEDWIG_CHANNEL_H - -#include <hedwig/protocol.h> -#include <hedwig/callback.h> -#include <hedwig/client.h> -#include "util.h" -#include "data.h" -#include "eventdispatcher.h" - -#ifdef USE_BOOST_TR1 -#include <boost/tr1/memory.hpp> -#include <boost/tr1/unordered_map.hpp> -#else -#include <tr1/memory> -#include <tr1/unordered_map> -#endif - -#include <google/protobuf/io/zero_copy_stream_impl.h> - -#include <boost/shared_ptr.hpp> -#include <boost/enable_shared_from_this.hpp> - -#include <boost/asio.hpp> -#include <boost/asio/ip/tcp.hpp> -#include <boost/asio/ssl.hpp> -#include <boost/function.hpp> -#include <boost/thread/mutex.hpp> -#include <boost/thread/shared_mutex.hpp> - -namespace Hedwig { - class ChannelException : public std::exception { }; - class UninitialisedChannelException : public ChannelException {}; - - class ChannelConnectException : public ChannelException {}; - class CannotCreateSocketException : public ChannelConnectException {}; - class ChannelSetupException : public ChannelConnectException {}; - class ChannelNotConnectedException : public ChannelConnectException {}; - - class ChannelDiedException : public ChannelException {}; - - class ChannelWriteException : public ChannelException {}; - class ChannelReadException : public ChannelException {}; - class ChannelThreadException : public ChannelException {}; - class ChannelOutOfMemoryException : public ChannelException {}; - - class InvalidSSLPermFileException : public std::exception {}; - - class DuplexChannel; - typedef boost::shared_ptr<DuplexChannel> DuplexChannelPtr; - typedef boost::asio::ip::tcp::socket boost_socket; - typedef boost::shared_ptr<boost_socket> boost_socket_ptr; - typedef boost::asio::ssl::stream<boost_socket> boost_ssl_socket; - typedef boost::shared_ptr<boost_ssl_socket> boost_ssl_socket_ptr; - - class ChannelHandler { - public: - virtual void messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m) = 0; - virtual void channelConnected(const DuplexChannelPtr& channel) = 0; - - virtual void channelDisconnected(const DuplexChannelPtr& channel, const std::exception& e) = 0; - virtual void exceptionOccurred(const DuplexChannelPtr& channel, const std::exception& e) = 0; - - virtual ~ChannelHandler() {} - }; - - typedef boost::shared_ptr<ChannelHandler> ChannelHandlerPtr; - - // A channel interface to send requests - class DuplexChannel { - public: - virtual ~DuplexChannel() {} - - // Return the channel handler bound with a channel - virtual ChannelHandlerPtr getChannelHandler() = 0; - - // Issues a connect request to the target host - // User could writeRequest after issued connect request, those requests should - // be buffered and written until the channel is connected. - virtual void connect() = 0; - - // Issues a connect request to the target host - // User could writeRequest after issued connect request, those requests should - // be buffered and written until the channel is connected. - // The provided callback would be triggered after connected. - virtual void connect(const OperationCallbackPtr& callback) = 0; - - // Write the request to underlying channel - // If the channel is not established, all write requests would be buffered - // until channel is connected. - virtual void writeRequest(const PubSubRequestPtr& m, - const OperationCallbackPtr& callback) = 0; - - // Returns the remote address where this channel is connected to. - virtual const HostAddress& getHostAddress() const = 0; - - // Resumes the read operation of this channel asynchronously - virtual void startReceiving() = 0; - - // Suspends the read operation of this channel asynchronously - virtual void stopReceiving() = 0; - - // Returns if and only if the channel will read a message - virtual bool isReceiving() = 0; - - // - // Transaction operations - // - - // Store a pub/sub request - virtual void storeTransaction(const PubSubDataPtr& data) = 0; - - // Remove a pub/sub request - virtual PubSubDataPtr retrieveTransaction(long txnid) = 0; - - // Fail all transactions - virtual void failAllTransactions() = 0; - - // Handle the case that the channel is disconnected due issues found - // when reading or writing - virtual void channelDisconnected(const std::exception& e) = 0; - - // Close the channel to release the resources - // Once a channel is closed, it can not be open again. Calling this - // method on a closed channel has no efffect. - virtual void close() = 0; - }; - - typedef boost::asio::ssl::context boost_ssl_context; - typedef boost::shared_ptr<boost_ssl_context> boost_ssl_context_ptr; - - class SSLContextFactory { - public: - SSLContextFactory(const Configuration& conf); - ~SSLContextFactory(); - - boost_ssl_context_ptr createSSLContext(boost::asio::io_service& service); - private: - const Configuration& conf; - std::string sslPemFile; - }; - - typedef boost::shared_ptr<SSLContextFactory> SSLContextFactoryPtr; - - class AbstractDuplexChannel; - typedef boost::shared_ptr<AbstractDuplexChannel> AbstractDuplexChannelPtr; - - class AbstractDuplexChannel : public DuplexChannel, - public boost::enable_shared_from_this<AbstractDuplexChannel> { - public: - AbstractDuplexChannel(IOServicePtr& service, - const HostAddress& addr, - const ChannelHandlerPtr& handler); - virtual ~AbstractDuplexChannel(); - - virtual ChannelHandlerPtr getChannelHandler(); - - // - // Connect Operation - // - - // Asio Connect Callback Handler - static void connectCallbackHandler(AbstractDuplexChannelPtr channel, - OperationCallbackPtr callback, - const boost::system::error_code& error); - virtual void connect(); - virtual void connect(const OperationCallbackPtr& callback); - - // - // Write Operation - // - - // Asio Write Callback Handler - static void writeCallbackHandler(AbstractDuplexChannelPtr channel, - OperationCallbackPtr callback, - const boost::system::error_code& error, - std::size_t bytes_transferred); - // Write request - virtual void writeRequest(const PubSubRequestPtr& m, - const OperationCallbackPtr& callback); - - // get the target host - virtual const HostAddress& getHostAddress() const; - - static void sizeReadCallbackHandler(AbstractDuplexChannelPtr channel, - const boost::system::error_code& error, - std::size_t bytes_transferred); - static void messageReadCallbackHandler(AbstractDuplexChannelPtr channel, - std::size_t messagesize, - const boost::system::error_code& error, - std::size_t bytes_transferred); - static void readSize(AbstractDuplexChannelPtr channel); - - // start receiving responses from underlying channel - virtual void startReceiving(); - // is the underlying channel in receiving state - virtual bool isReceiving(); - // stop receiving responses from underlying channel - virtual void stopReceiving(); - - // Store a pub/sub request - virtual void storeTransaction(const PubSubDataPtr& data); - - // Remove a pub/sub request - virtual PubSubDataPtr retrieveTransaction(long txnid); - - // Fail all transactions - virtual void failAllTransactions(); - - // channel is disconnected for a specified exception - virtual void channelDisconnected(const std::exception& e); - - // close the channel - virtual void close(); - - inline boost::asio::io_service & getService() const { - return service; - } - - protected: - // execute the connect operation - virtual void doConnect(const OperationCallbackPtr& callback) = 0; - - virtual void doAfterConnect(const OperationCallbackPtr& callback, - const boost::system::error_code& error); - - // Execute the action after channel connect - // It would be executed in asio connect callback handler - virtual void setSocketOption(boost::system::error_code& ec) = 0; - virtual boost::asio::ip::tcp::endpoint - getRemoteAddress(boost::system::error_code& ec) = 0; - virtual boost::asio::ip::tcp::endpoint - getLocalAddress(boost::system::error_code& ec) = 0; - - // Channel failed to connect - virtual void channelConnectFailed(const std::exception& e, - const OperationCallbackPtr& callback); - // Channel connected - virtual void channelConnected(const OperationCallbackPtr& callback); - - // Start sending buffered requests to target host - void startSending(); - - // Write a buffer to underlying socket - virtual void writeBuffer(boost::asio::streambuf& buffer, - const OperationCallbackPtr& callback) = 0; - - // Read a message from underlying socket - virtual void readMsgSize(boost::asio::streambuf& buffer) = 0; - virtual void readMsgBody(boost::asio::streambuf& buffer, - int toread, uint32_t msgSize) = 0; - - // is the channel under closing - bool isClosed(); - - // close the underlying socket to release resource - virtual void closeSocket() = 0; - - enum State { UNINITIALISED, CONNECTING, CONNECTED, DEAD }; - void setState(State s); - - // Address - HostAddress address; - private: - ChannelHandlerPtr handler; - - boost::asio::io_service &service; - - // buffers for input stream - boost::asio::streambuf in_buf; - std::istream instream; - - // only exists because protobufs can't play nice with streams - // (if there's more than message len in it, it tries to read all) - char* copy_buf; - std::size_t copy_buf_length; - - // buffers for output stream - boost::asio::streambuf out_buf; - // write requests queue - typedef std::pair<PubSubRequestPtr, OperationCallbackPtr> WriteRequest; - boost::mutex write_lock; - std::deque<WriteRequest> write_queue; - - // channel state - State state; - boost::shared_mutex state_lock; - - // reading state - bool receiving; - bool reading; - PubSubResponsePtr outstanding_response; - boost::mutex receiving_lock; - - // sending state - bool sending; - boost::mutex sending_lock; - - // flag indicates the channel is closed - // some callback might return when closing - bool closed; - - // transactions - typedef std::tr1::unordered_map<long, PubSubDataPtr> TransactionMap; - - TransactionMap txnid2data; - boost::mutex txnid2data_lock; - boost::shared_mutex destruction_lock; - }; - - class AsioDuplexChannel : public AbstractDuplexChannel { - public: - AsioDuplexChannel(IOServicePtr& service, - const HostAddress& addr, - const ChannelHandlerPtr& handler); - virtual ~AsioDuplexChannel(); - protected: - // execute the connect operation - virtual void doConnect(const OperationCallbackPtr& callback); - - // Execute the action after channel connect - // It would be executed in asio connect callback handler - virtual void setSocketOption(boost::system::error_code& ec); - virtual boost::asio::ip::tcp::endpoint - getRemoteAddress(boost::system::error_code& ec); - virtual boost::asio::ip::tcp::endpoint - getLocalAddress(boost::system::error_code& ec); - - // Write a buffer to underlying socket - virtual void writeBuffer(boost::asio::streambuf& buffer, - const OperationCallbackPtr& callback); - - // Read a message from underlying socket - virtual void readMsgSize(boost::asio::streambuf& buffer); - virtual void readMsgBody(boost::asio::streambuf& buffer, - int toread, uint32_t msgSize); - - // close the underlying socket to release resource - virtual void closeSocket(); - private: - // underlying socket - boost_socket_ptr socket; - }; - - typedef boost::shared_ptr<AsioDuplexChannel> AsioDuplexChannelPtr; - - class AsioSSLDuplexChannel; - typedef boost::shared_ptr<AsioSSLDuplexChannel> AsioSSLDuplexChannelPtr; - - class AsioSSLDuplexChannel : public AbstractDuplexChannel { - public: - AsioSSLDuplexChannel(IOServicePtr& service, - const boost_ssl_context_ptr& sslCtx, - const HostAddress& addr, - const ChannelHandlerPtr& handler); - virtual ~AsioSSLDuplexChannel(); - protected: - // execute the connect operation - virtual void doConnect(const OperationCallbackPtr& callback); - // Execute the action after channel connect - // It would be executed in asio connect callback handler - virtual void setSocketOption(boost::system::error_code& ec); - virtual boost::asio::ip::tcp::endpoint - getRemoteAddress(boost::system::error_code& ec); - virtual boost::asio::ip::tcp::endpoint - getLocalAddress(boost::system::error_code& ec); - - virtual void channelConnected(const OperationCallbackPtr& callback); - - // Start SSL Hand Shake after the channel is connected - void startHandShake(const OperationCallbackPtr& callback); - // Asio Callback After Hand Shake - static void handleHandshake(AsioSSLDuplexChannelPtr channel, - OperationCallbackPtr callback, - const boost::system::error_code& error); - - void sslChannelConnected(const OperationCallbackPtr& callback); - - // Write a buffer to underlying socket - virtual void writeBuffer(boost::asio::streambuf& buffer, - const OperationCallbackPtr& callback); - - // Read a message from underlying socket - virtual void readMsgSize(boost::asio::streambuf& buffer); - virtual void readMsgBody(boost::asio::streambuf& buffer, - int toread, uint32_t msgSize); - - // close the underlying socket to release resource - virtual void closeSocket(); - - private: -#ifndef __APPLE__ - // Shutdown ssl - void sslShutdown(); - // Handle ssl shutdown - void handleSSLShutdown(const boost::system::error_code& error); -#endif - // Close lowest layer - void closeLowestLayer(); - - // underlying ssl socket - boost_ssl_socket_ptr ssl_socket; - // ssl context - boost_ssl_context_ptr ssl_ctx; - -#ifndef __APPLE__ - // Flag indicated ssl is closed. - bool sslclosed; - boost::mutex sslclosed_lock; - boost::condition_variable sslclosed_cond; -#endif - }; - - - struct DuplexChannelPtrHash : public std::unary_function<DuplexChannelPtr, size_t> { - size_t operator()(const Hedwig::DuplexChannelPtr& channel) const { - return reinterpret_cast<size_t>(channel.get()); - } - }; -}; -#endif http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/client.cpp ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/cpp/lib/client.cpp b/hedwig-client/src/main/cpp/lib/client.cpp deleted file mode 100644 index e98c452..0000000 --- a/hedwig-client/src/main/cpp/lib/client.cpp +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifdef HAVE_CONFIG_H -#include <config.h> -#endif - -#include <hedwig/client.h> -#include <memory> - -#include "clientimpl.h" -#include <log4cxx/logger.h> - -static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__)); - -using namespace Hedwig; - -const std::string Configuration::DEFAULT_SERVER = "hedwig.cpp.default_server"; -const std::string Configuration::MESSAGE_CONSUME_RETRY_WAIT_TIME = "hedwig.cpp.message_consume_retry_wait_time"; -const std::string Configuration::SUBSCRIBER_CONSUME_RETRY_WAIT_TIME = "hedwig.cpp.subscriber_consume_retry_wait_time"; -const std::string Configuration::MAX_MESSAGE_QUEUE_SIZE = "hedwig.cpp.max_msgqueue_size"; -const std::string Configuration::RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME = "hedwig.cpp.reconnect_subscribe_retry_wait_time"; -const std::string Configuration::SYNC_REQUEST_TIMEOUT = "hedwig.cpp.sync_request_timeout"; -const std::string Configuration::SUBSCRIBER_AUTOCONSUME = "hedwig.cpp.subscriber_autoconsume"; -const std::string Configuration::NUM_DISPATCH_THREADS = "hedwig.cpp.num_dispatch_threads"; -const std::string Configuration::SUBSCRIPTION_MESSAGE_BOUND = "hedwig.cpp.subscription_message_bound"; -const std::string Configuration::SSL_ENABLED = "hedwig.cpp.ssl_enabled"; -const std::string Configuration::SSL_PEM_FILE = "hedwig.cpp.ssl_pem"; -const std::string Configuration::SUBSCRIPTION_CHANNEL_SHARING_ENABLED = "hedwig.cpp.subscription_channel_sharing_enabled"; - -Client::Client(const Configuration& conf) { - LOG4CXX_DEBUG(logger, "Client::Client (" << this << ")"); - - clientimpl = ClientImpl::Create( conf ); -} - -Subscriber& Client::getSubscriber() { - return clientimpl->getSubscriber(); -} - -Publisher& Client::getPublisher() { - return clientimpl->getPublisher(); -} - -Client::~Client() { - LOG4CXX_DEBUG(logger, "Client::~Client (" << this << ")"); - - clientimpl->Destroy(); -} - -
