This is an automated email from the ASF dual-hosted git repository. jpeach pushed a commit to branch master in repository https://git-dual.apache.org/repos/asf/trafficserver.git
commit 32f84f1834c9b7e32c18b84b2b67c7f0181501f5 Author: James Peach <[email protected]> AuthorDate: Sat May 7 13:21:16 2016 -0700 TS-4425: Switch iocore/cluster over to Ptr::get(). --- iocore/cluster/ClusterCache.cc | 8 ++++---- iocore/cluster/ClusterHandler.cc | 36 ++++++++++++++++----------------- iocore/cluster/ClusterHandlerBase.cc | 10 ++++----- iocore/cluster/ClusterLib.cc | 16 ++++++++------- iocore/cluster/ClusterLoadMonitor.cc | 2 +- iocore/cluster/ClusterProcessor.cc | 3 +-- iocore/cluster/ClusterVConnection.cc | 2 +- iocore/cluster/P_ClusterCache.h | 6 +++--- iocore/cluster/P_ClusterCacheInternal.h | 8 ++++---- iocore/cluster/P_ClusterHandler.h | 18 +++++++++-------- iocore/cluster/P_ClusterInternal.h | 20 ++++++++++++++++++ 11 files changed, 76 insertions(+), 53 deletions(-) diff --git a/iocore/cluster/ClusterCache.cc b/iocore/cluster/ClusterCache.cc index bf3b659..04213c9 100644 --- a/iocore/cluster/ClusterCache.cc +++ b/iocore/cluster/ClusterCache.cc @@ -1616,7 +1616,7 @@ CacheContinuation::replyOpEvent(int event, VConnection *cvc) msg->token = token; // Tell sender conn established OneWayTunnel *pOWT = OneWayTunnel::OneWayTunnel_alloc(); - pOWT->init(read_cluster_vc, cache_vc, NULL, nbytes ? nbytes : DEFAULT_MAX_BUFFER_SIZE, this->mutex); + pOWT->init(read_cluster_vc, cache_vc, NULL, nbytes ? nbytes : DEFAULT_MAX_BUFFER_SIZE, this->mutex.get()); read_cluster_vc->allow_remote_close(); results_expected--; } @@ -1688,7 +1688,7 @@ CacheContinuation::replyOpEvent(int event, VConnection *cvc) // Transmit reply message and object data in same cluster message Debug("cache_proto", "Sending reply/data seqno=%d buflen=%" PRId64, seq_number, readahead_data ? bytes_IOBufferBlockList(readahead_data, 1) : 0); - clusterProcessor.invoke_remote_data(ch, CACHE_OP_RESULT_CLUSTER_FUNCTION, (void *)msg, (flen + len), readahead_data, + clusterProcessor.invoke_remote_data(ch, CACHE_OP_RESULT_CLUSTER_FUNCTION, (void *)msg, (flen + len), readahead_data.get(), cluster_vc_channel, &token, &CacheContinuation::disposeOfDataBuffer, (void *)this, CLUSTER_OPT_STEAL); } else { @@ -1975,7 +1975,7 @@ cache_op_result_ClusterFunction(ClusterHandler *ch, void *d, int l) c->freeMsgBuffer(); if (ci.valid()) { // Unmarshaled CacheHTTPInfo contained in reply message, copy it. - c->setMsgBufferLen(len, iob); + c->setMsgBufferLen(len, iob.get()); c->ic_new_info = ci; } msg->seq_number = len; // HACK ALERT: reusing variable @@ -1996,7 +1996,7 @@ cache_op_result_ClusterFunction(ClusterHandler *ch, void *d, int l) c->token = msg->token; if (ci.valid()) { // Unmarshaled CacheHTTPInfo contained in reply message, copy it. - c->setMsgBufferLen(len, iob); + c->setMsgBufferLen(len, iob.get()); c->ic_new_info = ci; } c->result_error = op_result_error; diff --git a/iocore/cluster/ClusterHandler.cc b/iocore/cluster/ClusterHandler.cc index da07b4d..57e9ab1 100644 --- a/iocore/cluster/ClusterHandler.cc +++ b/iocore/cluster/ClusterHandler.cc @@ -369,11 +369,11 @@ ClusterHandler::close_free_lock(ClusterVConnection *vc, ClusterVConnState *s) { Ptr<ProxyMutex> m(s->vio.mutex); if (s == &vc->read) { - if ((ProxyMutex *)vc->read_locked) + if (vc->read_locked) MUTEX_UNTAKE_LOCK(vc->read_locked, thread); vc->read_locked = NULL; } else { - if ((ProxyMutex *)vc->write_locked) + if (vc->write_locked) MUTEX_UNTAKE_LOCK(vc->write_locked, thread); vc->write_locked = NULL; } @@ -569,7 +569,7 @@ ClusterHandler::build_initial_vector(bool read_flag) ///////////////////////////////////// // Try to get the read VIO mutex ///////////////////////////////////// - ink_release_assert(!(ProxyMutex *)vc->read_locked); + ink_release_assert(!vc->read_locked); #ifdef CLUSTER_TOMCAT if (!vc->read.vio.mutex || !MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->read.vio.mutex, thread, vc->read.vio._cont, READ_LOCK_SPIN_COUNT)) @@ -617,7 +617,7 @@ ClusterHandler::build_initial_vector(bool read_flag) bool remote_write_fill = (vc->pending_remote_fill && vc->remote_write_block); // Sanity check, assert we have the lock if (!remote_write_fill) { - ink_assert((ProxyMutex *)vc->write_locked); + ink_assert(vc->write_locked); } if (vc_ok_write(vc) || remote_write_fill) { if (remote_write_fill) { @@ -633,9 +633,10 @@ ClusterHandler::build_initial_vector(bool read_flag) vc->write_list_bytes -= (int)s.msg.descriptor[i].length; vc->write_bytes_in_transit += (int)s.msg.descriptor[i].length; - vc->write_list_tail = vc->write_list; - while (vc->write_list_tail && vc->write_list_tail->next) - vc->write_list_tail = vc->write_list_tail->next; + vc->write_list_tail = vc->write_list.get(); + while (vc->write_list_tail && vc->write_list_tail->next) { + vc->write_list_tail = vc->write_list_tail->next.get(); + } } } else { Debug(CL_NOTE, "faking cluster write data"); @@ -761,7 +762,7 @@ ClusterHandler::get_read_locks() continue; } - ink_assert(!(ProxyMutex *)vc->read_locked); + ink_assert(!vc->read_locked); vc->read_locked = vc->read.vio.mutex; if (vc->byte_bank_q.head || !MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->read.vio.mutex, thread, vc->read.vio._cont, READ_LOCK_SPIN_COUNT)) { @@ -815,7 +816,7 @@ ClusterHandler::get_write_locks() // already have a reference to the buffer continue; } - ink_assert(!(ProxyMutex *)vc->write_locked); + ink_assert(!vc->write_locked); vc->write_locked = vc->write.vio.mutex; #ifdef CLUSTER_TOMCAT if (vc->write_locked && @@ -1110,7 +1111,7 @@ ClusterHandler::update_channels_read() continue; } - if (!vc->pending_remote_fill && vc_ok_read(vc) && (!((ProxyMutex *)vc->read_locked) || vc->byte_bank_q.head)) { + if (!vc->pending_remote_fill && vc_ok_read(vc) && (!vc->read_locked || vc->byte_bank_q.head)) { // // Byte bank active or unable to acquire lock on VC. // Move data into the byte bank and attempt delivery @@ -1120,7 +1121,7 @@ ClusterHandler::update_channels_read() add_to_byte_bank(vc); } else { - if (vc->pending_remote_fill || ((ProxyMutex *)vc->read_locked && vc_ok_read(vc))) { + if (vc->pending_remote_fill || (vc->read_locked && vc_ok_read(vc))) { vc->read_block->fill(len); // note bytes received if (!vc->pending_remote_fill) { vc->read.vio.buffer.writer()->append_block(vc->read_block->clone()); @@ -1146,9 +1147,8 @@ ClusterHandler::update_channels_read() // for message processing which cannot be done with a ET_CLUSTER thread. // int -ClusterHandler::process_incoming_callouts(ProxyMutex *m) +ClusterHandler::process_incoming_callouts(ProxyMutex *mutex) { - ProxyMutex *mutex = m; ink_hrtime now; // // Atomically dequeue all active requests from the external queue and @@ -1308,7 +1308,7 @@ ClusterHandler::update_channels_partial_read() vc->read_block->fill(len); // note bytes received if (!vc->pending_remote_fill) { - if ((ProxyMutex *)vc->read_locked) { + if (vc->read_locked) { Debug("cluster_vc_xfer", "Partial read, credit ch %d %p %d bytes", vc->channel, vc, len); s->vio.buffer.writer()->append_block(vc->read_block->clone()); if (complete_channel_read(len, vc)) { @@ -1358,7 +1358,7 @@ ClusterHandler::complete_channel_read(int len, ClusterVConnection *vc) if (vc->closed) return false; // No action if already closed - ink_assert((ProxyMutex *)s->vio.mutex == (ProxyMutex *)s->vio._cont->mutex); + ink_assert(s->vio.mutex == s->vio._cont->mutex); Debug("cluster_vc_xfer", "Complete read, credit ch %d %p %d bytes", vc->channel, vc, len); s->vio.ndone += len; @@ -2316,12 +2316,12 @@ ClusterHandler::free_locks(bool read_flag, int i) ClusterVConnection *vc = channels[s.msg.descriptor[j].channel]; if (VALID_CHANNEL(vc)) { if (read_flag) { - if ((ProxyMutex *)vc->read_locked) { + if (vc->read_locked) { MUTEX_UNTAKE_LOCK(vc->read.vio.mutex, thread); vc->read_locked = NULL; } } else { - if ((ProxyMutex *)vc->write_locked) { + if (vc->write_locked) { MUTEX_UNTAKE_LOCK(vc->write_locked, thread); vc->write_locked = NULL; } @@ -2331,7 +2331,7 @@ ClusterHandler::free_locks(bool read_flag, int i) s.msg.descriptor[j].channel != CLUSTER_CONTROL_CHANNEL) { ClusterVConnection *vc = channels[s.msg.descriptor[j].channel]; if (VALID_CHANNEL(vc)) { - if ((ProxyMutex *)vc->read_locked) { + if (vc->read_locked) { MUTEX_UNTAKE_LOCK(vc->read_locked, thread); vc->read_locked = NULL; } diff --git a/iocore/cluster/ClusterHandlerBase.cc b/iocore/cluster/ClusterHandlerBase.cc index a396f7c..cb7064c 100644 --- a/iocore/cluster/ClusterHandlerBase.cc +++ b/iocore/cluster/ClusterHandlerBase.cc @@ -53,7 +53,7 @@ ClusterCalloutContinuation::~ClusterCalloutContinuation() int ClusterCalloutContinuation::CalloutHandler(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) { - return _ch->process_incoming_callouts(this->mutex); + return _ch->process_incoming_callouts(this->mutex.get()); } /*************************************************************************/ @@ -67,8 +67,7 @@ ClusterControl::ClusterControl() void ClusterControl::real_alloc_data(int read_access, bool align_int32_on_non_int64_boundary) { - EThread *thread = this_ethread(); - ProxyMutex *mutex = thread->mutex; + ProxyMutex *mutex = this_ethread()->mutex; ink_assert(!data); if ((len + DATA_HDR + sizeof(int32_t)) <= DEFAULT_MAX_BUFFER_SIZE) { @@ -316,11 +315,12 @@ ClusterState::build_do_io_vector() if (last_block) { last_block->next = block[n]; } - last_block = block[n]; + last_block = block[n].get(); while (last_block->next) { - last_block = last_block->next; + last_block = last_block->next.get(); } } + mbuf->_writer = block[0]; ink_release_assert(bytes_to_xfer == to_do); ink_assert(bytes_to_xfer == bytes_IOBufferBlockList(mbuf->_writer, !read_channel)); diff --git a/iocore/cluster/ClusterLib.cc b/iocore/cluster/ClusterLib.cc index cd24208..e46e56f 100644 --- a/iocore/cluster/ClusterLib.cc +++ b/iocore/cluster/ClusterLib.cc @@ -104,13 +104,13 @@ clone_IOBufferBlockList(IOBufferBlock *b, int start_off, int n, IOBufferBlock ** while (bsrc && nbytes) { // Skip zero length blocks if (!bsrc->read_avail()) { - bsrc = bsrc->next; + bsrc = bsrc->next.get(); continue; } if (bclone_head) { bclone->next = bsrc->clone(); - bclone = bclone->next; + bclone = bclone->next.get(); } else { // Skip bytes already processed if (bytes_to_skip) { @@ -125,7 +125,7 @@ clone_IOBufferBlockList(IOBufferBlock *b, int start_off, int n, IOBufferBlock ** } else { // Skip entire block - bsrc = bsrc->next; + bsrc = bsrc->next.get(); continue; } } else { @@ -140,7 +140,7 @@ clone_IOBufferBlockList(IOBufferBlock *b, int start_off, int n, IOBufferBlock ** bclone->fill(nbytes); nbytes = 0; } - bsrc = bsrc->next; + bsrc = bsrc->next.get(); } ink_release_assert(!nbytes); *b_tail = bclone; @@ -167,14 +167,15 @@ consume_IOBufferBlockList(IOBufferBlock *b, int64_t n) } else { // Consumed entire block - b_remainder = b->next; + b_remainder = b->next.get(); } break; } else { - b = b->next; + b = b->next.get(); } } + ink_release_assert(nbytes == 0); return b_remainder; // return remaining blocks } @@ -191,8 +192,9 @@ bytes_IOBufferBlockList(IOBufferBlock *b, int64_t read_avail_bytes) } else { n += b->write_avail(); } - b = b->next; + b = b->next.get(); } + return n; } diff --git a/iocore/cluster/ClusterLoadMonitor.cc b/iocore/cluster/ClusterLoadMonitor.cc index 42af4cf..de3157d 100644 --- a/iocore/cluster/ClusterLoadMonitor.cc +++ b/iocore/cluster/ClusterLoadMonitor.cc @@ -218,7 +218,7 @@ void ClusterLoadMonitor::note_ping_response_time(ink_hrtime response_time, int sequence_number) { #ifdef CLUSTER_TOMCAT - ProxyMutex *mutex = this->ch->mutex; // hack for stats + ProxyMutex *mutex = this->ch->mutex.get(); // hack for stats #endif CLUSTER_SUM_DYN_STAT(CLUSTER_PING_TIME_STAT, response_time); diff --git a/iocore/cluster/ClusterProcessor.cc b/iocore/cluster/ClusterProcessor.cc index e8655b6..699d09c 100644 --- a/iocore/cluster/ClusterProcessor.cc +++ b/iocore/cluster/ClusterProcessor.cc @@ -53,8 +53,7 @@ ClusterProcessor::~ClusterProcessor() int ClusterProcessor::internal_invoke_remote(ClusterHandler *ch, int cluster_fn, void *data, int len, int options, void *cmsg) { - EThread *thread = this_ethread(); - ProxyMutex *mutex = thread->mutex; + ProxyMutex *mutex = this_ethread()->mutex; // // RPC facility for intercluster communication available to other // subsystems. diff --git a/iocore/cluster/ClusterVConnection.cc b/iocore/cluster/ClusterVConnection.cc index 1fa0f64..f6e97d0 100644 --- a/iocore/cluster/ClusterVConnection.cc +++ b/iocore/cluster/ClusterVConnection.cc @@ -31,7 +31,7 @@ ClassAllocator<ClusterVConnection> clusterVCAllocator("clusterVCAllocator"); ClassAllocator<ByteBankDescriptor> byteBankAllocator("byteBankAllocator"); ByteBankDescriptor * -ByteBankDescriptor::ByteBankDescriptor_alloc(IOBufferBlock *iob) +ByteBankDescriptor::ByteBankDescriptor_alloc(Ptr<IOBufferBlock> &iob) { ByteBankDescriptor *b = byteBankAllocator.alloc(); b->block = iob; diff --git a/iocore/cluster/P_ClusterCache.h b/iocore/cluster/P_ClusterCache.h index 06b6afa..f774bb5 100644 --- a/iocore/cluster/P_ClusterCache.h +++ b/iocore/cluster/P_ClusterCache.h @@ -454,12 +454,12 @@ class ByteBankDescriptor public: ByteBankDescriptor() {} IOBufferBlock * - get_block() + get_block() const { - return block; + return block.get(); } - static ByteBankDescriptor *ByteBankDescriptor_alloc(IOBufferBlock *); + static ByteBankDescriptor *ByteBankDescriptor_alloc(Ptr<IOBufferBlock> &); static void ByteBankDescriptor_free(ByteBankDescriptor *); public: diff --git a/iocore/cluster/P_ClusterCacheInternal.h b/iocore/cluster/P_ClusterCacheInternal.h index b0b4d15..4437972 100644 --- a/iocore/cluster/P_ClusterCacheInternal.h +++ b/iocore/cluster/P_ClusterCacheInternal.h @@ -195,7 +195,7 @@ struct CacheContinuation : public Continuation { inline void setMsgBufferLen(int l, IOBufferData *b = 0) { - ink_assert(rw_buf_msg == 0); + ink_assert(!rw_buf_msg); ink_assert(rw_buf_msg_len == 0); rw_buf_msg = b; @@ -211,7 +211,7 @@ struct CacheContinuation : public Continuation { inline void allocMsgBuffer() { - ink_assert(rw_buf_msg == 0); + ink_assert(!rw_buf_msg); ink_assert(rw_buf_msg_len); if (rw_buf_msg_len <= DEFAULT_MAX_BUFFER_SIZE) { rw_buf_msg = new_IOBufferData(buffer_size_to_index(rw_buf_msg_len, MAX_BUFFER_SIZE_INDEX)); @@ -228,9 +228,9 @@ struct CacheContinuation : public Continuation { } inline IOBufferData * - getMsgBufferIOBData() + getMsgBufferIOBData() const { - return rw_buf_msg; + return rw_buf_msg.get(); } inline void diff --git a/iocore/cluster/P_ClusterHandler.h b/iocore/cluster/P_ClusterHandler.h index edf2d49..5dc169a 100644 --- a/iocore/cluster/P_ClusterHandler.h +++ b/iocore/cluster/P_ClusterHandler.h @@ -52,17 +52,19 @@ struct ClusterControl : public Continuation { Ptr<IOBufferBlock> iob_block; IOBufferBlock * - get_block() + get_block() const { - return iob_block; + return iob_block.get(); } + bool - fast_data() + fast_data() const { return (len <= MAX_FAST_CONTROL_MESSAGE); } + bool - valid_alloc_data() + valid_alloc_data() const { return iob_block && real_data && data; } @@ -240,9 +242,9 @@ struct ClusterMsg { } IOBufferBlock * - get_block() + get_block() const { - return iob_descriptor_block; + return iob_descriptor_block.get(); } IOBufferBlock * @@ -255,7 +257,7 @@ struct ClusterMsg { iob_descriptor_block->next = 0; iob_descriptor_block->fill(start_offset); iob_descriptor_block->consume(start_offset); - return iob_descriptor_block; + return iob_descriptor_block.get(); } IOBufferBlock * @@ -268,7 +270,7 @@ struct ClusterMsg { iob_descriptor_block->next = 0; iob_descriptor_block->fill(start_offset); iob_descriptor_block->consume(start_offset); - return iob_descriptor_block; + return iob_descriptor_block.get(); } void diff --git a/iocore/cluster/P_ClusterInternal.h b/iocore/cluster/P_ClusterInternal.h index 844934f..1011eb0 100644 --- a/iocore/cluster/P_ClusterInternal.h +++ b/iocore/cluster/P_ClusterInternal.h @@ -493,9 +493,29 @@ extern void cluster_update_priority(ClusterHandler *, ClusterVConnection *, Clus extern void cluster_bump(ClusterHandler *, ClusterVConnectionBase *, ClusterVConnState *, int); extern IOBufferBlock *clone_IOBufferBlockList(IOBufferBlock *, int, int, IOBufferBlock **); + +static inline IOBufferBlock * +clone_IOBufferBlockList(Ptr<IOBufferBlock> &src, int start_off, int b, IOBufferBlock **b_tail) +{ + return clone_IOBufferBlockList(src.get(), start_off, b, b_tail); +} + extern IOBufferBlock *consume_IOBufferBlockList(IOBufferBlock *, int64_t); + +static inline IOBufferBlock * +consume_IOBufferBlockList(Ptr<IOBufferBlock> &b, int64_t n) +{ + return consume_IOBufferBlockList(b.get(), n); +} + extern int64_t bytes_IOBufferBlockList(IOBufferBlock *, int64_t); +static inline int64_t +bytes_IOBufferBlockList(Ptr<IOBufferBlock> &b, int64_t read_avail_bytes) +{ + return bytes_IOBufferBlockList(b.get(), read_avail_bytes); +} + // ClusterVConnection declarations extern void clusterVCAllocator_free(ClusterVConnection *vc); extern ClassAllocator<ClusterVConnection> clusterVCAllocator; -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
