Repository: trafficserver
Updated Branches:
  refs/heads/master 5d38019f2 -> 006ede8cf


Patch for TS-2652: Allowing cancelation of AsyncProviders


Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/af12e778
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/af12e778
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/af12e778

Branch: refs/heads/master
Commit: af12e778714f6950316d2752339be178df43d7f1
Parents: 79dd503
Author: Manjesh Nilange <[email protected]>
Authored: Thu Mar 20 15:22:52 2014 -0700
Committer: Manjesh Nilange <[email protected]>
Committed: Thu Mar 20 15:22:52 2014 -0700

----------------------------------------------------------------------
 .../examples/async_http_fetch/AsyncHttpFetch.cc | 42 +++++++++++++++++-
 lib/atscppapi/src/AsyncHttpFetch.cc             |  4 +-
 lib/atscppapi/src/AsyncTimer.cc                 |  4 +-
 lib/atscppapi/src/include/atscppapi/Async.h     | 46 +++++++++++++++++---
 .../src/include/atscppapi/AsyncHttpFetch.h      |  2 +-
 .../src/include/atscppapi/AsyncTimer.h          |  2 +-
 6 files changed, 87 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/af12e778/lib/atscppapi/examples/async_http_fetch/AsyncHttpFetch.cc
----------------------------------------------------------------------
diff --git a/lib/atscppapi/examples/async_http_fetch/AsyncHttpFetch.cc b/lib/atscppapi/examples/async_http_fetch/AsyncHttpFetch.cc
index 0535786..14ff9cc 100644
--- a/lib/atscppapi/examples/async_http_fetch/AsyncHttpFetch.cc
+++ b/lib/atscppapi/examples/async_http_fetch/AsyncHttpFetch.cc
@@ -22,6 +22,7 @@
 #include <atscppapi/Logger.h>
 #include <atscppapi/Async.h>
 #include <atscppapi/AsyncHttpFetch.h>
+#include <atscppapi/AsyncTimer.h>
 #include <atscppapi/PluginInit.h>
 #include <cstring>
 #include <cassert>
@@ -43,8 +44,31 @@ public:
   AsyncHttpFetch3(string request, HttpMethod method) : AsyncHttpFetch(request, 
method) { };
 };
 
+class DelayedAsyncHttpFetch : public AsyncHttpFetch, public 
AsyncReceiver<AsyncTimer> {
+public:
+  DelayedAsyncHttpFetch(string request, HttpMethod method, shared_ptr<Mutex> 
mutex)
+    : AsyncHttpFetch(request, method), mutex_(mutex), timer_(NULL) { };
+  void run() {
+    timer_ = new AsyncTimer(AsyncTimer::TYPE_ONE_OFF, 1000 /* 1s */);
+    Async::execute(this, timer_, mutex_);
+  }
+  void handleAsyncComplete(AsyncTimer &/*timer ATS_UNUSED */) {
+    TS_DEBUG(TAG, "Receiver should not be reachable");
+    assert(!getDispatchController()->dispatch());
+    delete this;
+  }
+  bool isAlive() {
+    return getDispatchController()->isEnabled();
+  }
+  ~DelayedAsyncHttpFetch() { delete timer_; }
+private:
+  shared_ptr<Mutex> mutex_;
+  AsyncTimer *timer_;
+};
+
 class TransactionHookPlugin : public TransactionPlugin, public 
AsyncReceiver<AsyncHttpFetch>,
-                              public AsyncReceiver<AsyncHttpFetch2>, public 
AsyncReceiver<AsyncHttpFetch3> {
+                              public AsyncReceiver<AsyncHttpFetch2>, public 
AsyncReceiver<AsyncHttpFetch3>,
+                              public AsyncReceiver<DelayedAsyncHttpFetch> {
 public:
   TransactionHookPlugin(Transaction &transaction) :
     TransactionPlugin(transaction), transaction_(transaction), 
num_fetches_pending_(0) {
@@ -70,6 +94,15 @@ public:
     request_headers.set("Header2", "Value2");
     Async::execute<AsyncHttpFetch2>(this, provider2, getMutex());
     ++num_fetches_pending_;
+
+    DelayedAsyncHttpFetch *delayed_provider = new DelayedAsyncHttpFetch("url", 
HTTP_METHOD_GET, getMutex());
+    Async::execute<DelayedAsyncHttpFetch>(this, delayed_provider, getMutex());
+
+    // canceling right after starting in this case, but cancel() can be called 
any time
+    TS_DEBUG(TAG, "Will cancel delayed fetch");
+    assert(delayed_provider->isAlive());
+    delayed_provider->cancel();
+    assert(!delayed_provider->isAlive());
   }
 
   void handleAsyncComplete(AsyncHttpFetch &async_http_fetch) {
@@ -94,6 +127,10 @@ public:
     assert(!"AsyncHttpFetch3 shouldn't have completed!");
   }
 
+  void handleAsyncComplete(DelayedAsyncHttpFetch &/*async_http_fetch 
ATS_UNUSED */) {
+    assert(!"Should've been canceled!");
+  }
+
 private:
   Transaction &transaction_;
   int num_fetches_pending_;
@@ -139,6 +176,9 @@ public:
     if (!transaction.isInternalRequest()) {
       transaction.addPlugin(new TransactionHookPlugin(transaction));
     }
+    else {
+      TS_DEBUG(TAG, "Ignoring internal transaction");
+    }
     transaction.resume();
   }
 };

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/af12e778/lib/atscppapi/src/AsyncHttpFetch.cc
----------------------------------------------------------------------
diff --git a/lib/atscppapi/src/AsyncHttpFetch.cc b/lib/atscppapi/src/AsyncHttpFetch.cc
index 19dbe58..ff374a9 100644
--- a/lib/atscppapi/src/AsyncHttpFetch.cc
+++ b/lib/atscppapi/src/AsyncHttpFetch.cc
@@ -118,8 +118,8 @@ void AsyncHttpFetch::init(const string &url_str, HttpMethod http_method, const s
   state_ = new AsyncHttpFetchState(url_str, http_method, request_body);
 }
 
-void AsyncHttpFetch::run(shared_ptr<AsyncDispatchControllerBase> sender) {
-  state_->dispatch_controller_ = sender;
+void AsyncHttpFetch::run() {
+  state_->dispatch_controller_ = getDispatchController(); // keep a copy in 
state so that cont handler can use it
 
   TSCont fetchCont = TSContCreate(handleFetchEvents, TSMutexCreate());
   TSContDataSet(fetchCont, static_cast<void *>(this)); // Providers have to 
clean themselves up when they are done.

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/af12e778/lib/atscppapi/src/AsyncTimer.cc
----------------------------------------------------------------------
diff --git a/lib/atscppapi/src/AsyncTimer.cc b/lib/atscppapi/src/AsyncTimer.cc
index 995cc48..c7548ce 100644
--- a/lib/atscppapi/src/AsyncTimer.cc
+++ b/lib/atscppapi/src/AsyncTimer.cc
@@ -67,7 +67,8 @@ AsyncTimer::AsyncTimer(Type type, int period_in_ms, int initial_period_in_ms) {
   TSContDataSet(state_->cont_, static_cast<void *>(state_));
 }
 
-void AsyncTimer::run(shared_ptr<AsyncDispatchControllerBase> 
dispatch_controller) {
+void AsyncTimer::run() {
+  state_->dispatch_controller_ = getDispatchController(); // keep a copy in 
state so that cont handler can use it
   int one_off_timeout_in_ms = 0;
   int regular_timeout_in_ms = 0;
   if (state_->type_ == AsyncTimer::TYPE_ONE_OFF) {
@@ -87,7 +88,6 @@ void AsyncTimer::run(shared_ptr<AsyncDispatchControllerBase> dispatch_controller
     state_->periodic_timer_action_ = TSContScheduleEvery(state_->cont_, 
regular_timeout_in_ms,
                                                          
TS_THREAD_POOL_DEFAULT);
   }
-  state_->dispatch_controller_ = dispatch_controller;
 }
 
 AsyncTimer::~AsyncTimer() {

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/af12e778/lib/atscppapi/src/include/atscppapi/Async.h
----------------------------------------------------------------------
diff --git a/lib/atscppapi/src/include/atscppapi/Async.h b/lib/atscppapi/src/include/atscppapi/Async.h
index 39a24d0..3bfa2f4 100644
--- a/lib/atscppapi/src/include/atscppapi/Async.h
+++ b/lib/atscppapi/src/include/atscppapi/Async.h
@@ -46,6 +46,13 @@ public:
    * @return True if the receiver was still alive.
    */
   virtual bool dispatch() = 0;
+
+  /** Renders dispatch unusable to communicate to receiver */
+  virtual void disable() = 0;
+
+  /** Returns true if receiver can be communicated with */
+  virtual bool isEnabled() = 0;
+
   virtual ~AsyncDispatchControllerBase() { }
 };
 
@@ -60,13 +67,31 @@ class AsyncProvider {
 public:
   /**
    * This method is invoked when the async operation is requested. This call 
should be used
-   * to just start the async operation and *not* block this thread.
-   *
-   * @param dispatch_controller provides a way to dispatch an "async complete" 
event to the
-   *                            requester.
+   * to just start the async operation and *not* block this thread. On 
completion, 
+   * getDispatchController() can be used to invoke the receiver.
    */
-  virtual void run(shared_ptr<AsyncDispatchControllerBase> 
dispatch_controller) = 0;
+  virtual void run() = 0;
+
+  /** Base implementation just breaks communication channel with receiver. 
Implementations
+   * should add business logic here. */
+  virtual void cancel() {
+    if (dispatch_controller_) {
+      dispatch_controller_->disable();
+    }
+  }
+
   virtual ~AsyncProvider() { }
+
+protected:
+  shared_ptr<AsyncDispatchControllerBase> getDispatchController() { return 
dispatch_controller_; }
+
+private:
+  shared_ptr<AsyncDispatchControllerBase> dispatch_controller_;
+  void doRun(shared_ptr<AsyncDispatchControllerBase> dispatch_controller) {
+    dispatch_controller_ = dispatch_controller;
+    run();
+  }
+  friend class Async;
 };
 
 /**
@@ -88,6 +113,15 @@ public:
     return ret;
   }
 
+  void disable() {
+    ScopedSharedMutexLock scopedLock(dispatch_mutex_);
+    event_receiver_ = NULL;
+  }
+
+  bool isEnabled() {
+    return (event_receiver_ != NULL);
+  }
+
   /**
    * Constructor
    *
@@ -177,7 +211,7 @@ public:
     shared_ptr<AsyncReceiverPromise<AsyncReceiver<AsyncProviderType>, 
AsyncProviderType > > receiver_promise(
       new AsyncReceiverPromise<AsyncReceiver<AsyncProviderType>, 
AsyncProviderType >(dispatcher));
     event_receiver->receiver_promises_.push_back(receiver_promise); // now if 
the event receiver dies, we're safe.
-    provider->run(dispatcher);
+    provider->doRun(dispatcher);
   }
 };
 

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/af12e778/lib/atscppapi/src/include/atscppapi/AsyncHttpFetch.h
----------------------------------------------------------------------
diff --git a/lib/atscppapi/src/include/atscppapi/AsyncHttpFetch.h b/lib/atscppapi/src/include/atscppapi/AsyncHttpFetch.h
index 42118fa..1ddd235 100644
--- a/lib/atscppapi/src/include/atscppapi/AsyncHttpFetch.h
+++ b/lib/atscppapi/src/include/atscppapi/AsyncHttpFetch.h
@@ -90,7 +90,7 @@ public:
   /**
    * Starts a HTTP fetch of the Request contained.
    */  
-  virtual void run(shared_ptr<AsyncDispatchControllerBase> 
dispatch_controller);
+  virtual void run();
 protected:
   virtual ~AsyncHttpFetch();
 

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/af12e778/lib/atscppapi/src/include/atscppapi/AsyncTimer.h
----------------------------------------------------------------------
diff --git a/lib/atscppapi/src/include/atscppapi/AsyncTimer.h b/lib/atscppapi/src/include/atscppapi/AsyncTimer.h
index 10da3e9..5e61e1d 100644
--- a/lib/atscppapi/src/include/atscppapi/AsyncTimer.h
+++ b/lib/atscppapi/src/include/atscppapi/AsyncTimer.h
@@ -67,7 +67,7 @@ public:
   /**
    * Starts the timer.
    */  
-  void run(shared_ptr<AsyncDispatchControllerBase> dispatch_controller);
+  void run();
 
 private:
   AsyncTimerState *state_;

Reply via email to