This is an automated email from the ASF dual-hosted git repository.
oknet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new 9daeb81 Optimize: tighten the logic of the
PluginVC::process_read/write_side()
9daeb81 is described below
commit 9daeb81ccc46b62ba340898319a0cadd96b2326c
Author: Oknet Xu <[email protected]>
AuthorDate: Fri Dec 28 18:07:11 2018 +0800
Optimize: tighten the logic of the PluginVC::process_read/write_side()
According to the comment at the head of `process_read/write_side()`:
- To call `process_read_side()`, the following mutexes must be held:
- PluginVC::mutex
- PluginVC::read_state.vio.mutex
- To call `process_write_side()`, the following mutexes must be held:
- PluginVC::mutex
- PluginVC::write_state.vio.mutex
But the current code violates these rules when `process_read/write_side()` is
called with `other_side_call == true`.
This commit makes the code comply with the rules when
`other_side_call` is true.
This is an update to TS-3339 (0f9dda6).
---
proxy/PluginVC.cc | 87 ++++++++++++++++++++++++++++++++++---------------------
1 file changed, 54 insertions(+), 33 deletions(-)
diff --git a/proxy/PluginVC.cc b/proxy/PluginVC.cc
index 85dd7fd..44d4076 100644
--- a/proxy/PluginVC.cc
+++ b/proxy/PluginVC.cc
@@ -310,6 +310,7 @@ PluginVC::reenable(VIO *vio)
{
ink_assert(!closed);
ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
+ ink_assert(vio->mutex->thread_holding == this_ethread());
Ptr<ProxyMutex> sm_mutex = vio->mutex;
SCOPED_MUTEX_LOCK(lock, sm_mutex, this_ethread());
@@ -332,6 +333,7 @@ PluginVC::reenable_re(VIO *vio)
{
ink_assert(!closed);
ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
+ ink_assert(vio->mutex->thread_holding == this_ethread());
Debug("pvc", "[%u] %s: reenable_re %s", core_obj->id, PVC_TYPE, (vio->op ==
VIO::WRITE) ? "Write" : "Read");
@@ -473,25 +475,13 @@ PluginVC::process_write_side(bool other_side_call)
MIOBuffer *core_buffer = (vc_type == PLUGIN_VC_ACTIVE) ?
core_obj->a_to_p_buffer : core_obj->p_to_a_buffer;
+ Debug("pvc", "[%u] %s: process_write_side", core_obj->id, PVC_TYPE);
need_write_process = false;
+ // Check write_state
if (write_state.vio.op != VIO::WRITE || closed || write_state.shutdown) {
return;
}
- // Acquire the lock of the write side continuation
- EThread *my_ethread = mutex->thread_holding;
- ink_assert(my_ethread != nullptr);
- MUTEX_TRY_LOCK(lock, write_state.vio.mutex, my_ethread);
- if (!lock.is_locked()) {
- Debug("pvc_event", "[%u] %s: process_write_side lock miss, retrying",
core_obj->id, PVC_TYPE);
-
- need_write_process = true;
- setup_event_cb(PVC_LOCK_RETRY_TIME, &core_lock_retry_event);
- return;
- }
-
- Debug("pvc", "[%u] %s: process_write_side", core_obj->id, PVC_TYPE);
- need_write_process = false;
// Check the state of our write buffer as well as ntodo
int64_t ntodo = write_state.vio.ntodo();
@@ -552,6 +542,30 @@ PluginVC::process_write_side(bool other_side_call)
// Wake up the read side on the other side to process these bytes
if (!other_side->closed) {
if (!other_side_call) {
+ /* To clear the `need_read_process`, the mutexes must be obtained:
+ *
+ * - PluginVC::mutex
+ * - PluginVC::read_state.vio.mutex
+ *
+ */
+ if (other_side->read_state.vio.op != VIO::READ || other_side->closed ||
other_side->read_state.shutdown) {
+ // Just return, no touch on `other_side->need_read_process`.
+ return;
+ }
+ // Acquire the lock of the read side continuation
+ EThread *my_ethread = mutex->thread_holding;
+ ink_assert(my_ethread != nullptr);
+ MUTEX_TRY_LOCK(lock, other_side->read_state.vio.mutex, my_ethread);
+ if (!lock.is_locked()) {
+ Debug("pvc_event", "[%u] %s: process_read_side from other side lock
miss, retrying", other_side->core_obj->id,
+ ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" :
"Passive"));
+
+ // set need_read_process to enforce the read processing
+ other_side->need_read_process = true;
+ other_side->setup_event_cb(PVC_LOCK_RETRY_TIME,
&other_side->core_lock_retry_event);
+ return;
+ }
+
other_side->process_read_side(true);
} else {
other_side->read_state.vio.reenable();
@@ -587,28 +601,11 @@ PluginVC::process_read_side(bool other_side_call)
core_reader = core_obj->a_to_p_reader;
}
- need_read_process = false;
-
- if (read_state.vio.op != VIO::READ || closed) {
- return;
- }
- // Acquire the lock of the read side continuation
- EThread *my_ethread = mutex->thread_holding;
- ink_assert(my_ethread != nullptr);
- MUTEX_TRY_LOCK(lock, read_state.vio.mutex, my_ethread);
- if (!lock.is_locked()) {
- Debug("pvc_event", "[%u] %s: process_read_side lock miss, retrying",
core_obj->id, PVC_TYPE);
-
- need_read_process = true;
- setup_event_cb(PVC_LOCK_RETRY_TIME, &core_lock_retry_event);
- return;
- }
-
Debug("pvc", "[%u] %s: process_read_side", core_obj->id, PVC_TYPE);
need_read_process = false;
- // Check read_state.shutdown after the lock has been obtained.
- if (read_state.shutdown) {
+ // Check read_state
+ if (read_state.vio.op != VIO::READ || closed || read_state.shutdown) {
return;
}
@@ -668,6 +665,30 @@ PluginVC::process_read_side(bool other_side_call)
// intermediate buffer
if (!other_side->closed) {
if (!other_side_call) {
+ /* To clear the `need_write_process`, the mutexes must be obtained:
+ *
+ * - PluginVC::mutex
+ * - PluginVC::write_state.vio.mutex
+ *
+ */
+ if (other_side->write_state.vio.op != VIO::WRITE || other_side->closed
|| other_side->write_state.shutdown) {
+ // Just return, no touch on `other_side->need_write_process`.
+ return;
+ }
+ // Acquire the lock of the write side continuation
+ EThread *my_ethread = mutex->thread_holding;
+ ink_assert(my_ethread != nullptr);
+ MUTEX_TRY_LOCK(lock, other_side->write_state.vio.mutex, my_ethread);
+ if (!lock.is_locked()) {
+ Debug("pvc_event", "[%u] %s: process_write_side from other side lock
miss, retrying", other_side->core_obj->id,
+ ((other_side->vc_type == PLUGIN_VC_ACTIVE) ? "Active" :
"Passive"));
+
+ // set need_write_process to enforce the write processing
+ other_side->need_write_process = true;
+ other_side->setup_event_cb(PVC_LOCK_RETRY_TIME,
&other_side->core_lock_retry_event);
+ return;
+ }
+
other_side->process_write_side(true);
} else {
other_side->write_state.vio.reenable();