http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cluster/P_ClusterHandler.h ---------------------------------------------------------------------- diff --git a/iocore/cluster/P_ClusterHandler.h b/iocore/cluster/P_ClusterHandler.h index 4a62755..c6461ed 100644 --- a/iocore/cluster/P_ClusterHandler.h +++ b/iocore/cluster/P_ClusterHandler.h @@ -34,45 +34,45 @@ class ClusterLoadMonitor; struct ClusterCalloutContinuation; -typedef int (ClusterCalloutContinuation::*ClstCoutContHandler) (int, void *); +typedef int (ClusterCalloutContinuation::*ClstCoutContHandler)(int, void *); -struct ClusterCalloutContinuation:public Continuation -{ +struct ClusterCalloutContinuation : public Continuation { struct ClusterHandler *_ch; - int CalloutHandler(int event, Event * e); - ClusterCalloutContinuation(struct ClusterHandler *ch); - ~ClusterCalloutContinuation(); + int CalloutHandler(int event, Event *e); + ClusterCalloutContinuation(struct ClusterHandler *ch); + ~ClusterCalloutContinuation(); }; -struct ClusterControl: public Continuation -{ +struct ClusterControl : public Continuation { int len; // TODO: Should this be 64-bit ? char size_index; int64_t *real_data; char *data; - void (*free_proc) (void *); + void (*free_proc)(void *); void *free_proc_arg; - Ptr<IOBufferBlock> iob_block; + Ptr<IOBufferBlock> iob_block; - IOBufferBlock *get_block() + IOBufferBlock * + get_block() { return iob_block; } - bool fast_data() + bool + fast_data() { return (len <= MAX_FAST_CONTROL_MESSAGE); } - bool valid_alloc_data() + bool + valid_alloc_data() { return iob_block && real_data && data; } - enum - { + enum { // DATA_HDR = size_index (1 byte) + magicno (1 byte) + sizeof(this) - DATA_HDR = (sizeof(int64_t) * 2) // must be multiple of sizeof(int64_t) + DATA_HDR = (sizeof(int64_t) * 2) // must be multiple of sizeof(int64_t) }; ClusterControl(); @@ -81,19 +81,21 @@ struct ClusterControl: public Continuation virtual void freeall() = 0; }; -struct OutgoingControl: public ClusterControl -{ +struct OutgoingControl : public ClusterControl { ClusterHandler *ch; ink_hrtime submit_time; static OutgoingControl *alloc(); - OutgoingControl(); - void alloc_data(bool align_int32_on_non_int64_boundary = true) { - real_alloc_data(1, align_int32_on_non_int64_boundary); /* read access */ + OutgoingControl(); + void + alloc_data(bool align_int32_on_non_int64_boundary = true) + { + real_alloc_data(1, align_int32_on_non_int64_boundary); /* read access */ } - void set_data(char *adata, int alen) + void + set_data(char *adata, int alen) { data = adata; len = alen; @@ -108,31 +110,33 @@ struct OutgoingControl: public ClusterControl iob_block->_buf_end = iob_block->end(); } - void set_data(IOBufferBlock * buf, void (*free_data_proc) (void *), void *free_data_arg) + void + set_data(IOBufferBlock *buf, void (*free_data_proc)(void *), void *free_data_arg) { data = buf->data->data(); - len = bytes_IOBufferBlockList(buf, 1); // read avail bytes + len = bytes_IOBufferBlockList(buf, 1); // read avail bytes free_proc = free_data_proc; free_proc_arg = free_data_arg; real_data = 0; iob_block = buf; } - int startEvent(int event, Event * e); + int startEvent(int event, Event *e); virtual void freeall(); }; // // incoming control messsage are received by this machine // -struct IncomingControl: public ClusterControl -{ +struct IncomingControl : public ClusterControl { ink_hrtime recognized_time; static IncomingControl *alloc(); - IncomingControl(); - void alloc_data(bool align_int32_on_non_int64_boundary = true) { - real_alloc_data(0, align_int32_on_non_int64_boundary); /* write access */ + IncomingControl(); + void + alloc_data(bool align_int32_on_non_int64_boundary = true) + { + real_alloc_data(0, align_int32_on_non_int64_boundary); /* write access */ } virtual void freeall(); }; @@ -140,21 +144,17 @@ struct IncomingControl: public ClusterControl // // Interface structure for internal_invoke_remote() // -struct invoke_remote_data_args -{ +struct invoke_remote_data_args { int32_t magicno; OutgoingControl *msg_oc; OutgoingControl *data_oc; int dest_channel; ClusterVCToken token; - enum - { - MagicNo = 0x04141998 + enum { + MagicNo = 0x04141998, }; - invoke_remote_data_args():magicno(MagicNo), msg_oc(NULL), data_oc(NULL), dest_channel(0) - { - } + invoke_remote_data_args() : magicno(MagicNo), msg_oc(NULL), data_oc(NULL), dest_channel(0) {} }; // @@ -163,27 +163,26 @@ struct invoke_remote_data_args // // type -#define CLUSTER_SEND_FREE 0 -#define CLUSTER_SEND_DATA 1 -#define CLUSTER_SEQUENCE_NUMBER(_x) (((unsigned int)_x)&0xFFFF) - -struct Descriptor -{ // Note: Over the Wire structure - uint32_t type:1; - uint32_t channel:15; - uint16_t sequence_number; // lower 16 bits of the ClusterVCToken.seq +#define CLUSTER_SEND_FREE 0 +#define CLUSTER_SEND_DATA 1 +#define CLUSTER_SEQUENCE_NUMBER(_x) (((unsigned int)_x) & 0xFFFF) + +struct Descriptor { // Note: Over the Wire structure + uint32_t type : 1; + uint32_t channel : 15; + uint16_t sequence_number; // lower 16 bits of the ClusterVCToken.seq uint32_t length; - inline void SwapBytes() + inline void + SwapBytes() { - ats_swap16((uint16_t *) this); // Hack - ats_swap16((uint16_t *) & sequence_number); - ats_swap32((uint32_t *) & length); + ats_swap16((uint16_t *)this); // Hack + ats_swap16((uint16_t *)&sequence_number); + ats_swap32((uint32_t *)&length); } }; -struct ClusterMsgHeader -{ // Note: Over the Wire structure +struct ClusterMsgHeader { // Note: Over the Wire structure uint16_t count; uint16_t descriptor_cksum; uint16_t control_bytes_cksum; @@ -191,7 +190,8 @@ struct ClusterMsgHeader uint32_t control_bytes; uint32_t count_check; - void clear() + void + clear() { count = 0; descriptor_cksum = 0; @@ -200,34 +200,32 @@ struct ClusterMsgHeader control_bytes = 0; count_check = 0; } - ClusterMsgHeader():count(0), descriptor_cksum(0), control_bytes_cksum(0), unused(0), control_bytes(0), count_check(0) - { - } - inline void SwapBytes() + ClusterMsgHeader() : count(0), descriptor_cksum(0), control_bytes_cksum(0), unused(0), control_bytes(0), count_check(0) {} + inline void + SwapBytes() { - ats_swap16((uint16_t *) & count); - ats_swap16((uint16_t *) & descriptor_cksum); - ats_swap16((uint16_t *) & control_bytes_cksum); - ats_swap16((uint16_t *) & unused); - ats_swap32((uint32_t *) & control_bytes); - ats_swap32((uint32_t *) & count_check); + ats_swap16((uint16_t *)&count); + ats_swap16((uint16_t *)&descriptor_cksum); + ats_swap16((uint16_t *)&control_bytes_cksum); + ats_swap16((uint16_t *)&unused); + ats_swap32((uint32_t *)&control_bytes); + ats_swap32((uint32_t *)&count_check); } }; -struct ClusterMsg -{ +struct ClusterMsg { Descriptor *descriptor; - Ptr<IOBufferBlock> iob_descriptor_block; + Ptr<IOBufferBlock> iob_descriptor_block; int count; int control_bytes; int descriptor_cksum; int control_bytes_cksum; int unused; - int state; // Only used by read to denote - // read phase (count, descriptor, data) - Queue<OutgoingControl> outgoing_control; - Queue<OutgoingControl> outgoing_small_control; - Queue<OutgoingControl> outgoing_callout; // compound msg callbacks + int state; // Only used by read to denote + // read phase (count, descriptor, data) + Queue<OutgoingControl> outgoing_control; + Queue<OutgoingControl> outgoing_small_control; + Queue<OutgoingControl> outgoing_callout; // compound msg callbacks // read processing usage. int control_data_offset; @@ -237,22 +235,24 @@ struct ClusterMsg int did_large_control_msgs; int did_freespace_msgs; - ClusterMsgHeader *hdr() + ClusterMsgHeader * + hdr() { - return (ClusterMsgHeader *) (((char *) descriptor) - - sizeof(ClusterMsgHeader)); + return (ClusterMsgHeader *)(((char *)descriptor) - sizeof(ClusterMsgHeader)); } - IOBufferBlock *get_block() + IOBufferBlock * + get_block() { return iob_descriptor_block; } - IOBufferBlock *get_block_header() + IOBufferBlock * + get_block_header() { int start_offset; - start_offset = (char *) hdr() - iob_descriptor_block->buf(); + start_offset = (char *)hdr() - iob_descriptor_block->buf(); iob_descriptor_block->reset(); iob_descriptor_block->next = 0; iob_descriptor_block->fill(start_offset); @@ -260,12 +260,12 @@ struct ClusterMsg return iob_descriptor_block; } - IOBufferBlock *get_block_descriptor() + IOBufferBlock * + get_block_descriptor() { int start_offset; - start_offset = ((char *) hdr() + sizeof(ClusterMsgHeader)) - - iob_descriptor_block->buf(); + start_offset = ((char *)hdr() + sizeof(ClusterMsgHeader)) - iob_descriptor_block->buf(); iob_descriptor_block->reset(); iob_descriptor_block->next = 0; iob_descriptor_block->fill(start_offset); @@ -273,7 +273,8 @@ struct ClusterMsg return iob_descriptor_block; } - void clear() + void + clear() { hdr()->clear(); count = 0; @@ -291,10 +292,11 @@ struct ClusterMsg did_large_control_msgs = 0; did_freespace_msgs = 0; } - uint16_t calc_control_bytes_cksum() + uint16_t + calc_control_bytes_cksum() { uint16_t cksum = 0; - char *p = (char *) &descriptor[count]; + char *p = (char *)&descriptor[count]; char *endp = p + control_bytes; while (p < endp) { cksum += *p; @@ -302,56 +304,53 @@ struct ClusterMsg } return cksum; } - uint16_t calc_descriptor_cksum() + uint16_t + calc_descriptor_cksum() { uint16_t cksum = 0; - char *p = (char *) &descriptor[0]; - char *endp = (char *) &descriptor[count]; + char *p = (char *)&descriptor[0]; + char *endp = (char *)&descriptor[count]; while (p < endp) { cksum += *p; ++p; } return cksum; } -ClusterMsg():descriptor(NULL), iob_descriptor_block(NULL), count(0), - control_bytes(0), - descriptor_cksum(0), control_bytes_cksum(0), - unused(0), state(0), - control_data_offset(0), - did_small_control_set_data(0), - did_large_control_set_data(0), did_small_control_msgs(0), did_large_control_msgs(0), did_freespace_msgs(0) { + ClusterMsg() + : descriptor(NULL), iob_descriptor_block(NULL), count(0), control_bytes(0), descriptor_cksum(0), control_bytes_cksum(0), + unused(0), state(0), control_data_offset(0), did_small_control_set_data(0), did_large_control_set_data(0), + did_small_control_msgs(0), did_large_control_msgs(0), did_freespace_msgs(0) + { } - }; // // State for a particular (read/write) direction of a cluster link // struct ClusterHandler; -struct ClusterState: public Continuation -{ +struct ClusterState : public Continuation { ClusterHandler *ch; bool read_channel; - bool do_iodone_event; // schedule_imm() on i/o complete + bool do_iodone_event; // schedule_imm() on i/o complete int n_descriptors; ClusterMsg msg; unsigned int sequence_number; - int to_do; // # of bytes to transact - int did; // # of bytes transacted - int n_iov; // defined iov(s) in this operation - int io_complete; // current i/o complete - int io_complete_event; // current i/o complete event - VIO *v; // VIO associated with current op - int bytes_xfered; // bytes xfered at last callback - int last_ndone; // last do_io ndone + int to_do; // # of bytes to transact + int did; // # of bytes transacted + int n_iov; // defined iov(s) in this operation + int io_complete; // current i/o complete + int io_complete_event; // current i/o complete event + VIO *v; // VIO associated with current op + int bytes_xfered; // bytes xfered at last callback + int last_ndone; // last do_io ndone int total_bytes_xfered; - IOVec *iov; // io vector for readv, writev + IOVec *iov; // io vector for readv, writev Ptr<IOBufferData> iob_iov; // Write byte bank structures - char *byte_bank; // bytes buffered for transit - int n_byte_bank; // number of bytes buffered for transit - int byte_bank_size; // allocated size of byte bank + char *byte_bank; // bytes buffered for transit + int n_byte_bank; // number of bytes buffered for transit + int byte_bank_size; // allocated size of byte bank int missed; bool missed_msg; @@ -360,11 +359,10 @@ struct ClusterState: public Continuation Ptr<IOBufferBlock> block[MAX_TCOUNT]; class MIOBuffer *mbuf; - int state; // See enum defs below + int state; // See enum defs below - enum - { + enum { READ_START = 1, READ_HEADER, READ_AWAIT_HEADER, @@ -378,14 +376,13 @@ struct ClusterState: public Continuation READ_COMPLETE } read_state_t; - enum - { + enum { WRITE_START = 1, WRITE_SETUP, WRITE_INITIATE, WRITE_AWAIT_COMPLETION, WRITE_POST_COMPLETE, - WRITE_COMPLETE + WRITE_COMPLETE, } write_state_t; ClusterState(ClusterHandler *, bool); @@ -402,8 +399,7 @@ struct ClusterState: public Continuation // ClusterHandlerBase superclass for processors with // bi-directional VConnections. // -struct ClusterHandlerBase: public Continuation -{ +struct ClusterHandlerBase : public Continuation { // // Private // @@ -413,13 +409,10 @@ struct ClusterHandlerBase: public Continuation int min_priority; Event *trigger_event; - ClusterHandlerBase():Continuation(NULL), read_vcs(NULL), write_vcs(NULL), cur_vcs(0), min_priority(1) - { - } + ClusterHandlerBase() : Continuation(NULL), read_vcs(NULL), write_vcs(NULL), cur_vcs(0), min_priority(1) {} }; -struct ClusterHandler:public ClusterHandlerBase -{ +struct ClusterHandler : public ClusterHandlerBase { #ifdef MSG_TRACE FILE *t_fd; #endif @@ -434,11 +427,10 @@ struct ClusterHandler:public ClusterHandlerBase bool dead; bool downing; - int32_t active; // handler currently running + int32_t active; // handler currently running bool on_stolen_thread; - struct ChannelData - { + struct ChannelData { int channel_number; LINK(ChannelData, link); }; @@ -449,16 +441,15 @@ struct ClusterHandler:public ClusterHandlerBase Queue<ChannelData> free_local_channels; bool connector; - int cluster_connect_state; // see clcon_state_t enum + int cluster_connect_state; // see clcon_state_t enum ClusterHelloMessage clusteringVersion; ClusterHelloMessage nodeClusteringVersion; bool needByteSwap; int configLookupFails; -#define CONFIG_LOOKUP_RETRIES 10 +#define CONFIG_LOOKUP_RETRIES 10 - enum - { + enum { CLCON_INITIAL = 1, CLCON_SEND_MSG, CLCON_SEND_MSG_COMPLETE, @@ -475,7 +466,7 @@ struct ClusterHandler:public ClusterHandlerBase InkAtomicList outgoing_control_al[CLUSTER_CMSG_QUEUES]; InkAtomicList external_incoming_control; InkAtomicList external_incoming_open_local; - ClusterCalloutContinuation * callout_cont[MAX_COMPLETION_CALLBACK_EVENTS]; + ClusterCalloutContinuation *callout_cont[MAX_COMPLETION_CALLBACK_EVENTS]; Event *callout_events[MAX_COMPLETION_CALLBACK_EVENTS]; Event *cluster_periodic_event; Queue<OutgoingControl> outgoing_control[CLUSTER_CMSG_QUEUES]; @@ -505,7 +496,7 @@ struct ClusterHandler:public ClusterHandlerBase bool control_message_write; #ifdef CLUSTER_STATS - Ptr<IOBufferBlock> message_blk; + Ptr<IOBufferBlock> message_blk; int64_t _vc_writes; int64_t _vc_write_bytes; @@ -542,7 +533,8 @@ struct ClusterHandler:public ClusterHandlerBase int _n_write_post_complete; int _n_write_complete; - void clear_cluster_stats() + void + clear_cluster_stats() { _vc_writes = 0; _vc_write_bytes = 0; @@ -579,30 +571,31 @@ struct ClusterHandler:public ClusterHandlerBase _n_write_post_complete = 0; _n_write_complete = 0; } -#endif // CLUSTER_STATS +#endif // CLUSTER_STATS ClusterHandler(); ~ClusterHandler(); bool check_channel(int c); - int alloc_channel(ClusterVConnection * vc, int requested_channel = 0); - void free_channel(ClusterVConnection * vc); -// -// local_channel() -// - Initiator node-node TCP socket && Odd channel => Local Channel -// - !Initiator node-node TCP socket && Even channel => Local Channel - inline bool local_channel(int i) + int alloc_channel(ClusterVConnection *vc, int requested_channel = 0); + void free_channel(ClusterVConnection *vc); + // + // local_channel() + // - Initiator node-node TCP socket && Odd channel => Local Channel + // - !Initiator node-node TCP socket && Even channel => Local Channel + inline bool + local_channel(int i) { return !connector == !(i & 1); } void close_ClusterVConnection(ClusterVConnection *); - int cluster_signal_and_update(int event, ClusterVConnection * vc, ClusterVConnState * s); - int cluster_signal_and_update_locked(int event, ClusterVConnection * vc, ClusterVConnState * s); - int cluster_signal_error_and_update(ClusterVConnection * vc, ClusterVConnState * s, int lerrno); + int cluster_signal_and_update(int event, ClusterVConnection *vc, ClusterVConnState *s); + int cluster_signal_and_update_locked(int event, ClusterVConnection *vc, ClusterVConnState *s); + int cluster_signal_error_and_update(ClusterVConnection *vc, ClusterVConnState *s, int lerrno); void close_free_lock(ClusterVConnection *, ClusterVConnState *); -#define CLUSTER_READ true -#define CLUSTER_WRITE false +#define CLUSTER_READ true +#define CLUSTER_WRITE false bool build_data_vector(char *, int, bool); bool build_initial_vector(bool); @@ -615,7 +608,7 @@ struct ClusterHandler:public ClusterHandlerBase void process_small_control_msgs(); void process_large_control_msgs(); void process_freespace_msgs(); - bool complete_channel_read(int, ClusterVConnection * vc); + bool complete_channel_read(int, ClusterVConnection *vc); void finish_delayed_reads(); // returns: false if the channel was closed @@ -625,27 +618,27 @@ struct ClusterHandler:public ClusterHandlerBase int build_freespace_descriptors(); int build_controlmsg_descriptors(); int add_small_controlmsg_descriptors(); - int valid_for_data_write(ClusterVConnection * vc); - int valid_for_freespace_write(ClusterVConnection * vc); + int valid_for_data_write(ClusterVConnection *vc); + int valid_for_freespace_write(ClusterVConnection *vc); int machine_down(); - int remote_close(ClusterVConnection * vc, ClusterVConnState * ns); - void steal_thread(EThread * t); + int remote_close(ClusterVConnection *vc, ClusterVConnState *ns); + void steal_thread(EThread *t); #define CLUSTER_FREE_ALL_LOCKS -1 void free_locks(bool read_flag, int i = CLUSTER_FREE_ALL_LOCKS); bool get_read_locks(); bool get_write_locks(); - int zombify(Event * e = NULL); // optional event to use + int zombify(Event *e = NULL); // optional event to use - int connectClusterEvent(int event, Event * e); - int startClusterEvent(int event, Event * e); - int mainClusterEvent(int event, Event * e); - int beginClusterEvent(int event, Event * e); - int zombieClusterEvent(int event, Event * e); - int protoZombieEvent(int event, Event * e); + int connectClusterEvent(int event, Event *e); + int startClusterEvent(int event, Event *e); + int mainClusterEvent(int event, Event *e); + int beginClusterEvent(int event, Event *e); + int zombieClusterEvent(int event, Event *e); + int protoZombieEvent(int event, Event *e); - void vcs_push(ClusterVConnection * vc, int type); + void vcs_push(ClusterVConnection *vc, int type); bool vc_ok_read(ClusterVConnection *); bool vc_ok_write(ClusterVConnection *); int do_open_local_requests(); @@ -664,7 +657,7 @@ struct ClusterHandler:public ClusterHandlerBase }; // Valid (ClusterVConnection *) in ClusterHandler.channels[] -#define VALID_CHANNEL(vc) (vc && !(((uintptr_t) vc) & 1)) +#define VALID_CHANNEL(vc) (vc && !(((uintptr_t)vc) & 1)) // outgoing control continuations extern ClassAllocator<OutgoingControl> outControlAllocator;
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cluster/P_ClusterInline.h ---------------------------------------------------------------------- diff --git a/iocore/cluster/P_ClusterInline.h b/iocore/cluster/P_ClusterInline.h index b4c1f15..246f65d 100644 --- a/iocore/cluster/P_ClusterInline.h +++ b/iocore/cluster/P_ClusterInline.h @@ -33,7 +33,7 @@ #include "P_ClusterHandler.h" inline Action * -Cluster_lookup(Continuation * cont, CacheKey * key, CacheFragType frag_type, char *hostname, int host_len) +Cluster_lookup(Continuation *cont, CacheKey *key, CacheFragType frag_type, char *hostname, int host_len) { // Try to send remote, if not possible, handle locally Action *retAct; @@ -48,24 +48,22 @@ Cluster_lookup(Continuation * cont, CacheKey * key, CacheFragType frag_type, cha } else { // not remote, do local lookup CacheContinuation::cacheContAllocator_free(cc); - return (Action *) NULL; + return (Action *)NULL; } } else { Action a; a = cont; return CacheContinuation::callback_failure(&a, CACHE_EVENT_LOOKUP_FAILED, 0); } - return (Action *) NULL; + return (Action *)NULL; } inline Action * -Cluster_read(ClusterMachine * owner_machine, int opcode, - Continuation * cont, MIOBuffer * buf, - CacheURL * url, CacheHTTPHdr * request, - CacheLookupHttpConfig * params, CacheKey * key, - time_t pin_in_cache, CacheFragType frag_type, char *hostname, int host_len) +Cluster_read(ClusterMachine *owner_machine, int opcode, Continuation *cont, MIOBuffer *buf, CacheURL *url, CacheHTTPHdr *request, + CacheLookupHttpConfig *params, CacheKey *key, time_t pin_in_cache, CacheFragType frag_type, char *hostname, + int host_len) { - (void) params; + (void)params; if (clusterProcessor.disable_remote_cluster_ops(owner_machine)) { Action a; a = cont; @@ -79,8 +77,7 @@ Cluster_read(ClusterMachine * owner_machine, int opcode, char *data; if (vers == CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) { - if ((opcode == CACHE_OPEN_READ_LONG) - || (opcode == CACHE_OPEN_READ_BUFFER_LONG)) { + if ((opcode == CACHE_OPEN_READ_LONG) || (opcode == CACHE_OPEN_READ_BUFFER_LONG)) { // Determine length of data to Marshal flen = op_to_sizeof_fixedlen_msg(opcode); @@ -95,11 +92,11 @@ Cluster_read(ClusterMachine * owner_machine, int opcode, len += params->marshal_length(); len += url_hlen; - if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data + if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data goto err_exit; // Perform data Marshal operation - msg = (char *) ALLOCA_DOUBLE(flen + len); + msg = (char *)ALLOCA_DOUBLE(flen + len); data = msg + flen; int cur_len = len; @@ -119,8 +116,7 @@ Cluster_read(ClusterMachine * owner_machine, int opcode, readArgs.url_md5 = &url_only_md5; readArgs.pin_in_cache = pin_in_cache; readArgs.frag_type = frag_type; - return CacheContinuation::do_op(cont, owner_machine, (void *) &readArgs, - opcode, (char *) msg, (flen + len), -1, buf); + return CacheContinuation::do_op(cont, owner_machine, (void *)&readArgs, opcode, (char *)msg, (flen + len), -1, buf); } else { // Build message if we have host data. @@ -129,10 +125,10 @@ Cluster_read(ClusterMachine * owner_machine, int opcode, flen = op_to_sizeof_fixedlen_msg(opcode); len = host_len; - if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data + if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data goto err_exit; - msg = (char *) ALLOCA_DOUBLE(flen + len); + msg = (char *)ALLOCA_DOUBLE(flen + len); data = msg + flen; memcpy(data, hostname, host_len); @@ -144,8 +140,7 @@ Cluster_read(ClusterMachine * owner_machine, int opcode, CacheOpArgs_General readArgs; readArgs.url_md5 = key; readArgs.frag_type = frag_type; - return CacheContinuation::do_op(cont, owner_machine, (void *) &readArgs, - opcode, (char *) msg, (flen + len), -1, buf); + return CacheContinuation::do_op(cont, owner_machine, (void *)&readArgs, opcode, (char *)msg, (flen + len), -1, buf); } } else { @@ -161,15 +156,12 @@ err_exit: } inline Action * -Cluster_write(Continuation * cont, int expected_size, - MIOBuffer * buf, ClusterMachine * m, - INK_MD5 * url_md5, CacheFragType ft, int options, - time_t pin_in_cache, int opcode, - CacheKey * key, CacheURL * url, - CacheHTTPHdr * request, CacheHTTPInfo * old_info, char *hostname, int host_len) +Cluster_write(Continuation *cont, int expected_size, MIOBuffer *buf, ClusterMachine *m, INK_MD5 *url_md5, CacheFragType ft, + int options, time_t pin_in_cache, int opcode, CacheKey *key, CacheURL *url, CacheHTTPHdr *request, + CacheHTTPInfo *old_info, char *hostname, int host_len) { - (void) key; - (void) request; + (void)key; + (void)request; if (clusterProcessor.disable_remote_cluster_ops(m)) { Action a; a = cont; @@ -183,65 +175,62 @@ Cluster_write(Continuation * cont, int expected_size, int vers = CacheOpMsg_long::protoToVersion(m->msg_proto_major); switch (opcode) { - case CACHE_OPEN_WRITE: - { - // Build message if we have host data - if (host_len) { - // Determine length of data to Marshal - flen = op_to_sizeof_fixedlen_msg(CACHE_OPEN_WRITE); - len = host_len; + case CACHE_OPEN_WRITE: { + // Build message if we have host data + if (host_len) { + // Determine length of data to Marshal + flen = op_to_sizeof_fixedlen_msg(CACHE_OPEN_WRITE); + len = host_len; - if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data - goto err_exit; + if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data + goto err_exit; - msg = (char *) ALLOCA_DOUBLE(flen + len); - data = msg + flen; + msg = (char *)ALLOCA_DOUBLE(flen + len); + data = msg + flen; - memcpy(data, hostname, host_len); - } - break; + memcpy(data, hostname, host_len); } - case CACHE_OPEN_WRITE_LONG: - { - int url_hlen; - const char *url_hostname = url->host_get(&url_hlen); + break; + } + case CACHE_OPEN_WRITE_LONG: { + int url_hlen; + const char *url_hostname = url->host_get(&url_hlen); - // Determine length of data to Marshal - flen = op_to_sizeof_fixedlen_msg(CACHE_OPEN_WRITE_LONG); - len = 0; + // Determine length of data to Marshal + flen = op_to_sizeof_fixedlen_msg(CACHE_OPEN_WRITE_LONG); + len = 0; - if (old_info == (CacheHTTPInfo *) CACHE_ALLOW_MULTIPLE_WRITES) { - old_info = 0; - allow_multiple_writes = 1; - } - if (old_info) { - len += old_info->marshal_length(); - } - len += url_hlen; + if (old_info == (CacheHTTPInfo *)CACHE_ALLOW_MULTIPLE_WRITES) { + old_info = 0; + allow_multiple_writes = 1; + } + if (old_info) { + len += old_info->marshal_length(); + } + len += url_hlen; - if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data - goto err_exit; + if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data + goto err_exit; - // Perform data Marshal operation - msg = (char *) ALLOCA_DOUBLE(flen + len); - data = msg + flen; + // Perform data Marshal operation + msg = (char *)ALLOCA_DOUBLE(flen + len); + data = msg + flen; - if (old_info) { - int res = old_info->marshal(data, len); + if (old_info) { + int res = old_info->marshal(data, len); - if (res < 0) { - goto err_exit; - } - data += res; + if (res < 0) { + goto err_exit; } - memcpy(data, url_hostname, url_hlen); - break; + data += res; } - default: - { - ink_release_assert(!"open_write_internal invalid opcode."); - } // End of case - } // End of switch + memcpy(data, url_hostname, url_hlen); + break; + } + default: { + ink_release_assert(!"open_write_internal invalid opcode."); + } // End of case + } // End of switch if (vers == CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) { // Do remote open_write() @@ -253,13 +242,13 @@ Cluster_write(Continuation * cont, int expected_size, writeArgs.cfl_flags |= (old_info ? CFL_LOPENWRITE_HAVE_OLDINFO : 0); writeArgs.cfl_flags |= (allow_multiple_writes ? CFL_ALLOW_MULTIPLE_WRITES : 0); - return CacheContinuation::do_op(cont, m, (void *) &writeArgs, opcode, msg, flen + len, expected_size, buf); + return CacheContinuation::do_op(cont, m, (void *)&writeArgs, opcode, msg, flen + len, expected_size, buf); } else { ////////////////////////////////////////////////////////////// // Create the specified down rev version of this message ////////////////////////////////////////////////////////////// ink_release_assert(!"CacheOpMsg_long [write] bad msg version"); - return (Action *) 0; + return (Action *)0; } err_exit: @@ -269,8 +258,7 @@ err_exit: } inline Action * -Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey * to, - CacheFragType type, char *hostname, int host_len) +Cluster_link(ClusterMachine *m, Continuation *cont, CacheKey *from, CacheKey *to, CacheFragType type, char *hostname, int host_len) { if (clusterProcessor.disable_remote_cluster_ops(m)) { Action a; @@ -289,7 +277,7 @@ Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data goto err_exit; - char *msg = (char *) ALLOCA_DOUBLE(flen + len); + char *msg = (char *)ALLOCA_DOUBLE(flen + len); memcpy((msg + flen), hostname, host_len); // Setup args for remote link @@ -297,7 +285,7 @@ Cluster_link(ClusterMachine * m, Continuation * cont, CacheKey * from, CacheKey linkArgs.from = from; linkArgs.to = to; linkArgs.frag_type = type; - return CacheContinuation::do_op(cont, m, (void *) &linkArgs, CACHE_LINK, msg, (flen + len)); + return CacheContinuation::do_op(cont, m, (void *)&linkArgs, CACHE_LINK, msg, (flen + len)); } else { ////////////////////////////////////////////////////////////// // Create the specified down rev version of this message @@ -313,7 +301,7 @@ err_exit: } inline Action * -Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFragType type, char *hostname, int host_len) +Cluster_deref(ClusterMachine *m, Continuation *cont, CacheKey *key, CacheFragType type, char *hostname, int host_len) { if (clusterProcessor.disable_remote_cluster_ops(m)) { Action a; @@ -332,14 +320,14 @@ Cluster_deref(ClusterMachine * m, Continuation * cont, CacheKey * key, CacheFrag if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data goto err_exit; - char *msg = (char *) ALLOCA_DOUBLE(flen + len); + char *msg = (char *)ALLOCA_DOUBLE(flen + len); memcpy((msg + flen), hostname, host_len); // Setup args for remote deref CacheOpArgs_Deref drefArgs; drefArgs.md5 = key; drefArgs.frag_type = type; - return CacheContinuation::do_op(cont, m, (void *) &drefArgs, CACHE_DEREF, msg, (flen + len)); + return CacheContinuation::do_op(cont, m, (void *)&drefArgs, CACHE_DEREF, msg, (flen + len)); } else { ////////////////////////////////////////////////////////////// // Create the specified down rev version of this message @@ -355,8 +343,8 @@ err_exit: } inline Action * -Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key, - bool rm_user_agents, bool rm_link, CacheFragType frag_type, char *hostname, int host_len) +Cluster_remove(ClusterMachine *m, Continuation *cont, CacheKey *key, bool rm_user_agents, bool rm_link, CacheFragType frag_type, + char *hostname, int host_len) { if (clusterProcessor.disable_remote_cluster_ops(m)) { Action a; @@ -375,7 +363,7 @@ Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key, if ((flen + len) > DEFAULT_MAX_BUFFER_SIZE) // Bound marshalled data goto err_exit; - char *msg = (char *) ALLOCA_DOUBLE(flen + len); + char *msg = (char *)ALLOCA_DOUBLE(flen + len); memcpy((msg + flen), hostname, host_len); // Setup args for remote update @@ -384,13 +372,13 @@ Cluster_remove(ClusterMachine * m, Continuation * cont, CacheKey * key, updateArgs.cfl_flags |= (rm_user_agents ? CFL_REMOVE_USER_AGENTS : 0); updateArgs.cfl_flags |= (rm_link ? CFL_REMOVE_LINK : 0); updateArgs.frag_type = frag_type; - return CacheContinuation::do_op(cont, m, (void *) &updateArgs, CACHE_REMOVE, msg, (flen + len)); + return CacheContinuation::do_op(cont, m, (void *)&updateArgs, CACHE_REMOVE, msg, (flen + len)); } else { ////////////////////////////////////////////////////////////// // Create the specified down rev version of this message ////////////////////////////////////////////////////////////// ink_release_assert(!"CacheOpMsg_short [CACHE_REMOVE] bad msg version"); - return (Action *) 0; + return (Action *)0; } err_exit: http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cluster/P_ClusterInternal.h ---------------------------------------------------------------------- diff --git a/iocore/cluster/P_ClusterInternal.h b/iocore/cluster/P_ClusterInternal.h index eaa18e3..b672972 100644 --- a/iocore/cluster/P_ClusterInternal.h +++ b/iocore/cluster/P_ClusterInternal.h @@ -34,150 +34,140 @@ /*************************************************************************/ // Compilation Options /*************************************************************************/ -#define CLUSTER_THREAD_STEALING 1 -#define CLUSTER_TOMCAT 1 -#define CLUSTER_STATS 1 +#define CLUSTER_THREAD_STEALING 1 +#define CLUSTER_TOMCAT 1 +#define CLUSTER_STATS 1 -#define ALIGN_DOUBLE(_p) ((((uintptr_t) (_p)) + 7) & ~7) +#define ALIGN_DOUBLE(_p) ((((uintptr_t)(_p)) + 7) & ~7) #define ALLOCA_DOUBLE(_sz) ALIGN_DOUBLE(alloca((_sz) + 8)) /*************************************************************************/ // Configuration Parameters /*************************************************************************/ // Note: MAX_TCOUNT must be power of 2 -#define MAX_TCOUNT 128 -#define CONTROL_DATA (128*1024) -#define READ_BANK_BUF_SIZE DEFAULT_MAX_BUFFER_SIZE -#define READ_BANK_BUF_INDEX (DEFAULT_BUFFER_SIZES-1) -#define ALLOC_DATA_MAGIC 0xA5 // 8 bits in size -#define READ_LOCK_SPIN_COUNT 1 -#define WRITE_LOCK_SPIN_COUNT 1 +#define MAX_TCOUNT 128 +#define CONTROL_DATA (128 * 1024) +#define READ_BANK_BUF_SIZE DEFAULT_MAX_BUFFER_SIZE +#define READ_BANK_BUF_INDEX (DEFAULT_BUFFER_SIZES - 1) +#define ALLOC_DATA_MAGIC 0xA5 // 8 bits in size +#define READ_LOCK_SPIN_COUNT 1 +#define WRITE_LOCK_SPIN_COUNT 1 // Unix specific optimizations // #define CLUSTER_IMMEDIATE_NETIO 1 - // (see ClusterHandler::mainClusterEvent) - // this is equivalent to a max of 0.7 seconds -#define CLUSTER_BUCKETS 64 -#define CLUSTER_PERIOD HRTIME_MSECONDS(10) +// (see ClusterHandler::mainClusterEvent) +// this is equivalent to a max of 0.7 seconds +#define CLUSTER_BUCKETS 64 +#define CLUSTER_PERIOD HRTIME_MSECONDS(10) - // Per instance maximum time allotted to cluster thread -#define CLUSTER_MAX_RUN_TIME HRTIME_MSECONDS(100) - // Per instance maximum time allotted to thread stealing -#define CLUSTER_MAX_THREAD_STEAL_TIME HRTIME_MSECONDS(10) +// Per instance maximum time allotted to cluster thread +#define CLUSTER_MAX_RUN_TIME HRTIME_MSECONDS(100) +// Per instance maximum time allotted to thread stealing +#define CLUSTER_MAX_THREAD_STEAL_TIME HRTIME_MSECONDS(10) - // minimum number of channels to allocate -#define MIN_CHANNELS 4096 -#define MAX_CHANNELS ((32*1024) - 1) // 15 bits in Descriptor +// minimum number of channels to allocate +#define MIN_CHANNELS 4096 +#define MAX_CHANNELS ((32 * 1024) - 1) // 15 bits in Descriptor -#define CLUSTER_CONTROL_CHANNEL 0 -#define LAST_DEDICATED_CHANNEL 0 +#define CLUSTER_CONTROL_CHANNEL 0 +#define LAST_DEDICATED_CHANNEL 0 -#define CLUSTER_PHASES 1 +#define CLUSTER_PHASES 1 #define CLUSTER_INITIAL_PRIORITY CLUSTER_PHASES - // how often to retry connect to machines which are supposed to be in the - // cluster -#define CLUSTER_BUMP_LENGTH 1 -#define CLUSTER_MEMBER_DELAY HRTIME_SECONDS(1) - // How long to leave an unconnected ClusterVConnection waiting - // Note: assumes (CLUSTER_CONNECT_TIMEOUT == 2 * CACHE_CLUSTER_TIMEOUT) +// how often to retry connect to machines which are supposed to be in the +// cluster +#define CLUSTER_BUMP_LENGTH 1 +#define CLUSTER_MEMBER_DELAY HRTIME_SECONDS(1) +// How long to leave an unconnected ClusterVConnection waiting +// Note: assumes (CLUSTER_CONNECT_TIMEOUT == 2 * CACHE_CLUSTER_TIMEOUT) #ifdef CLUSTER_TEST_DEBUG -#define CLUSTER_CONNECT_TIMEOUT HRTIME_SECONDS(65536) +#define CLUSTER_CONNECT_TIMEOUT HRTIME_SECONDS(65536) #else -#define CLUSTER_CONNECT_TIMEOUT HRTIME_SECONDS(10) +#define CLUSTER_CONNECT_TIMEOUT HRTIME_SECONDS(10) #endif -#define CLUSTER_CONNECT_RETRY HRTIME_MSECONDS(20) -#define CLUSTER_RETRY HRTIME_MSECONDS(10) +#define CLUSTER_CONNECT_RETRY HRTIME_MSECONDS(20) +#define CLUSTER_RETRY HRTIME_MSECONDS(10) #define CLUSTER_DELAY_BETWEEN_WRITES HRTIME_MSECONDS(10) - // Force close on cluster channel if no activity detected in this interval +// Force close on cluster channel if no activity detected in this interval #ifdef CLUSTER_TEST_DEBUG #define CLUSTER_CHANNEL_INACTIVITY_TIMEOUT (65536 * HRTIME_SECONDS(60)) #else #define CLUSTER_CHANNEL_INACTIVITY_TIMEOUT (10 * HRTIME_SECONDS(60)) #endif - // Defines for work deferred to ET_NET threads -#define COMPLETION_CALLBACK_PERIOD HRTIME_MSECONDS(10) -#define MAX_COMPLETION_CALLBACK_EVENTS 16 +// Defines for work deferred to ET_NET threads +#define COMPLETION_CALLBACK_PERIOD HRTIME_MSECONDS(10) +#define MAX_COMPLETION_CALLBACK_EVENTS 16 - // ClusterHandler::mainClusterEvent() thread active state -#define CLUSTER_ACTIVE 1 -#define CLUSTER_NOT_ACTIVE 0 +// ClusterHandler::mainClusterEvent() thread active state +#define CLUSTER_ACTIVE 1 +#define CLUSTER_NOT_ACTIVE 0 - // defines for ClusterHandler::remote_closed -#define FORCE_CLOSE_ON_OPEN_CHANNEL -2 +// defines for ClusterHandler::remote_closed +#define FORCE_CLOSE_ON_OPEN_CHANNEL -2 - // defines for machine_config_change() -#define MACHINE_CONFIG 0 -#define CLUSTER_CONFIG 1 +// defines for machine_config_change() +#define MACHINE_CONFIG 0 +#define CLUSTER_CONFIG 1 // Debug interface category definitions -#define CL_NOTE "cluster_note" -#define CL_WARN "cluster_warn" -#define CL_PROTO "cluster_proto" -#define CL_TRACE "cluster_trace" +#define CL_NOTE "cluster_note" +#define CL_WARN "cluster_warn" +#define CL_PROTO "cluster_proto" +#define CL_TRACE "cluster_trace" /*************************************************************************/ // Constants /*************************************************************************/ -#define MAX_FAST_CONTROL_MESSAGE 504 // 512 - 4 (cluster func #) - 4 align -#define SMALL_CONTROL_MESSAGE MAX_FAST_CONTROL_MESSAGE // copied instead - // of vectored +#define MAX_FAST_CONTROL_MESSAGE 504 // 512 - 4 (cluster func #) - 4 align +#define SMALL_CONTROL_MESSAGE MAX_FAST_CONTROL_MESSAGE // copied instead + // of vectored #define WRITE_MESSAGE_ALREADY_BUILT -1 -#define MAGIC_COUNT(_x) \ -(0xBADBAD ^ ~(uint32_t)_x.msg.count \ - ^ ~(uint32_t)_x.msg.descriptor_cksum \ - ^ ~(uint32_t)_x.msg.control_bytes_cksum \ - ^ ~(uint32_t)_x.msg.unused \ - ^ ~((uint32_t)_x.msg.control_bytes << 16) ^_x.sequence_number) +#define MAGIC_COUNT(_x) \ + (0xBADBAD ^ ~(uint32_t)_x.msg.count ^ ~(uint32_t)_x.msg.descriptor_cksum ^ ~(uint32_t)_x.msg.control_bytes_cksum ^ \ + ~(uint32_t)_x.msg.unused ^ ~((uint32_t)_x.msg.control_bytes << 16) ^ _x.sequence_number) -#define DOUBLE_ALIGN(_x) ((((uintptr_t)_x)+7)&~7) +#define DOUBLE_ALIGN(_x) ((((uintptr_t)_x) + 7) & ~7) /*************************************************************************/ // Testing Defines /*************************************************************************/ -#define MISS_TEST 0 -#define TEST_PARTIAL_WRITES 0 -#define TEST_PARTIAL_READS 0 -#define TEST_TIMING 0 -#define TEST_READ_LOCKS_MISSED 0 -#define TEST_WRITE_LOCKS_MISSED 0 -#define TEST_ENTER_EXIT 0 -#define TEST_ENTER_EXIT 0 +#define MISS_TEST 0 +#define TEST_PARTIAL_WRITES 0 +#define TEST_PARTIAL_READS 0 +#define TEST_TIMING 0 +#define TEST_READ_LOCKS_MISSED 0 +#define TEST_WRITE_LOCKS_MISSED 0 +#define TEST_ENTER_EXIT 0 +#define TEST_ENTER_EXIT 0 // // Timing testing // #if TEST_TIMING -#define TTTEST(_x) \ -fprintf(stderr, _x " at: %u\n", \ - ((unsigned int)(ink_get_hrtime()/HRTIME_MSECOND)) % 1000) -#define TTEST(_x) \ -fprintf(stderr, _x " for: %d at: %u\n", vc->channel, \ - ((unsigned int)(ink_get_hrtime()/HRTIME_MSECOND)) % 1000) -#define TIMEOUT_TESTS(_s,_d) \ - if (*(int*)_d == 8) \ - fprintf(stderr,_s" lookup %d\n", *(int*)(_d+20)); \ - else if (*(int*)_d == 10) \ - fprintf(stderr,_s" op %d %d\n", *(int*)(_d+36), \ - *(int*)(_d+40)); \ - else if (*(int*)_d == 11) \ - fprintf(stderr,_s" rop %d %d\n", *(int*)(_d+4), \ - *(int*)(_d+8)) +#define TTTEST(_x) fprintf(stderr, _x " at: %u\n", ((unsigned int)(ink_get_hrtime() / HRTIME_MSECOND)) % 1000) +#define TTEST(_x) fprintf(stderr, _x " for: %d at: %u\n", vc->channel, ((unsigned int)(ink_get_hrtime() / HRTIME_MSECOND)) % 1000) +#define TIMEOUT_TESTS(_s, _d) \ + if (*(int *)_d == 8) \ + fprintf(stderr, _s " lookup %d\n", *(int *)(_d + 20)); \ + else if (*(int *)_d == 10) \ + fprintf(stderr, _s " op %d %d\n", *(int *)(_d + 36), *(int *)(_d + 40)); \ + else if (*(int *)_d == 11) \ + fprintf(stderr, _s " rop %d %d\n", *(int *)(_d + 4), *(int *)(_d + 8)) #else #define TTTEST(_x) #define TTEST(_x) -#define TIMEOUT_TESTS(_x,_y) +#define TIMEOUT_TESTS(_x, _y) #endif #if (TEST_READ_LOCKS_MISSED || TEST_WRITE_LOCKS_MISSED) static unsigned int test_cluster_locks_missed = 0; -static -test_cluster_lock_might_fail() +static test_cluster_lock_might_fail() { return (!(rand_r(&test_cluster_locks_missed) % 13)); } @@ -194,67 +184,63 @@ test_cluster_lock_might_fail() #endif #if TEST_ENTER_EXIT -struct enter_exit_class -{ +struct enter_exit_class { int *outv; - enter_exit_class(int *in, int *out):outv(out) - { - (*in)++; - } - ~enter_exit_class() - { - (*outv)++; - } + enter_exit_class(int *in, int *out) : outv(out) { (*in)++; } + ~enter_exit_class() { (*outv)++; } }; -#define enter_exit(_x,_y) enter_exit_class a(_x,_y) +#define enter_exit(_x, _y) enter_exit_class a(_x, _y) #else -#define enter_exit(_x,_y) +#define enter_exit(_x, _y) #endif -#define DOT_SEPARATED(_x) \ -((unsigned char*)&(_x))[0], ((unsigned char*)&(_x))[1], \ - ((unsigned char*)&(_x))[2], ((unsigned char*)&(_x))[3] +#define DOT_SEPARATED(_x) \ + ((unsigned char *)&(_x))[0], ((unsigned char *)&(_x))[1], ((unsigned char *)&(_x))[2], ((unsigned char *)&(_x))[3] // // RPC message for CLOSE_CHANNEL_CLUSTER_FUNCTION // -struct CloseMessage:public ClusterMessageHeader -{ +struct CloseMessage : public ClusterMessageHeader { uint32_t channel; int32_t status; int32_t lerrno; uint32_t sequence_number; - enum - { + enum { MIN_VERSION = 1, MAX_VERSION = 1, - CLOSE_CHAN_MESSAGE_VERSION = MAX_VERSION + CLOSE_CHAN_MESSAGE_VERSION = MAX_VERSION, }; - CloseMessage(uint16_t vers = CLOSE_CHAN_MESSAGE_VERSION) -: ClusterMessageHeader(vers), channel(0), status(0), lerrno(0), sequence_number(0) { + CloseMessage(uint16_t vers = CLOSE_CHAN_MESSAGE_VERSION) + : ClusterMessageHeader(vers), channel(0), status(0), lerrno(0), sequence_number(0) + { } //////////////////////////////////////////////////////////////////////////// - static int protoToVersion(int protoMajor) + static int + protoToVersion(int protoMajor) { - (void) protoMajor; + (void)protoMajor; return CLOSE_CHAN_MESSAGE_VERSION; } - static int sizeof_fixedlen_msg() + static int + sizeof_fixedlen_msg() { return sizeof(CloseMessage); } - void init(uint16_t vers = CLOSE_CHAN_MESSAGE_VERSION) { + void + init(uint16_t vers = CLOSE_CHAN_MESSAGE_VERSION) + { _init(vers); } - inline void SwapBytes() + inline void + SwapBytes() { if (NeedByteSwap()) { ats_swap32(&channel); - ats_swap32((uint32_t *) & status); - ats_swap32((uint32_t *) & lerrno); + ats_swap32((uint32_t *)&status); + ats_swap32((uint32_t *)&lerrno); ats_swap32(&sequence_number); } } @@ -264,36 +250,36 @@ struct CloseMessage:public ClusterMessageHeader // // RPC message for MACHINE_LIST_CLUSTER_FUNCTION // -struct MachineListMessage:public ClusterMessageHeader -{ - uint32_t n_ip; // Valid entries in ip[] - uint32_t ip[CLUSTER_MAX_MACHINES]; // variable length data +struct MachineListMessage : public ClusterMessageHeader { + uint32_t n_ip; // Valid entries in ip[] + uint32_t ip[CLUSTER_MAX_MACHINES]; // variable length data - enum - { + enum { MIN_VERSION = 1, MAX_VERSION = 1, - MACHINE_LIST_MESSAGE_VERSION = MAX_VERSION + MACHINE_LIST_MESSAGE_VERSION = MAX_VERSION, }; - MachineListMessage():ClusterMessageHeader(MACHINE_LIST_MESSAGE_VERSION), n_ip(0) - { - memset(ip, 0, sizeof(ip)); - } + MachineListMessage() : ClusterMessageHeader(MACHINE_LIST_MESSAGE_VERSION), n_ip(0) { memset(ip, 0, sizeof(ip)); } //////////////////////////////////////////////////////////////////////////// - static int protoToVersion(int protoMajor) + static int + protoToVersion(int protoMajor) { - (void) protoMajor; + (void)protoMajor; return MACHINE_LIST_MESSAGE_VERSION; } - static int sizeof_fixedlen_msg() + static int + sizeof_fixedlen_msg() { return sizeof(ClusterMessageHeader); } - void init(uint16_t vers = MACHINE_LIST_MESSAGE_VERSION) { + void + init(uint16_t vers = MACHINE_LIST_MESSAGE_VERSION) + { _init(vers); } - inline void SwapBytes() + inline void + SwapBytes() { ats_swap32(&n_ip); } @@ -303,39 +289,43 @@ struct MachineListMessage:public ClusterMessageHeader // // RPC message for SET_CHANNEL_DATA_CLUSTER_FUNCTION // -struct SetChanDataMessage:public ClusterMessageHeader -{ +struct SetChanDataMessage : public ClusterMessageHeader { uint32_t channel; uint32_t sequence_number; - uint32_t data_type; // enum CacheDataType + uint32_t data_type; // enum CacheDataType char data[4]; - enum - { + enum { MIN_VERSION = 1, MAX_VERSION = 1, - SET_CHANNEL_DATA_MESSAGE_VERSION = MAX_VERSION + SET_CHANNEL_DATA_MESSAGE_VERSION = MAX_VERSION, }; - SetChanDataMessage(uint16_t vers = SET_CHANNEL_DATA_MESSAGE_VERSION) -: ClusterMessageHeader(vers), channel(0), sequence_number(0), data_type(0) { + SetChanDataMessage(uint16_t vers = SET_CHANNEL_DATA_MESSAGE_VERSION) + : ClusterMessageHeader(vers), channel(0), sequence_number(0), data_type(0) + { memset(data, 0, sizeof(data)); } //////////////////////////////////////////////////////////////////////////// - static int protoToVersion(int protoMajor) + static int + protoToVersion(int protoMajor) { - (void) protoMajor; + (void)protoMajor; return SET_CHANNEL_DATA_MESSAGE_VERSION; } - static int sizeof_fixedlen_msg() + static int + sizeof_fixedlen_msg() { SetChanDataMessage *p = 0; - return (int) DOUBLE_ALIGN((int64_t) ((char *) &p->data[0] - (char *) p)); + return (int)DOUBLE_ALIGN((int64_t)((char *)&p->data[0] - (char *)p)); } - void init(uint16_t vers = SET_CHANNEL_DATA_MESSAGE_VERSION) { + void + init(uint16_t vers = SET_CHANNEL_DATA_MESSAGE_VERSION) + { _init(vers); } - inline void SwapBytes() + inline void + SwapBytes() { if (NeedByteSwap()) { ats_swap32(&channel); @@ -349,36 +339,40 @@ struct SetChanDataMessage:public ClusterMessageHeader // // RPC message for SET_CHANNEL_PIN_CLUSTER_FUNCTION // -struct SetChanPinMessage:public ClusterMessageHeader -{ +struct SetChanPinMessage : public ClusterMessageHeader { uint32_t channel; uint32_t sequence_number; uint32_t pin_time; - enum - { + enum { MIN_VERSION = 1, MAX_VERSION = 1, - SET_CHANNEL_PIN_MESSAGE_VERSION = MAX_VERSION + SET_CHANNEL_PIN_MESSAGE_VERSION = MAX_VERSION, }; - SetChanPinMessage(uint16_t vers = SET_CHANNEL_PIN_MESSAGE_VERSION) -: ClusterMessageHeader(vers), channel(0), sequence_number(0), pin_time(0) { + SetChanPinMessage(uint16_t vers = SET_CHANNEL_PIN_MESSAGE_VERSION) + : ClusterMessageHeader(vers), channel(0), sequence_number(0), pin_time(0) + { } //////////////////////////////////////////////////////////////////////////// - static int protoToVersion(int protoMajor) + static int + protoToVersion(int protoMajor) { - (void) protoMajor; + (void)protoMajor; return SET_CHANNEL_PIN_MESSAGE_VERSION; } - static int sizeof_fixedlen_msg() + static int + sizeof_fixedlen_msg() { - return (int) sizeof(SetChanPinMessage); + return (int)sizeof(SetChanPinMessage); } - void init(uint16_t vers = SET_CHANNEL_PIN_MESSAGE_VERSION) { + void + init(uint16_t vers = SET_CHANNEL_PIN_MESSAGE_VERSION) + { _init(vers); } - inline void SwapBytes() + inline void + SwapBytes() { if (NeedByteSwap()) { ats_swap32(&channel); @@ -392,36 +386,40 @@ struct SetChanPinMessage:public ClusterMessageHeader // // RPC message for SET_CHANNEL_PRIORITY_CLUSTER_FUNCTION // -struct SetChanPriorityMessage:public ClusterMessageHeader -{ +struct SetChanPriorityMessage : public ClusterMessageHeader { uint32_t channel; uint32_t sequence_number; uint32_t disk_priority; - enum - { + enum { MIN_VERSION = 1, MAX_VERSION = 1, - SET_CHANNEL_PRIORITY_MESSAGE_VERSION = MAX_VERSION + SET_CHANNEL_PRIORITY_MESSAGE_VERSION = MAX_VERSION, }; - SetChanPriorityMessage(uint16_t vers = SET_CHANNEL_PRIORITY_MESSAGE_VERSION) -: ClusterMessageHeader(vers), channel(0), sequence_number(0), disk_priority(0) { + SetChanPriorityMessage(uint16_t vers = SET_CHANNEL_PRIORITY_MESSAGE_VERSION) + : ClusterMessageHeader(vers), channel(0), sequence_number(0), disk_priority(0) + { } //////////////////////////////////////////////////////////////////////////// - static int protoToVersion(int protoMajor) + static int + protoToVersion(int protoMajor) { - (void) protoMajor; + (void)protoMajor; return SET_CHANNEL_PRIORITY_MESSAGE_VERSION; } - static int sizeof_fixedlen_msg() + static int + sizeof_fixedlen_msg() { - return (int) sizeof(SetChanPriorityMessage); + return (int)sizeof(SetChanPriorityMessage); } - void init(uint16_t vers = SET_CHANNEL_PRIORITY_MESSAGE_VERSION) { + void + init(uint16_t vers = SET_CHANNEL_PRIORITY_MESSAGE_VERSION) + { _init(vers); } - inline void SwapBytes() + inline void + SwapBytes() { if (NeedByteSwap()) { ats_swap32(&channel); @@ -454,7 +452,7 @@ IsHighBitSet(int *val) // ClusterAccept -- Handle cluster connect events from peer // cluster nodes. ///////////////////////////////////////////////////////////////// -class ClusterAccept:public Continuation +class ClusterAccept : public Continuation { public: ClusterAccept(int *, int, int); @@ -463,9 +461,9 @@ public: int ClusterAcceptEvent(int, void *); int ClusterAcceptMachine(NetVConnection *); - ~ClusterAccept(); -private: + ~ClusterAccept(); +private: int *p_cluster_port; int socket_send_bufsize; int socket_recv_bufsize; @@ -476,13 +474,13 @@ private: // VC++ 5.0 special struct ClusterHandler; -typedef int (ClusterHandler::*ClusterContHandler) (int, void *); +typedef int (ClusterHandler::*ClusterContHandler)(int, void *); struct OutgoingControl; -typedef int (OutgoingControl::*OutgoingCtrlHandler) (int, void *); +typedef int (OutgoingControl::*OutgoingCtrlHandler)(int, void *); struct ClusterVConnection; -typedef int (ClusterVConnection::*ClusterVConnHandler) (int, void *); +typedef int (ClusterVConnection::*ClusterVConnHandler)(int, void *); // Library declarations extern void cluster_set_priority(ClusterHandler *, ClusterVConnState *, int); @@ -493,7 +491,7 @@ extern void cluster_reschedule(ClusterHandler *, ClusterVConnection *, ClusterVC extern void cluster_reschedule_offset(ClusterHandler *, ClusterVConnection *, ClusterVConnState *, int); extern void cluster_disable(ClusterHandler *, ClusterVConnection *, ClusterVConnState *); extern void cluster_update_priority(ClusterHandler *, ClusterVConnection *, ClusterVConnState *, int64_t, int64_t); -#define CLUSTER_BUMP_NO_REMOVE -1 +#define CLUSTER_BUMP_NO_REMOVE -1 extern void cluster_bump(ClusterHandler *, ClusterVConnectionBase *, ClusterVConnState *, int); extern IOBufferBlock *clone_IOBufferBlockList(IOBufferBlock *, int, int, IOBufferBlock **); @@ -501,7 +499,7 @@ extern IOBufferBlock *consume_IOBufferBlockList(IOBufferBlock *, int64_t); extern int64_t bytes_IOBufferBlockList(IOBufferBlock *, int64_t); // ClusterVConnection declarations -extern void clusterVCAllocator_free(ClusterVConnection * vc); +extern void clusterVCAllocator_free(ClusterVConnection *vc); extern ClassAllocator<ClusterVConnection> clusterVCAllocator; extern ClassAllocator<ByteBankDescriptor> byteBankAllocator; http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cluster/P_ClusterLib.h ---------------------------------------------------------------------- diff --git a/iocore/cluster/P_ClusterLib.h b/iocore/cluster/P_ClusterLib.h index eff5098..39e510b 100644 --- a/iocore/cluster/P_ClusterLib.h +++ b/iocore/cluster/P_ClusterLib.h @@ -53,14 +53,13 @@ extern int partial_writev(int, IOVec *, int, int); extern void dump_time_buckets(); struct GlobalClusterPeriodicEvent; -typedef int (GlobalClusterPeriodicEvent::*GClusterPEHandler) (int, void *); +typedef int (GlobalClusterPeriodicEvent::*GClusterPEHandler)(int, void *); -struct GlobalClusterPeriodicEvent:public Continuation -{ +struct GlobalClusterPeriodicEvent : public Continuation { GlobalClusterPeriodicEvent(); ~GlobalClusterPeriodicEvent(); void init(); - int calloutEvent(Event * e, void *data); + int calloutEvent(Event *e, void *data); // Private data Event *_thisCallout; http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cluster/P_ClusterLoadMonitor.h ---------------------------------------------------------------------- diff --git a/iocore/cluster/P_ClusterLoadMonitor.h b/iocore/cluster/P_ClusterLoadMonitor.h index 89156bd..60452bc 100644 --- a/iocore/cluster/P_ClusterLoadMonitor.h +++ b/iocore/cluster/P_ClusterLoadMonitor.h @@ -35,7 +35,7 @@ //*************************************************************************** // ClusterLoadMonitor class -- Compute cluster interconnect load metric //*************************************************************************** -class ClusterLoadMonitor:public Continuation +class ClusterLoadMonitor : public Continuation { public: ///////////////////////////////////// @@ -52,28 +52,27 @@ public: static int cf_cluster_load_clear_duration; static int cf_cluster_load_exceed_duration; - struct cluster_load_ping_msg - { + struct cluster_load_ping_msg { int magicno; int version; int sequence_number; ink_hrtime send_time; ClusterLoadMonitor *monitor; - enum - { + enum { CL_MSG_MAGICNO = 0x12ABCDEF, - CL_MSG_VERSION = 1 + CL_MSG_VERSION = 1, }; - cluster_load_ping_msg(ClusterLoadMonitor * m = 0) - : magicno(CL_MSG_MAGICNO), version(CL_MSG_VERSION), sequence_number(0), send_time(0), monitor(m) { + cluster_load_ping_msg(ClusterLoadMonitor *m = 0) + : magicno(CL_MSG_MAGICNO), version(CL_MSG_VERSION), sequence_number(0), send_time(0), monitor(m) + { } }; static void cluster_load_ping_rethandler(ClusterHandler *, void *, int); public: - ClusterLoadMonitor(ClusterHandler * ch); + ClusterLoadMonitor(ClusterHandler *ch); void init(); ~ClusterLoadMonitor(); void cancel_monitor(); http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cluster/P_ClusterMachine.h ---------------------------------------------------------------------- diff --git a/iocore/cluster/P_ClusterMachine.h b/iocore/cluster/P_ClusterMachine.h index 4415d5d..c98c794 100644 --- a/iocore/cluster/P_ClusterMachine.h +++ b/iocore/cluster/P_ClusterMachine.h @@ -39,7 +39,7 @@ // Timeout the Machine * this amount of time after they // fall out of the current configuration that are deleted. // -#define MACHINE_TIMEOUT (HRTIME_DAY*2) +#define MACHINE_TIMEOUT (HRTIME_DAY * 2) // @@ -51,12 +51,11 @@ // // Long running operations should use more sophisticated synchronization. // -#define NO_RACE_DELAY HRTIME_HOUR // a long long time +#define NO_RACE_DELAY HRTIME_HOUR // a long long time -struct ClusterHandler; // Leave this a class - VC++ gets very anal ~SR -- which version of VC++? ~igalic +struct ClusterHandler; // Leave this a class - VC++ gets very anal ~SR -- which version of VC++? ~igalic -struct ClusterMachine: public Server -{ +struct ClusterMachine : public Server { bool dead; char *hostname; int hostname_len; @@ -87,17 +86,17 @@ struct ClusterMachine: public Server ClusterHandler **clusterHandlers; }; -struct MachineListElement -{ +struct MachineListElement { unsigned int ip; int port; }; -struct MachineList -{ +struct MachineList { int n; MachineListElement machine[1]; - MachineListElement *find(unsigned int ip, int port = 0) { + MachineListElement * + find(unsigned int ip, int port = 0) + { for (int i = 0; i < n; i++) if (machine[i].ip == ip && (!port || machine[i].port == port)) return &machine[i]; @@ -106,20 +105,20 @@ struct MachineList }; MachineList *read_MachineList(const char *filename, int test_fd = -1); -void free_MachineList(MachineList * l); +void free_MachineList(MachineList *l); -struct clusterConfigFile -{ - char *parseFile(int fd) +struct clusterConfigFile { + char * + parseFile(int fd) { - return (char *) read_MachineList(NULL, fd); + return (char *)read_MachineList(NULL, fd); } }; inkcoreapi ClusterMachine *this_cluster_machine(); void create_this_cluster_machine(); -void free_ClusterMachine(ClusterMachine * m); +void free_ClusterMachine(ClusterMachine *m); MachineList *the_cluster_machines_config(); MachineList *the_cluster_config(); http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cluster/P_TimeTrace.h ---------------------------------------------------------------------- diff --git a/iocore/cluster/P_TimeTrace.h b/iocore/cluster/P_TimeTrace.h index e4f3796..e92a7f8 100644 --- a/iocore/cluster/P_TimeTrace.h +++ b/iocore/cluster/P_TimeTrace.h @@ -32,8 +32,8 @@ // #define ENABLE_TIME_TRACE -#define TIME_DIST_BUCKETS 500 -#define TIME_DIST_BUCKETS_SIZE TIME_DIST_BUCKETS+1 +#define TIME_DIST_BUCKETS 500 +#define TIME_DIST_BUCKETS_SIZE TIME_DIST_BUCKETS + 1 #ifdef ENABLE_TIME_TRACE extern int cdb_callback_time_dist[TIME_DIST_BUCKETS_SIZE]; @@ -65,14 +65,15 @@ extern int cluster_send_events; #endif // ENABLE_TIME_TRACE #ifdef ENABLE_TIME_TRACE -#define LOG_EVENT_TIME(_start_time, _time_dist, _time_cnt) do { \ - ink_hrtime now = ink_get_hrtime(); \ - unsigned int bucket = (now - _start_time) / HRTIME_MSECONDS(10); \ - if (bucket > TIME_DIST_BUCKETS) \ - bucket = TIME_DIST_BUCKETS; \ - ink_atomic_increment(&_time_dist[bucket], 1); \ - ink_atomic_increment(&_time_cnt, 1); \ -} while(0) +#define LOG_EVENT_TIME(_start_time, _time_dist, _time_cnt) \ + do { \ + ink_hrtime now = ink_get_hrtime(); \ + unsigned int bucket = (now - _start_time) / HRTIME_MSECONDS(10); \ + if (bucket > TIME_DIST_BUCKETS) \ + bucket = TIME_DIST_BUCKETS; \ + ink_atomic_increment(&_time_dist[bucket], 1); \ + ink_atomic_increment(&_time_cnt, 1); \ + } while (0) #else // !ENABLE_TIME_TRACE #define LOG_EVENT_TIME(_start_time, _time_dist, _time_cnt) http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cluster/test_I_Cluster.cc ---------------------------------------------------------------------- diff --git a/iocore/cluster/test_I_Cluster.cc b/iocore/cluster/test_I_Cluster.cc index 587834f..fb10885 100644 --- a/iocore/cluster/test_I_Cluster.cc +++ b/iocore/cluster/test_I_Cluster.cc @@ -62,7 +62,6 @@ reconfigure_diags() // read output routing values for (i = 0; i < DiagsLevel_Count; i++) { - c.outputs[i].to_stdout = 0; c.outputs[i].to_stderr = 1; c.outputs[i].to_syslog = 1; @@ -85,20 +84,18 @@ reconfigure_diags() if (diags->base_action_tags) diags->activate_taglist(diags->base_action_tags, DiagsTagType_Action); - //////////////////////////////////// - // change the diags config values // - //////////////////////////////////// - // XXX: HP-UX ??? +//////////////////////////////////// +// change the diags config values // +//////////////////////////////////// +// XXX: HP-UX ??? #if !defined(__GNUC__) && !defined(hpux) diags->config = c; #else - memcpy(((void *) &diags->config), ((void *) &c), sizeof(DiagsConfigState)); + memcpy(((void *)&diags->config), ((void *)&c), sizeof(DiagsConfigState)); #endif - } - static void init_diags(char *bdt, char *bat) { @@ -121,13 +118,13 @@ init_diags(char *bdt, char *bat) if (diags_log_fp == NULL) { SrcLoc loc(__FILE__, __FUNCTION__, __LINE__); - diags->print(NULL, DL_Warning, NULL, &loc, - "couldn't open diags log file '%s', " "will not log to this file", diags_logpath); + diags->print(NULL, DL_Warning, NULL, &loc, "couldn't open diags log file '%s', " + "will not log to this file", + diags_logpath); } diags->print(NULL, DL_Status, "STATUS", NULL, "opened %s", diags_logpath); reconfigure_diags(); - } http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cluster/test_P_Cluster.cc ---------------------------------------------------------------------- diff --git a/iocore/cluster/test_P_Cluster.cc b/iocore/cluster/test_P_Cluster.cc index 587834f..fb10885 100644 --- a/iocore/cluster/test_P_Cluster.cc +++ b/iocore/cluster/test_P_Cluster.cc @@ -62,7 +62,6 @@ reconfigure_diags() // read output routing values for (i = 0; i < DiagsLevel_Count; i++) { - c.outputs[i].to_stdout = 0; c.outputs[i].to_stderr = 1; c.outputs[i].to_syslog = 1; @@ -85,20 +84,18 @@ reconfigure_diags() if (diags->base_action_tags) diags->activate_taglist(diags->base_action_tags, DiagsTagType_Action); - //////////////////////////////////// - // change the diags config values // - //////////////////////////////////// - // XXX: HP-UX ??? +//////////////////////////////////// +// change the diags config values // +//////////////////////////////////// +// XXX: HP-UX ??? #if !defined(__GNUC__) && !defined(hpux) diags->config = c; #else - memcpy(((void *) &diags->config), ((void *) &c), sizeof(DiagsConfigState)); + memcpy(((void *)&diags->config), ((void *)&c), sizeof(DiagsConfigState)); #endif - } - static void init_diags(char *bdt, char *bat) { @@ -121,13 +118,13 @@ init_diags(char *bdt, char *bat) if (diags_log_fp == NULL) { SrcLoc loc(__FILE__, __FUNCTION__, __LINE__); - diags->print(NULL, DL_Warning, NULL, &loc, - "couldn't open diags log file '%s', " "will not log to this file", diags_logpath); + diags->print(NULL, DL_Warning, NULL, &loc, "couldn't open diags log file '%s', " + "will not log to this file", + diags_logpath); } diags->print(NULL, DL_Status, "STATUS", NULL, "opened %s", diags_logpath); reconfigure_diags(); - }
