Repository: trafficserver Updated Branches: refs/heads/master 5d38019f2 -> 006ede8cf
Patch for TS-2652: Allowing cancelation of AsyncProviders Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/af12e778 Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/af12e778 Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/af12e778 Branch: refs/heads/master Commit: af12e778714f6950316d2752339be178df43d7f1 Parents: 79dd503 Author: Manjesh Nilange <[email protected]> Authored: Thu Mar 20 15:22:52 2014 -0700 Committer: Manjesh Nilange <[email protected]> Committed: Thu Mar 20 15:22:52 2014 -0700 ---------------------------------------------------------------------- .../examples/async_http_fetch/AsyncHttpFetch.cc | 42 +++++++++++++++++- lib/atscppapi/src/AsyncHttpFetch.cc | 4 +- lib/atscppapi/src/AsyncTimer.cc | 4 +- lib/atscppapi/src/include/atscppapi/Async.h | 46 +++++++++++++++++--- .../src/include/atscppapi/AsyncHttpFetch.h | 2 +- .../src/include/atscppapi/AsyncTimer.h | 2 +- 6 files changed, 87 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafficserver/blob/af12e778/lib/atscppapi/examples/async_http_fetch/AsyncHttpFetch.cc ---------------------------------------------------------------------- diff --git a/lib/atscppapi/examples/async_http_fetch/AsyncHttpFetch.cc b/lib/atscppapi/examples/async_http_fetch/AsyncHttpFetch.cc index 0535786..14ff9cc 100644 --- a/lib/atscppapi/examples/async_http_fetch/AsyncHttpFetch.cc +++ b/lib/atscppapi/examples/async_http_fetch/AsyncHttpFetch.cc @@ -22,6 +22,7 @@ #include <atscppapi/Logger.h> #include <atscppapi/Async.h> #include <atscppapi/AsyncHttpFetch.h> +#include <atscppapi/AsyncTimer.h> #include <atscppapi/PluginInit.h> #include <cstring> #include <cassert> @@ -43,8 +44,31 @@ public: AsyncHttpFetch3(string request, HttpMethod method) : AsyncHttpFetch(request, method) { }; }; +class DelayedAsyncHttpFetch : public AsyncHttpFetch, public AsyncReceiver<AsyncTimer> { +public: + DelayedAsyncHttpFetch(string request, HttpMethod method, shared_ptr<Mutex> mutex) + : AsyncHttpFetch(request, method), mutex_(mutex), timer_(NULL) { }; + void run() { + timer_ = new AsyncTimer(AsyncTimer::TYPE_ONE_OFF, 1000 /* 1s */); + Async::execute(this, timer_, mutex_); + } + void handleAsyncComplete(AsyncTimer &/*timer ATS_UNUSED */) { + TS_DEBUG(TAG, "Receiver should not be reachable"); + assert(!getDispatchController()->dispatch()); + delete this; + } + bool isAlive() { + return getDispatchController()->isEnabled(); + } + ~DelayedAsyncHttpFetch() { delete timer_; } +private: + shared_ptr<Mutex> mutex_; + AsyncTimer *timer_; +}; + class TransactionHookPlugin : public TransactionPlugin, public AsyncReceiver<AsyncHttpFetch>, - public AsyncReceiver<AsyncHttpFetch2>, public AsyncReceiver<AsyncHttpFetch3> { + public AsyncReceiver<AsyncHttpFetch2>, public AsyncReceiver<AsyncHttpFetch3>, + public AsyncReceiver<DelayedAsyncHttpFetch> { public: TransactionHookPlugin(Transaction &transaction) : TransactionPlugin(transaction), transaction_(transaction), num_fetches_pending_(0) { @@ -70,6 +94,15 @@ public: request_headers.set("Header2", "Value2"); Async::execute<AsyncHttpFetch2>(this, provider2, getMutex()); ++num_fetches_pending_; + + DelayedAsyncHttpFetch *delayed_provider = new DelayedAsyncHttpFetch("url", HTTP_METHOD_GET, getMutex()); + Async::execute<DelayedAsyncHttpFetch>(this, delayed_provider, getMutex()); + + // canceling right after starting in this case, but cancel() can be called any time + TS_DEBUG(TAG, "Will cancel delayed fetch"); + assert(delayed_provider->isAlive()); + delayed_provider->cancel(); + assert(!delayed_provider->isAlive()); } void handleAsyncComplete(AsyncHttpFetch &async_http_fetch) { @@ -94,6 +127,10 @@ public: assert(!"AsyncHttpFetch3 shouldn't have completed!"); } + void handleAsyncComplete(DelayedAsyncHttpFetch &/*async_http_fetch ATS_UNUSED */) { + assert(!"Should've been canceled!"); + } + private: Transaction &transaction_; int num_fetches_pending_; @@ -139,6 +176,9 @@ public: if (!transaction.isInternalRequest()) { transaction.addPlugin(new TransactionHookPlugin(transaction)); } + else { + TS_DEBUG(TAG, "Ignoring internal transaction"); + } transaction.resume(); } }; http://git-wip-us.apache.org/repos/asf/trafficserver/blob/af12e778/lib/atscppapi/src/AsyncHttpFetch.cc ---------------------------------------------------------------------- diff --git a/lib/atscppapi/src/AsyncHttpFetch.cc b/lib/atscppapi/src/AsyncHttpFetch.cc index 19dbe58..ff374a9 100644 --- a/lib/atscppapi/src/AsyncHttpFetch.cc +++ b/lib/atscppapi/src/AsyncHttpFetch.cc @@ -118,8 +118,8 @@ void AsyncHttpFetch::init(const string &url_str, HttpMethod http_method, const s state_ = new AsyncHttpFetchState(url_str, http_method, request_body); } -void AsyncHttpFetch::run(shared_ptr<AsyncDispatchControllerBase> sender) { - state_->dispatch_controller_ = sender; +void AsyncHttpFetch::run() { + state_->dispatch_controller_ = getDispatchController(); // keep a copy in state so that cont handler can use it TSCont fetchCont = TSContCreate(handleFetchEvents, TSMutexCreate()); TSContDataSet(fetchCont, static_cast<void *>(this)); // Providers have to clean themselves up when they are done. http://git-wip-us.apache.org/repos/asf/trafficserver/blob/af12e778/lib/atscppapi/src/AsyncTimer.cc ---------------------------------------------------------------------- diff --git a/lib/atscppapi/src/AsyncTimer.cc b/lib/atscppapi/src/AsyncTimer.cc index 995cc48..c7548ce 100644 --- a/lib/atscppapi/src/AsyncTimer.cc +++ b/lib/atscppapi/src/AsyncTimer.cc @@ -67,7 +67,8 @@ AsyncTimer::AsyncTimer(Type type, int period_in_ms, int initial_period_in_ms) { TSContDataSet(state_->cont_, static_cast<void *>(state_)); } -void AsyncTimer::run(shared_ptr<AsyncDispatchControllerBase> dispatch_controller) { +void AsyncTimer::run() { + state_->dispatch_controller_ = getDispatchController(); // keep a copy in state so that cont handler can use it int one_off_timeout_in_ms = 0; int regular_timeout_in_ms = 0; if (state_->type_ == AsyncTimer::TYPE_ONE_OFF) { @@ -87,7 +88,6 @@ void AsyncTimer::run(shared_ptr<AsyncDispatchControllerBase> dispatch_controller state_->periodic_timer_action_ = TSContScheduleEvery(state_->cont_, regular_timeout_in_ms, TS_THREAD_POOL_DEFAULT); } - state_->dispatch_controller_ = dispatch_controller; } AsyncTimer::~AsyncTimer() { http://git-wip-us.apache.org/repos/asf/trafficserver/blob/af12e778/lib/atscppapi/src/include/atscppapi/Async.h ---------------------------------------------------------------------- diff --git a/lib/atscppapi/src/include/atscppapi/Async.h b/lib/atscppapi/src/include/atscppapi/Async.h index 39a24d0..3bfa2f4 100644 --- a/lib/atscppapi/src/include/atscppapi/Async.h +++ b/lib/atscppapi/src/include/atscppapi/Async.h @@ -46,6 +46,13 @@ public: * @return True if the receiver was still alive. */ virtual bool dispatch() = 0; + + /** Renders dispatch unusable to communicate to receiver */ + virtual void disable() = 0; + + /** Returns true if receiver can be communicated with */ + virtual bool isEnabled() = 0; + virtual ~AsyncDispatchControllerBase() { } }; @@ -60,13 +67,31 @@ class AsyncProvider { public: /** * This method is invoked when the async operation is requested. This call should be used - * to just start the async operation and *not* block this thread. - * - * @param dispatch_controller provides a way to dispatch an "async complete" event to the - * requester. + * to just start the async operation and *not* block this thread. On completion, + * getDispatchController() can be used to invoke the receiver. */ - virtual void run(shared_ptr<AsyncDispatchControllerBase> dispatch_controller) = 0; + virtual void run() = 0; + + /** Base implementation just breaks communication channel with receiver. Implementations + * should add business logic here. */ + virtual void cancel() { + if (dispatch_controller_) { + dispatch_controller_->disable(); + } + } + virtual ~AsyncProvider() { } + +protected: + shared_ptr<AsyncDispatchControllerBase> getDispatchController() { return dispatch_controller_; } + +private: + shared_ptr<AsyncDispatchControllerBase> dispatch_controller_; + void doRun(shared_ptr<AsyncDispatchControllerBase> dispatch_controller) { + dispatch_controller_ = dispatch_controller; + run(); + } + friend class Async; }; /** @@ -88,6 +113,15 @@ public: return ret; } + void disable() { + ScopedSharedMutexLock scopedLock(dispatch_mutex_); + event_receiver_ = NULL; + } + + bool isEnabled() { + return (event_receiver_ != NULL); + } + /** * Constructor * @@ -177,7 +211,7 @@ public: shared_ptr<AsyncReceiverPromise<AsyncReceiver<AsyncProviderType>, AsyncProviderType > > receiver_promise( new AsyncReceiverPromise<AsyncReceiver<AsyncProviderType>, AsyncProviderType >(dispatcher)); event_receiver->receiver_promises_.push_back(receiver_promise); // now if the event receiver dies, we're safe. - provider->run(dispatcher); + provider->doRun(dispatcher); } }; http://git-wip-us.apache.org/repos/asf/trafficserver/blob/af12e778/lib/atscppapi/src/include/atscppapi/AsyncHttpFetch.h ---------------------------------------------------------------------- diff --git a/lib/atscppapi/src/include/atscppapi/AsyncHttpFetch.h b/lib/atscppapi/src/include/atscppapi/AsyncHttpFetch.h index 42118fa..1ddd235 100644 --- a/lib/atscppapi/src/include/atscppapi/AsyncHttpFetch.h +++ b/lib/atscppapi/src/include/atscppapi/AsyncHttpFetch.h @@ -90,7 +90,7 @@ public: /** * Starts a HTTP fetch of the Request contained. */ - virtual void run(shared_ptr<AsyncDispatchControllerBase> dispatch_controller); + virtual void run(); protected: virtual ~AsyncHttpFetch(); http://git-wip-us.apache.org/repos/asf/trafficserver/blob/af12e778/lib/atscppapi/src/include/atscppapi/AsyncTimer.h ---------------------------------------------------------------------- diff --git a/lib/atscppapi/src/include/atscppapi/AsyncTimer.h b/lib/atscppapi/src/include/atscppapi/AsyncTimer.h index 10da3e9..5e61e1d 100644 --- a/lib/atscppapi/src/include/atscppapi/AsyncTimer.h +++ b/lib/atscppapi/src/include/atscppapi/AsyncTimer.h @@ -67,7 +67,7 @@ public: /** * Starts the timer. */ - void run(shared_ptr<AsyncDispatchControllerBase> dispatch_controller); + void run(); private: AsyncTimerState *state_;
