Repository: qpid-proton
Updated Branches:
  refs/heads/master 6df8ad351 -> 95d04400d


re-factored select as processing of the quesced event


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/95d04400
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/95d04400
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/95d04400

Branch: refs/heads/master
Commit: 95d04400db75759a92a8618971f0b8b52070427e
Parents: 255946d
Author: Rafael Schloming <[email protected]>
Authored: Wed Jan 21 09:03:19 2015 -0500
Committer: Rafael Schloming <[email protected]>
Committed: Wed Jan 21 09:03:44 2015 -0500

----------------------------------------------------------------------
 proton-c/src/reactor/reactor.c | 116 ++++++++++++++++++------------------
 proton-c/src/tests/reactor.c   |  18 +++---
 2 files changed, 67 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/95d04400/proton-c/src/reactor/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c
index a4f4b31..4f90865 100644
--- a/proton-c/src/reactor/reactor.c
+++ b/proton-c/src/reactor/reactor.c
@@ -47,7 +47,8 @@ struct pn_reactor_t {
   pn_selectable_t *selectable;
   pn_event_type_t previous;
   pn_timestamp_t now;
-  bool selected;
+  int selectables;
+  int timeout;
 };
 
 static void pn_reactor_mark(pn_reactor_t *reactor) {
@@ -55,24 +56,18 @@ static void pn_reactor_mark(pn_reactor_t *reactor) {
   reactor->now = pn_i_now();
 }
 
-static void pn_dummy_dispatch(pn_handler_t *handler, pn_event_t *event) {
-  /*pn_string_t *str = pn_string(NULL);
-  pn_inspect(event, str);
-  printf("%s\n", pn_string_get(str));
-  pn_free(str);*/
-}
-
 static void pn_reactor_initialize(pn_reactor_t *reactor) {
   reactor->attachments = pn_record();
   reactor->io = pn_io();
   reactor->selector = pn_io_selector(reactor->io);
   reactor->collector = pn_collector();
-  reactor->handler = pn_handler(pn_dummy_dispatch);
+  reactor->handler = pn_handler(NULL);
   reactor->children = pn_list(PN_OBJECT, 0);
   reactor->timer = pn_timer(reactor->collector);
   reactor->selectable = NULL;
   reactor->previous = PN_EVENT_NONE;
-  reactor->selected = false;
+  reactor->selectables = 0;
+  reactor->timeout = 0;
   pn_reactor_mark(reactor);
 }
 
@@ -115,11 +110,6 @@ pn_handler_t *pn_reactor_handler(pn_reactor_t *reactor) {
   return reactor->handler;
 }
 
-pn_selector_t *pn_reactor_selector(pn_reactor_t *reactor) {
-  assert(reactor);
-  return reactor->selector;
-}
-
 pn_io_t *pn_reactor_io(pn_reactor_t *reactor) {
   assert(reactor);
   return reactor->io;
@@ -139,6 +129,7 @@ static void pni_selectable_release(pn_selectable_t 
*selectable) {
   pn_reactor_t *reactor = (pn_reactor_t *) 
pni_selectable_get_context(selectable);
   pn_collector_put(reactor->collector, PN_OBJECT, selectable, 
PN_SELECTABLE_FINAL);
   pn_list_remove(reactor->children, selectable);
+  reactor->selectables--;
 }
 
 pn_selectable_t *pn_reactor_selectable(pn_reactor_t *reactor) {
@@ -150,6 +141,7 @@ pn_selectable_t *pn_reactor_selectable(pn_reactor_t 
*reactor) {
   pn_list_add(reactor->children, sel);
   pn_selectable_on_release(sel, pni_selectable_release);
   pn_decref(sel);
+  reactor->selectables++;
   return sel;
 }
 
@@ -158,6 +150,24 @@ void pn_reactor_update(pn_reactor_t *reactor, 
pn_selectable_t *selectable) {
   pn_collector_put(reactor->collector, PN_OBJECT, selectable, 
PN_SELECTABLE_UPDATED);
 }
 
+void pni_handle_quiesced(pn_reactor_t *reactor) {
+  pn_selector_select(reactor->selector, reactor->timeout);
+  pn_selectable_t *sel;
+  int events;
+  pn_reactor_mark(reactor);
+  while ((sel = pn_selector_next(reactor->selector, &events))) {
+    if (events & PN_READABLE) {
+      pn_selectable_readable(sel);
+    }
+    if (events & PN_WRITABLE) {
+      pn_selectable_writable(sel);
+    }
+    if (events & PN_EXPIRED) {
+      pn_selectable_expired(sel);
+    }
+  }
+}
+
 void pni_handle_transport(pn_reactor_t *reactor, pn_event_t *event);
 void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event);
 void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event);
@@ -204,6 +214,9 @@ static void pni_reactor_dispatch_post(pn_reactor_t 
*reactor, pn_event_t *event)
   case PN_CONNECTION_FINAL:
     pni_handle_final(reactor, event);
     break;
+  case PN_REACTOR_QUIESCED:
+    pni_handle_quiesced(reactor);
+    break;
   default:
     break;
   }
@@ -319,11 +332,24 @@ pn_task_t *pn_reactor_schedule(pn_reactor_t *reactor, int 
delay, pn_handler_t *h
   return task;
 }
 
-void pn_reactor_process(pn_reactor_t *reactor) {
+void pni_event_print(pn_event_t *event) {
+  pn_string_t *str = pn_string(NULL);
+  pn_inspect(event, str);
+  printf("%s\n", pn_string_get(str));
+  pn_free(str);
+}
+
+bool pni_reactor_more(pn_reactor_t *reactor) {
+  assert(reactor);
+  return pn_timer_tasks(reactor->timer) || reactor->selectables > 1;
+}
+
+bool pn_reactor_process(pn_reactor_t *reactor) {
   assert(reactor);
   pn_reactor_mark(reactor);
   while (true) {
     pn_event_t *event = pn_collector_peek(reactor->collector);
+    //    pni_event_print(event);
     if (event) {
       pni_reactor_dispatch_pre(reactor, event);
       pn_handler_t *handler = pn_event_handler(event, reactor->handler);
@@ -331,10 +357,22 @@ void pn_reactor_process(pn_reactor_t *reactor) {
       pni_reactor_dispatch_post(reactor, event);
       reactor->previous = pn_event_type(event);
       pn_collector_pop(reactor->collector);
-    } else if (reactor->previous != PN_REACTOR_QUIESCED && reactor->previous 
!= PN_REACTOR_FINAL) {
-      pn_collector_put(reactor->collector, PN_OBJECT, reactor, 
PN_REACTOR_QUIESCED);
     } else {
-      break;
+      if (pni_reactor_more(reactor)) {
+        if (reactor->previous != PN_REACTOR_QUIESCED && reactor->previous != 
PN_REACTOR_FINAL) {
+          pn_collector_put(reactor->collector, PN_OBJECT, reactor, 
PN_REACTOR_QUIESCED);
+        } else {
+          return true;
+        }
+      } else {
+        if (reactor->selectable) {
+          pn_selectable_terminate(reactor->selectable);
+          pn_reactor_update(reactor, reactor->selectable);
+          reactor->selectable = NULL;
+        } else {
+          return false;
+        }
+      }
     }
   }
 }
@@ -360,44 +398,8 @@ void pn_reactor_start(pn_reactor_t *reactor) {
 
 bool pn_reactor_work(pn_reactor_t *reactor, int timeout) {
   assert(reactor);
-  pn_reactor_process(reactor);
-
-  if (pn_selector_size(reactor->selector) == 1) {
-    if (reactor->selected) {
-      if (!pn_timer_tasks(reactor->timer) && reactor->selectable) {
-        pn_selectable_terminate(reactor->selectable);
-        pn_reactor_update(reactor, reactor->selectable);
-        reactor->selectable = NULL;
-        return true;
-      }
-    } else {
-      timeout = 0;
-    }
-  }
-
-  if (!pn_selector_size(reactor->selector)) {
-    return false;
-  }
-
-  pn_selector_select(reactor->selector, timeout);
-  pn_selectable_t *sel;
-  int events;
-  pn_reactor_mark(reactor);
-  while ((sel = pn_selector_next(reactor->selector, &events))) {
-    if (events & PN_READABLE) {
-      pn_selectable_readable(sel);
-    }
-    if (events & PN_WRITABLE) {
-      pn_selectable_writable(sel);
-    }
-    if (events & PN_EXPIRED) {
-      pn_selectable_expired(sel);
-    }
-  }
-
-  reactor->selected = true;
-
-  return true;
+  reactor->timeout = timeout;
+  return pn_reactor_process(reactor);
 }
 
 void pn_reactor_stop(pn_reactor_t *reactor) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/95d04400/proton-c/src/tests/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c
index 1365f88..ff83fa7 100644
--- a/proton-c/src/tests/reactor.c
+++ b/proton-c/src/tests/reactor.c
@@ -137,8 +137,8 @@ static void test_reactor_handler_run(void) {
   pn_handler_t *th = test_handler(reactor, events);
   pn_handler_add(handler, th);
   pn_reactor_run(reactor);
-  expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, 
PN_REACTOR_QUIESCED,
-         PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_QUIESCED, 
PN_REACTOR_FINAL, END);
+  expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, 
PN_SELECTABLE_UPDATED,
+         PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
   pn_free(reactor);
   pn_free(th);
   pn_free(events);
@@ -152,8 +152,8 @@ static void test_reactor_handler_run_free(void) {
   pn_list_t *events = pn_list(PN_VOID, 0);
   pn_handler_add(handler, test_handler(reactor, events));
   pn_reactor_run(reactor);
-  expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, 
PN_REACTOR_QUIESCED,
-         PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_QUIESCED, 
PN_REACTOR_FINAL, END);
+  expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, 
PN_SELECTABLE_UPDATED,
+         PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
   pn_reactor_free(reactor);
   pn_free(events);
 }
@@ -169,8 +169,8 @@ static void test_reactor_connection(void) {
   pn_list_t *revents = pn_list(PN_VOID, 0);
   pn_handler_add(root, test_handler(reactor, revents));
   pn_reactor_run(reactor);
-  expect(revents, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, 
PN_REACTOR_QUIESCED,
-         PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_QUIESCED, 
PN_REACTOR_FINAL, END);
+  expect(revents, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, 
PN_SELECTABLE_UPDATED,
+         PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
   expect(cevents, PN_CONNECTION_INIT, END);
   pn_reactor_free(reactor);
   pn_handler_free(tch);
@@ -420,8 +420,7 @@ static void test_reactor_schedule(void) {
   pn_reactor_run(reactor);
   pn_reactor_free(reactor);
   expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, 
PN_REACTOR_QUIESCED,
-         PN_TIMER_TASK, PN_REACTOR_QUIESCED, PN_SELECTABLE_UPDATED, 
PN_SELECTABLE_FINAL, PN_REACTOR_QUIESCED,
-         PN_REACTOR_FINAL, END);
+         PN_TIMER_TASK, PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, 
PN_REACTOR_FINAL, END);
   pn_free(events);
 }
 
@@ -437,8 +436,7 @@ static void test_reactor_schedule_handler(void) {
   pn_reactor_free(reactor);
   pn_handler_free(th);
   expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, 
PN_REACTOR_QUIESCED,
-         PN_REACTOR_QUIESCED, PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, 
PN_REACTOR_QUIESCED,
-         PN_REACTOR_FINAL, END);
+         PN_SELECTABLE_UPDATED, PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
   expect(tevents, PN_TIMER_TASK, END);
   pn_free(events);
   pn_free(tevents);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to