Updated Branches: refs/heads/refine_cluster 7ffc10a9c -> 62504a9f8 (forced update)
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/session.cc ---------------------------------------------------------------------- diff --git a/iocore/cluster/session.cc b/iocore/cluster/session.cc new file mode 100644 index 0000000..7adead6 --- /dev/null +++ b/iocore/cluster/session.cc @@ -0,0 +1,1267 @@ +/** @file + + A brief file description + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <sys/stat.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <errno.h> +#include <fcntl.h> +#include <sys/epoll.h> +#include "Diags.h" +#include "machine.h" +#include "global.h" +#include "connection.h" +#include "clusterinterface.h" +#include "nio.h" +#ifndef TS_INLINE +#define TS_INLINE inline +#endif +#include "I_IOBuffer.h" +#include "P_Cluster.h" +#include "P_RecCore.h" +#include "session.h" + +#ifndef USE_MULTI_ALLOCATOR +static Allocator in_message_allocator("InMessage", sizeof(InMessage), 1024); +#endif + +static Allocator session_allocator("SessionEntry", sizeof(SessionEntry), 1024); + +static MachineSessions *all_sessions; //[src ip % MAX_MACHINE_COUNT] +static ink_mutex session_lock; +static int my_machine_id = 0; + +struct SessionRecords { + RecRecord * create_total_count; //create session total count + RecRecord * create_success_count; //create session success count + RecRecord * create_retry_times; //create session retry times + RecRecord * close_total_count; //close session count + RecRecord * close_success_count; //close session success count + RecRecord * session_miss_count; //session miss count + RecRecord * session_occupied_count; //session occupied count +}; + +static SessionRecords server_session_records = {NULL, NULL, NULL, NULL, NULL, NULL, NULL}; +static SessionRecords client_session_records = {NULL, NULL, NULL, NULL, NULL, NULL, NULL}; + +static void init_session_stat(SessionRecords *pSessionRecords, const char *prefix); + +inline static int get_session_machine_index(const unsigned int ip) +{ + int id; + int count; + int index; + + id = ip % MAX_MACHINE_COUNT; + if (all_sessions[id].ip == ip) { + return id; + } + + count = 1; + while (count <= MAX_MACHINE_COUNT) { + index = (id + count) % MAX_MACHINE_COUNT; + if (all_sessions[index].ip == ip) { + return index; + } + count++; + } + + return -1; +} + +static int alloc_session_machine_index(const unsigned int ip) +{ + int id; + int count; + int index; + + id = ip % MAX_MACHINE_COUNT; + if (all_sessions[id].ip == 0) { + return id; + } + + count = 1; + while (count <= MAX_MACHINE_COUNT) { + index = (id + count) % MAX_MACHINE_COUNT; + if (all_sessions[index].ip == 0) { + return index; + } + count++; + } + + return -1; 
+} + +inline static void release_in_message(SocketContext *pSockContext, + InMessage *pMessage) +{ + ink_atomic_increment(&pSockContext->thread_context->stats. + dequeue_in_msg_count, 1); + ink_atomic_increment(&pSockContext->thread_context->stats. + dequeue_in_msg_bytes, MSG_HEADER_LENGTH + pMessage->data_len); + + pMessage->blocks = NULL; //free pointer +#ifdef USE_MULTI_ALLOCATOR + pSockContext->in_msg_allocator->free_void(pMessage); +#else + (void)pSockContext; + in_message_allocator.free_void(pMessage); +#endif +} + +int init_machine_sessions(ClusterMachine *machine, const bool bMyself) +{ + int result; + int sessions_bytes; + int locks_bytes; + int machine_id; + MachineSessions *pMachineSessions; + ink_mutex *pLock; + ink_mutex *pLockEnd; + + ink_mutex_acquire(&session_lock); + if ((machine_id=get_session_machine_index(machine->ip)) < 0) { + if ((machine_id=alloc_session_machine_index(machine->ip)) < 0) { + ink_mutex_release(&session_lock); + return ENOSPC; + } + } + + pMachineSessions = all_sessions + machine_id; + if (pMachineSessions->init_done) { //already init + ink_mutex_release(&session_lock); + return 0; + } + + pMachineSessions->is_myself = bMyself; + pMachineSessions->ip = machine->ip; + + sessions_bytes = sizeof(SessionEntry) * max_session_count_per_machine; + pMachineSessions->sessions = (SessionEntry *)malloc(sessions_bytes); + if (pMachineSessions->sessions == NULL) { + Error("file: "__FILE__", line: %d, " + "malloc %d bytes fail, errno: %d, error info: %s", + __LINE__, sessions_bytes, errno, strerror(errno)); + ink_mutex_release(&session_lock); + return errno != 0 ? errno : ENOMEM; + } + memset(pMachineSessions->sessions, 0, sessions_bytes); + + locks_bytes = sizeof(ink_mutex) * session_lock_count_per_machine; + pMachineSessions->locks = (ink_mutex *)malloc(locks_bytes); + if (pMachineSessions->locks == NULL) { + Error("file: "__FILE__", line: %d, " + "malloc %d bytes fail, errno: %d, error info: %s", + __LINE__, locks_bytes, errno, strerror(errno)); + ink_mutex_release(&session_lock); + return errno != 0 ? errno : ENOMEM; + } + + pLockEnd = pMachineSessions->locks + session_lock_count_per_machine; + for (pLock=pMachineSessions->locks; pLock<pLockEnd; pLock++) { + if ((result=ink_mutex_init(pLock, "session_locks")) != 0) { + ink_mutex_release(&session_lock); + return result; + } + } + + pMachineSessions->init_done = true; + ink_mutex_release(&session_lock); + return 0; +} + +int session_init() +{ + int bytes; + int result; + ClusterMachine *myMachine; + + bytes = sizeof(MachineSessions) * MAX_MACHINE_COUNT; + all_sessions = (MachineSessions *)malloc(bytes); + if (all_sessions == NULL) { + Error("file: "__FILE__", line: %d, " + "malloc %d bytes fail, errno: %d, error info: %s", + __LINE__, bytes, errno, strerror(errno)); + return errno != 0 ? 
errno : ENOMEM; + } + memset(all_sessions, 0, bytes); + + myMachine = cluster_machines + 0; + if ((result=init_machine_sessions(myMachine, true)) != 0) { + return result; + } + + if ((result=ink_mutex_init(&session_lock, "session_lock")) != 0) { + return result; + } + + my_machine_id = get_session_machine_index(myMachine->ip); + Debug(CLUSTER_DEBUG_TAG, "my_machine_id: %d", my_machine_id); + + init_session_stat(&server_session_records, "proxy.process.cluster.server_session"); + init_session_stat(&client_session_records, "proxy.process.cluster.client_session"); + + return 0; +} + +int cluster_create_session(ClusterSession *session, + const ClusterMachine *machine, void *arg, const int events) +{ + MachineSessions *pMachineSessions; + SessionEntry *pSessionEntry; + SocketContext *pSockContext; + int i; + int session_index; + int version; + SequenceType seq; + + pMachineSessions = all_sessions + my_machine_id; + + ink_atomic_increment(&pMachineSessions->session_stat.create_total_count, 1); + + if ((pSockContext=get_socket_context(machine)) == NULL) { + return ENOENT; + } + version = pSockContext->version; + + for (i=0; i<128; i++) { + seq = ink_atomic_increment(&pMachineSessions->current_seq, 1); + session_index = seq % max_session_count_per_machine; + pSessionEntry = pMachineSessions->sessions + session_index; + if (IS_SESSION_EMPTY(pSessionEntry->session_id)) { + SESSION_LOCK(pMachineSessions, session_index); + if (IS_SESSION_EMPTY(pSessionEntry->session_id)) { + pSessionEntry->session_id.fields.ip = my_machine_ip; + pSessionEntry->session_id.fields.timestamp = CURRENT_TIME(); + pSessionEntry->session_id.fields.seq = seq; + pSessionEntry->sock_context = pSockContext; + pSessionEntry->user_data = arg; + pSessionEntry->response_events = events; + pSessionEntry->current_msg_seq = 0; + pSessionEntry->version = version; + + *session = pSessionEntry->session_id; + +#ifdef TRIGGER_STAT_FLAG + if (pSessionEntry->response_events & RESPONSE_EVENT_NOTIFY_DEALER) { + pSessionEntry->stat_start_time = CURRENT_NS(); + } +#endif + SESSION_UNLOCK(pMachineSessions, session_index); + + ink_atomic_increment(&pMachineSessions->session_stat. + create_success_count, 1); + ink_atomic_increment(&pMachineSessions->session_stat. + create_retry_times, i + 1); + return 0; + } + SESSION_UNLOCK(pMachineSessions, session_index); + } + } + + ink_atomic_increment(&pMachineSessions->session_stat. 
+ create_retry_times, i); + + return ENOSPC; +} + +#define GET_MACHINE_INDEX(machine_id, ip, pMachineSessions, return_value) \ + do { \ + if ((machine_id=get_session_machine_index(ip)) < 0) { \ + Debug(CLUSTER_DEBUG_TAG, "file: "__FILE__", line: %d, " \ + "ip: %u not exist!", __LINE__, ip); \ + return return_value; \ + } \ + pMachineSessions = all_sessions + machine_id; \ + if (!(pMachineSessions)->init_done) { \ + Debug(CLUSTER_DEBUG_TAG, "file: "__FILE__", line: %d, " \ + "ip: %u not init!", __LINE__, ip); \ + return return_value; \ + } \ + } while (0) + + +inline static SessionEntry *get_session( + const ClusterSession *session_id, SessionEntry *pSession) +{ + SessionEntry *pCurrent; + pCurrent = pSession; + do { + if (IS_SESSION_EQUAL(pCurrent->session_id, *session_id)) { + return pCurrent; + } + + pCurrent = pCurrent->next; + } while (pCurrent != NULL); + + return NULL; +} + +int cluster_bind_session(ClusterSession session, void *arg) +{ + SessionEntry *pSessionEntry; + MachineSessions *pMachineSessions; + int result; + int machine_id; + int session_index; + + GET_MACHINE_INDEX(machine_id, session.fields.ip, pMachineSessions, ENOENT); + + session_index = session.fields.seq % max_session_count_per_machine; + pSessionEntry = pMachineSessions->sessions + session_index; + SESSION_LOCK(pMachineSessions, session_index); + if ((pSessionEntry=get_session(&session, pSessionEntry)) != NULL) { + pSessionEntry->user_data = arg; + result = 0; + } + else { + result = ENOENT; + } + SESSION_UNLOCK(pMachineSessions, session_index); + return result; +} + +int cluster_set_events(ClusterSession session, const int events) +{ + SessionEntry *pSessionEntry; + MachineSessions *pMachineSessions; + SocketContext *pSockContext; + InMessage *pMessage; + void *user_data; + int result; + int machine_id; + int session_index; + + GET_MACHINE_INDEX(machine_id, session.fields.ip, pMachineSessions, ENOENT); + + session_index = session.fields.seq % max_session_count_per_machine; + pSessionEntry = pMachineSessions->sessions + session_index; + SESSION_LOCK(pMachineSessions, session_index); + + if ((pSessionEntry=get_session(&session, pSessionEntry)) != NULL) { + pSockContext = pSessionEntry->sock_context; + if (pSockContext != NULL) { + if (events & RESPONSE_EVENT_NOTIFY_DEALER) { + + //assert((pSessionEntry->response_events & RESPONSE_EVENT_NOTIFY_DEALER) == 0); + +#ifdef TRIGGER_STAT_FLAG + //for stat + if (pMachineSessions->is_myself) { //client + pSessionEntry->stat_start_time = CURRENT_NS(); + } + else { //server + if (pSessionEntry->stat_start_time != 0) { + ink_atomic_increment(&pMachineSessions->trigger_stat.count, 1); + ink_atomic_increment(&pMachineSessions->trigger_stat.time_used, + CURRENT_NS() - pSessionEntry->stat_start_time); + pSessionEntry->stat_start_time = 0; + } + } +#endif + + pMessage = pSessionEntry->messages; + if (pMessage == NULL) { + pSessionEntry->response_events = events; //waiting for message to notify + } + else { + pSessionEntry->messages = pSessionEntry->messages->next; //consume one + } + } + else { + pMessage = NULL; + pSessionEntry->response_events = events; + } + + user_data = pSessionEntry->user_data; + result = 0; + } + else { + pMessage = NULL; + user_data = NULL; + result = ENOENT; + } + } + else { + pSockContext = NULL; + pMessage = NULL; + user_data = NULL; + result = ENOENT; + } + +#ifdef TRIGGER_STAT_FLAG + if (pMessage != NULL) { + if (!pMachineSessions->is_myself) { //server + pSessionEntry->stat_start_time = CURRENT_NS(); + } + } +#endif + SESSION_UNLOCK(pMachineSessions, 
session_index); + + if (pMessage != NULL) { + cluster_msg_deal_func(session, user_data, pMessage->func_id, + pMessage->blocks, pMessage->data_len); + release_in_message(pSockContext, pMessage); + } + + return result; +} + +void *cluster_close_session(ClusterSession session) +{ + void *old_data; + SessionEntry *previous; + SessionEntry *pSessionEntry; + MachineSessions *pMachineSessions; + InMessage *pMessage; + int machine_id; + int session_index; + + GET_MACHINE_INDEX(machine_id, session.fields.ip, pMachineSessions, NULL); + + ink_atomic_increment(&pMachineSessions->session_stat.close_total_count, 1); + + session_index = session.fields.seq % max_session_count_per_machine; + pSessionEntry = pMachineSessions->sessions + session_index; + SESSION_LOCK(pMachineSessions, session_index); + + previous = NULL; + do { + if (pSessionEntry->sock_context != NULL && IS_SESSION_EQUAL( + session, pSessionEntry->session_id)) + { + break; + } + + previous = pSessionEntry; + pSessionEntry = pSessionEntry->next; + } while (pSessionEntry != NULL); + + if (pSessionEntry != NULL) { //found + old_data = pSessionEntry->user_data; + while (pSessionEntry->messages != NULL) { + pMessage = pSessionEntry->messages; + pSessionEntry->messages = pSessionEntry->messages->next; + + release_in_message(pSessionEntry->sock_context, pMessage); + } + pSessionEntry->sock_context = NULL; + pSessionEntry->response_events = 0; + pSessionEntry->user_data = NULL; + CLEAR_SESSION(pSessionEntry->session_id); + +#ifdef TRIGGER_STAT_FLAG + if (pSessionEntry->stat_start_time != 0) { + ink_atomic_increment(&pMachineSessions->trigger_stat.count, 1); + ink_atomic_increment(&pMachineSessions->trigger_stat.time_used, + CURRENT_NS() - pSessionEntry->stat_start_time); + pSessionEntry->stat_start_time = 0; + } +#endif + + ink_atomic_increment(&pMachineSessions->session_stat. 
+ close_success_count, 1); + +#ifdef MSG_TIME_STAT_FLAG + if (pMachineSessions->is_myself) + {//request by me + if (pSessionEntry->client_start_time != 0) { + ink_atomic_increment(&pMachineSessions->msg_stat.count, 1); + ink_atomic_increment(&pMachineSessions->msg_stat.time_used, + CURRENT_NS() - pSessionEntry->client_start_time); + pSessionEntry->client_start_time = 0; + } + } + else { //request by other + if (pSessionEntry->server_start_time != 0) { + ink_atomic_increment(&pMachineSessions->msg_stat.count, 1); + ink_atomic_increment(&pMachineSessions->msg_stat.time_used, + CURRENT_NS() - pSessionEntry->server_start_time); + pSessionEntry->server_start_time = 0; + } + } + + if (pSessionEntry->send_start_time != 0) { + ink_atomic_increment(&pMachineSessions->msg_send.count, 1); + ink_atomic_increment(&pMachineSessions->msg_send.time_used, + CURRENT_NS() - pSessionEntry->send_start_time); + pSessionEntry->send_start_time = 0; + } +#endif + + if (previous == NULL) { //remove the head session + SessionEntry *pNextSession; + pNextSession = pSessionEntry->next; + if (pNextSession != NULL) { + memcpy(pSessionEntry, pNextSession, sizeof(SessionEntry)); + session_allocator.free_void(pNextSession); + } + } + else { + previous->next = pSessionEntry->next; + session_allocator.free_void(pSessionEntry); + } + } + else { + old_data = NULL; + } + SESSION_UNLOCK(pMachineSessions, session_index); + return old_data; +} + +int get_session_for_send(const SessionId *session, + MachineSessions **ppMachineSessions, SessionEntry **sessionEntry) +{ + int machine_id; + int session_index; + int result; + + GET_MACHINE_INDEX(machine_id, session->fields.ip, *ppMachineSessions, ENOENT); + + session_index = session->fields.seq % max_session_count_per_machine; + *sessionEntry = (*ppMachineSessions)->sessions + session_index; + SESSION_LOCK(*ppMachineSessions, session_index); + + if ((*sessionEntry=get_session(session, *sessionEntry)) == NULL) { + result = ENOENT; + } + else if ((*sessionEntry)->messages != NULL) { //you must consume the recv messages firstly + *sessionEntry = NULL; + result = EBUSY; + } + else { + result = 0; + } + + SESSION_UNLOCK(*ppMachineSessions, session_index); + return result; +} + +#ifdef MSG_TIME_STAT_FLAG +int get_response_session_internal(const MsgHeader *pHeader, + MachineSessions **ppMachineSessions, SessionEntry **sessionEntry) +{ + SessionEntry *pSession; + SessionEntry *pCurrent; + int result; + int machine_id; + int session_index; + + GET_MACHINE_INDEX(machine_id, pHeader->session_id.fields.ip, + *ppMachineSessions, ENOENT); + + session_index = pHeader->session_id.fields.seq % max_session_count_per_machine; + pSession = (*ppMachineSessions)->sessions + session_index; + SESSION_LOCK(*ppMachineSessions, session_index); + pCurrent = pSession; + do { + if (IS_SESSION_EQUAL(pCurrent->session_id, pHeader->session_id)) { + *sessionEntry = pCurrent; + result = 0; + break; + } + + pCurrent = pCurrent->next; + } while (pCurrent != NULL); + + if (pCurrent == NULL) { + if ((*ppMachineSessions)->is_myself) { //request by me + *sessionEntry = NULL; + result = ENOENT; + } + else { + if (IS_SESSION_EMPTY(pSession->session_id)) { + if (pHeader->msg_seq == 1) { //first time, should create + *sessionEntry = pSession; + result = 0; + } + else { + *sessionEntry = NULL; + result = ENOENT; + } + } + else { + *sessionEntry = NULL; + result = EEXIST; + } + } + } + + SESSION_UNLOCK(*ppMachineSessions, session_index); + return result; +} +#endif + +int get_response_session(const MsgHeader *pHeader, + 
MachineSessions **ppMachineSessions, SessionEntry **sessionEntry, + SocketContext *pSocketContext, bool *call_func, void **user_data) +{ + SessionEntry *pSession; + SessionEntry *pTail; + SessionEntry *pCurrent; + int result; + int machine_id; + int session_index; + int chain_count; + + GET_MACHINE_INDEX(machine_id, pHeader->session_id.fields.ip, + *ppMachineSessions, ENOENT); + + session_index = pHeader->session_id.fields.seq % max_session_count_per_machine; + pSession = (*ppMachineSessions)->sessions + session_index; + SESSION_LOCK(*ppMachineSessions, session_index); + do { + pCurrent = pSession; + do { + if (IS_SESSION_EQUAL(pCurrent->session_id, pHeader->session_id)) { + *sessionEntry = pCurrent; + *user_data = pCurrent->user_data; + result = 0; + + if (pCurrent->response_events & RESPONSE_EVENT_NOTIFY_DEALER) { + pCurrent->response_events = 0; + *call_func = true; + } + else { + *call_func = false; + } + + break; + } + + pCurrent = pCurrent->next; + } while (pCurrent != NULL); + + if (pCurrent != NULL) { //found + pSession = pCurrent; + break; + } + + if ((*ppMachineSessions)->is_myself) { //request by me + if (IS_SESSION_EMPTY(pSession->session_id)) { + Debug(CLUSTER_DEBUG_TAG, "file: "__FILE__", line: %d, " + "client sessionEntry: %16lX:%lX not exist, func_id: %d", + __LINE__, pHeader->session_id.ids[0], + pHeader->session_id.ids[1], pHeader->func_id); + *sessionEntry = NULL; + *call_func = false; + *user_data = NULL; + result = ENOENT; + + ink_atomic_increment(&(*ppMachineSessions)->session_stat. + session_miss_count, 1); + break; + } + } + else { //request by other + if (pHeader->msg_seq > 1) { //should discard the message + *sessionEntry = NULL; + *user_data = NULL; + *call_func = false; + result = ENOENT; + + Debug(CLUSTER_DEBUG_TAG, "file: "__FILE__", line: %d, " + "server sessionEntry: %08X:%u:%"PRId64" not exist, msg seq: %u, " + "func_id: %d, data_len: %d", + __LINE__, pHeader->session_id.fields.ip, + pHeader->session_id.fields.timestamp, + pHeader->session_id.ids[1], pHeader->msg_seq, + pHeader->func_id, pHeader->data_len); + + ink_atomic_increment(&(*ppMachineSessions)->session_stat. 
+ session_miss_count, 1); + break; + } + + if (IS_SESSION_EMPTY(pSession->session_id)) { + pTail = NULL; + chain_count = 0; + } + else { + chain_count = 1; + pTail = pSession; + if (pSession->next != NULL) { + ++chain_count; + pTail = pSession->next; + pSession = pTail->next; + while (pSession != NULL) { + pTail = pSession; + pSession = pSession->next; + ++chain_count; + } + } + + pSession = (SessionEntry *)session_allocator.alloc_void(); + pSession->messages = NULL; + pSession->user_data = NULL; + pSession->next = NULL; + +#ifdef TRIGGER_STAT_FLAG + pSession->stat_start_time = 0; +#endif +#ifdef MSG_TIME_STAT_FLAG + pSession->client_start_time = 0; + pSession->server_start_time = 0; + pSession->send_start_time = 0; +#endif + } + + //first time, should create + pSession->session_id = pHeader->session_id; //set sessionEntry id + pSession->sock_context = pSocketContext; + pSession->version = pSocketContext->version; + pSession->response_events = 0; + pSession->current_msg_seq = 0; + if (pTail != NULL) { + pTail->next = pSession; + + Debug(CLUSTER_DEBUG_TAG, "file: "__FILE__", line: %d, " + "sessionEntry: %08X:%u:%"PRId64", chain count: %d", + __LINE__, pHeader->session_id.fields.ip, + pHeader->session_id.fields.timestamp, + pHeader->session_id.ids[1], chain_count + 1); + } + + *sessionEntry = pSession; + *user_data = NULL; + *call_func = true; + result = 0; + + ink_atomic_increment(&(*ppMachineSessions)->session_stat. + create_total_count, 1); + break; + } + + Debug(CLUSTER_DEBUG_TAG, "file: "__FILE__", line: %d, " + "sessionEntry: %08X:%u:%"PRId64", position occupied by %08X:%u:%"PRId64", " + "quest by me: %d, time distance: %u, func_id: %d", + __LINE__, pHeader->session_id.fields.ip, + pHeader->session_id.fields.timestamp, pHeader->session_id.ids[1], + pSession->session_id.fields.ip, pSession->session_id.fields.timestamp, + pSession->session_id.ids[1], machine_id == my_machine_id, + pHeader->session_id.fields.timestamp - + pSession->session_id.fields.timestamp, pHeader->func_id); + *sessionEntry = NULL; + *user_data = NULL; + *call_func = false; + result = EEXIST; + + ink_atomic_increment(&(*ppMachineSessions)->session_stat. 
+ session_occupied_count, 1); + } while (0); + +#ifdef TRIGGER_STAT_FLAG + if (*call_func) { + //stat + if ((*ppMachineSessions)->is_myself) { //request by me + if (pSession->stat_start_time != 0) { + ink_atomic_increment(&(*ppMachineSessions)->trigger_stat.count, 1); + ink_atomic_increment(&(*ppMachineSessions)->trigger_stat.time_used, + CURRENT_NS() - pSession->stat_start_time); + pSession->stat_start_time = 0; + } + } + else { + pSession->stat_start_time = CURRENT_NS(); + } + } +#endif + + SESSION_UNLOCK(*ppMachineSessions, session_index); + return result; +} + +static int do_notify_connection_closed(const int src_machine_id, + SocketContext *pSockContext) +{ + int count; + int session_index; + SessionEntry *pcurrent; + SessionEntry *pSessionEntry; + SessionEntry *pSessionEnd; + void *user_data; + bool call_func; + SessionId session_id; + + count = 0; + pSessionEnd = all_sessions[src_machine_id].sessions + + max_session_count_per_machine; + for (pSessionEntry=all_sessions[src_machine_id].sessions; + pSessionEntry<pSessionEnd; pSessionEntry++) + { + pcurrent = pSessionEntry; + do { + if (pcurrent->sock_context == pSockContext) { + session_index = pSessionEntry - all_sessions[src_machine_id].sessions; + SESSION_LOCK(all_sessions + src_machine_id, session_index); + call_func = (pcurrent->response_events & + RESPONSE_EVENT_NOTIFY_DEALER) && (pcurrent->messages == NULL); + session_id = pcurrent->session_id; + user_data = pcurrent->user_data; + SESSION_UNLOCK(all_sessions + src_machine_id, session_index); + + if (call_func) { + cluster_msg_deal_func(session_id, user_data, + FUNC_ID_CONNECTION_CLOSED_NOTIFY, NULL, 0); + } + else { + push_in_message(session_id, all_sessions + src_machine_id, + pcurrent, FUNC_ID_CONNECTION_CLOSED_NOTIFY, NULL, 0); + } + + count++; + } + + pcurrent = pcurrent->next; + } while (pcurrent != NULL); + } + + return count; +} + +int notify_connection_closed(SocketContext *pSockContext) +{ + int count1; + int count2; + int machine_id; + + count1 = do_notify_connection_closed(my_machine_id, pSockContext); + if (count1 > 0) { + Debug(CLUSTER_DEBUG_TAG, "file: "__FILE__", line: %d, " + "notify my session close count: %d", __LINE__, count1); + } + + machine_id = get_session_machine_index(pSockContext->machine->ip); + if (machine_id >= 0 && all_sessions[machine_id].init_done) { + count2 = do_notify_connection_closed(machine_id, pSockContext); + if (count2 > 0) { + Debug(CLUSTER_DEBUG_TAG, "file: "__FILE__", line: %d, " + "notify %s session close count: %d", __LINE__, + pSockContext->machine->hostname, count2); + } + } + else { + count2 = 0; + } + + return count1 + count2; +} + +int push_in_message(const SessionId session, + MachineSessions *pMachineSessions, SessionEntry *pSessionEntry, + const int func_id, IOBufferBlock *blocks, const int data_len) +{ + SocketContext *pSockContext; + InMessage *pMessage; + void *user_data; + int session_index; + bool call_func; + + session_index = session.fields.seq % max_session_count_per_machine; + SESSION_LOCK(pMachineSessions, session_index); + pSockContext = pSessionEntry->sock_context; + if (!(pSockContext != NULL && IS_SESSION_EQUAL(pSessionEntry->session_id, + session))) + { + SESSION_UNLOCK(pMachineSessions, session_index); + return ENOENT; + } + +#ifdef USE_MULTI_ALLOCATOR + pMessage = (InMessage *)pSockContext->in_msg_allocator->alloc_void(); +#else + pMessage = (InMessage *)in_message_allocator.alloc_void(); +#endif + + if (pMessage == NULL) { + Error("file: "__FILE__", line: %d, " + "malloc %d bytes fail, errno: %d, error info: 
%s", + __LINE__, (int)sizeof(InMessage), errno, strerror(errno)); + SESSION_UNLOCK(pMachineSessions, session_index); + return errno != 0 ? errno : ENOMEM; + } + + pMessage->blocks.m_ptr = NULL; //must set to NULL before set value + pMessage->func_id = func_id; + pMessage->blocks = blocks; + pMessage->data_len = data_len; + pMessage->next = NULL; + + if (pSessionEntry->messages == NULL) { + pSessionEntry->messages = pMessage; + } + else if (pSessionEntry->messages->next == NULL) { + pSessionEntry->messages->next = pMessage; + } + else { + InMessage *pTail; + pTail = pSessionEntry->messages->next; + while (pTail->next != NULL) { + pTail = pTail->next; + } + pTail->next = pMessage; + } + + //check if notify dealer + if (pSessionEntry->response_events & RESPONSE_EVENT_NOTIFY_DEALER) { + pSessionEntry->response_events = 0; + pMessage = pSessionEntry->messages; + pSessionEntry->messages = pSessionEntry->messages->next; //consume one + user_data = pSessionEntry->user_data; + call_func = true; + } + else { + user_data = NULL; + call_func = false; + } + +#ifdef TRIGGER_STAT_FLAG + if (call_func) { + if (!pMachineSessions->is_myself) { //server + pSessionEntry->stat_start_time = CURRENT_NS(); + } + } +#endif + SESSION_UNLOCK(pMachineSessions, session_index); + + pSockContext->thread_context->stats.enqueue_in_msg_count++; + pSockContext->thread_context->stats.enqueue_in_msg_bytes += + MSG_HEADER_LENGTH + data_len; + + if (call_func) { + cluster_msg_deal_func(session, user_data, pMessage->func_id, + pMessage->blocks, pMessage->data_len); + + release_in_message(pSockContext, pMessage); + } + + return 0; +} + +static void set_session_stat(SessionRecords *pSessionRecords, + const SessionStat *pNewtat, SessionStat *pOldStat) +{ + if (pNewtat->create_total_count != pOldStat->create_total_count) { + pOldStat->create_total_count = pNewtat->create_total_count; + RecDataSetFromInk64(RECD_INT, &pSessionRecords->create_total_count->data, + pNewtat->create_total_count); + } + if (pNewtat->create_success_count != pOldStat->create_success_count) { + pOldStat->create_success_count = pNewtat->create_success_count; + RecDataSetFromInk64(RECD_INT, &pSessionRecords->create_success_count->data, + pNewtat->create_success_count); + } + if (pNewtat->create_retry_times != pOldStat->create_retry_times) { + pOldStat->create_retry_times = pNewtat->create_retry_times; + RecDataSetFromInk64(RECD_INT, &pSessionRecords->create_retry_times->data, + pNewtat->create_retry_times); + } + if (pNewtat->close_total_count != pOldStat->close_total_count) { + pOldStat->close_total_count = pNewtat->close_total_count; + RecDataSetFromInk64(RECD_INT, &pSessionRecords->close_total_count->data, + pNewtat->close_total_count); + } + if (pNewtat->close_success_count != pOldStat->close_success_count) { + pOldStat->close_success_count = pNewtat->close_success_count; + RecDataSetFromInk64(RECD_INT, &pSessionRecords->close_success_count->data, + pNewtat->close_success_count); + } + if (pNewtat->session_miss_count != pOldStat->session_miss_count) { + pOldStat->session_miss_count = pNewtat->session_miss_count; + RecDataSetFromInk64(RECD_INT, &pSessionRecords->session_miss_count->data, + pNewtat->session_miss_count); + } + if (pNewtat->session_occupied_count != pOldStat->session_occupied_count) { + pOldStat->session_occupied_count = pNewtat->session_occupied_count; + RecDataSetFromInk64(RECD_INT, &pSessionRecords->session_occupied_count->data, + pNewtat->session_occupied_count); + } +} + +static void init_session_stat(SessionRecords *pSessionRecords, const 
char *prefix) +{ + char name[256]; + RecData data_default; + memset(&data_default, 0, sizeof(RecData)); + + sprintf(name, "%s.create_total_count", prefix); + pSessionRecords->create_total_count = RecRegisterStat(RECT_PROCESS, + name, RECD_INT, data_default, RECP_NON_PERSISTENT); + + sprintf(name, "%s.create_success_count", prefix); + pSessionRecords->create_success_count = RecRegisterStat(RECT_PROCESS, + name, RECD_INT, data_default, RECP_NON_PERSISTENT); + + sprintf(name, "%s.create_retry_times", prefix); + pSessionRecords->create_retry_times = RecRegisterStat(RECT_PROCESS, + name, RECD_INT, data_default, RECP_NON_PERSISTENT); + + sprintf(name, "%s.close_total_count", prefix); + pSessionRecords->close_total_count = RecRegisterStat(RECT_PROCESS, + name, RECD_INT, data_default, RECP_NON_PERSISTENT); + + sprintf(name, "%s.close_success_count", prefix); + pSessionRecords->close_success_count = RecRegisterStat(RECT_PROCESS, + name, RECD_INT, data_default, RECP_NON_PERSISTENT); + + sprintf(name, "%s.miss_count", prefix); + pSessionRecords->session_miss_count = RecRegisterStat(RECT_PROCESS, + name, RECD_INT, data_default, RECP_NON_PERSISTENT); + + sprintf(name, "%s.occupied_count", prefix); + pSessionRecords->session_occupied_count = RecRegisterStat(RECT_PROCESS, + name, RECD_INT, data_default, RECP_NON_PERSISTENT); +} + + +void log_session_stat() +{ + ClusterMachine *pMachine; + ClusterMachine *pMachineEnd; + int machine_id; + MachineSessions *pServerSessions; + MachineSessions *pClientSessions; + SessionStat serverSessionStat; + static SessionStat serverOldStat = {0, 0, 0, 0, 0, 0, 0}; + static SessionStat clientOldStat = {0, 0, 0, 0, 0, 0, 0}; + + serverSessionStat.create_total_count = 0; + serverSessionStat.create_success_count = 0; + serverSessionStat.create_retry_times = 0; + serverSessionStat.close_total_count = 0; + serverSessionStat.close_success_count = 0; + serverSessionStat.session_miss_count = 0; + serverSessionStat.session_occupied_count = 0; + + pMachineEnd = cluster_machines + cluster_machine_count; + for (pMachine=cluster_machines; pMachine<pMachineEnd; pMachine++) { + if ((machine_id=get_session_machine_index(pMachine->ip)) < 0) { + continue; + } + if (pMachine->dead || machine_id == my_machine_id) { + continue; + } + + pServerSessions = all_sessions + machine_id; + serverSessionStat.create_total_count += pServerSessions->session_stat. + create_total_count; + serverSessionStat.close_total_count += pServerSessions->session_stat. + close_total_count; + serverSessionStat.close_success_count += pServerSessions->session_stat. + close_success_count; + serverSessionStat.session_miss_count += pServerSessions->session_stat. + session_miss_count; + serverSessionStat.session_occupied_count += pServerSessions->session_stat. 
+ session_occupied_count; + } + + serverSessionStat.create_success_count = serverSessionStat.create_total_count; + serverSessionStat.create_retry_times = serverSessionStat.create_total_count; + + pClientSessions = all_sessions + my_machine_id; + + set_session_stat(&server_session_records, &serverSessionStat, &serverOldStat); + set_session_stat(&client_session_records, (const SessionStat *) + &pClientSessions->session_stat, &clientOldStat); +} + +#ifdef TRIGGER_STAT_FLAG +void log_trigger_stat() +{ + ClusterMachine *pMachine; + ClusterMachine *pMachineEnd; + MachineSessions *pServerSessions; + MachineSessions *pClientSessions; + MsgTimeUsed serverTimeUsed; + int machine_id; + int server_avg_time_used; + int client_avg_time_used; + + serverTimeUsed.count = 0; + serverTimeUsed.time_used = 0; + + pMachineEnd = cluster_machines + cluster_machine_count; + for (pMachine=cluster_machines; pMachine<pMachineEnd; pMachine++) { + if ((machine_id=get_session_machine_index(pMachine->ip)) < 0) { + continue; + } + if (pMachine->dead || machine_id == my_machine_id) { + continue; + } + + pServerSessions = all_sessions + machine_id; + + serverTimeUsed.count += pServerSessions->trigger_stat.count; + serverTimeUsed.time_used += pServerSessions->trigger_stat.time_used; + if (pServerSessions->trigger_stat.count > 0) { + server_avg_time_used = pServerSessions->trigger_stat.time_used / + pServerSessions->trigger_stat.count; + } + else { + server_avg_time_used = 0; + } + Note("%s:%d trigger msg => %"PRId64", avg time used => %d us", + pMachine->hostname, pMachine->cluster_port, + pServerSessions->trigger_stat.count, + server_avg_time_used / 1000); + + pServerSessions->trigger_stat.count = 0; + pServerSessions->trigger_stat.time_used = 0; + } + + if (serverTimeUsed.count > 0) { + server_avg_time_used = serverTimeUsed.time_used / serverTimeUsed.count; + } + else { + server_avg_time_used = 0; + } + Note("SERVER: trigger msg => %"PRId64", avg time used => %d us", + serverTimeUsed.count, server_avg_time_used / 1000); + + pClientSessions = all_sessions + my_machine_id; + if (pClientSessions->trigger_stat.count > 0) { + client_avg_time_used = pClientSessions->trigger_stat.time_used / + pClientSessions->trigger_stat.count; + } + else { + client_avg_time_used = 0; + } + Note("CLIENT: trigger msg => %"PRId64", avg time used => %d us\n", + pClientSessions->trigger_stat.count, client_avg_time_used / 1000); + + pClientSessions->trigger_stat.count = 0; + pClientSessions->trigger_stat.time_used = 0; +} +#endif + +#ifdef MSG_TIME_STAT_FLAG +void log_msg_time_stat() +{ + ClusterMachine *pMachine; + ClusterMachine *pMachineEnd; + MachineSessions *pServerSessions; + MachineSessions *pClientSessions; + MsgTimeUsed serverTimeUsed; + MsgTimeUsed sendTimeUsed; + int machine_id; + int server_avg_time_used; + int client_avg_time_used; + int send_avg_time_used; + + serverTimeUsed.count = 0; + serverTimeUsed.time_used = 0; + sendTimeUsed.count = 0; + sendTimeUsed.time_used = 0; + + pMachineEnd = cluster_machines + cluster_machine_count; + for (pMachine=cluster_machines; pMachine<pMachineEnd; pMachine++) { + if ((machine_id=get_session_machine_index(pMachine->ip)) < 0) { + continue; + } + if (pMachine->dead || machine_id == my_machine_id) { + continue; + } + + pServerSessions = all_sessions + machine_id; + serverTimeUsed.count += pServerSessions->msg_stat.count; + serverTimeUsed.time_used += pServerSessions->msg_stat.time_used; + if (pServerSessions->msg_stat.count > 0) { + server_avg_time_used = pServerSessions->msg_stat.time_used / + 
pServerSessions->msg_stat.count; + } + else { + server_avg_time_used = 0; + } + + sendTimeUsed.count += pServerSessions->msg_send.count; + sendTimeUsed.time_used += pServerSessions->msg_send.time_used; + if (pServerSessions->msg_send.count > 0) { + send_avg_time_used = pServerSessions->msg_send.time_used / + pServerSessions->msg_send.count; + } + else { + send_avg_time_used = 0; + } + + Note("%s:%d msg count: %"PRId64", avg time used (recv start to send done): %d us, " + "send msg count: %"PRId64", send avg time: %d us", + pMachine->hostname, pMachine->cluster_port, + pServerSessions->msg_stat.count, server_avg_time_used / 1000, + pServerSessions->msg_send.count, send_avg_time_used / 1000); + + pServerSessions->msg_stat.count = 0; + pServerSessions->msg_stat.time_used = 0; + pServerSessions->msg_send.count = 0; + pServerSessions->msg_send.time_used = 0; + } + + if (serverTimeUsed.count > 0) { + server_avg_time_used = serverTimeUsed.time_used / serverTimeUsed.count; + } + else { + server_avg_time_used = 0; + } + + if (sendTimeUsed.count > 0) { + send_avg_time_used = sendTimeUsed.time_used / sendTimeUsed.count; + } + else { + send_avg_time_used = 0; + } + Note("SERVER: msg count: %"PRId64", avg time used (recv start to send done): %d us, " + "send msg count: %"PRId64", send avg time: %d us", + serverTimeUsed.count, server_avg_time_used / 1000, + sendTimeUsed.count, send_avg_time_used / 1000); + + pClientSessions = all_sessions + my_machine_id; + if (pClientSessions->msg_stat.count > 0) { + client_avg_time_used = pClientSessions->msg_stat.time_used / + pClientSessions->msg_stat.count; + } + else { + client_avg_time_used = 0; + } + if (pClientSessions->msg_send.count > 0) { + send_avg_time_used = pClientSessions->msg_send.time_used / + pClientSessions->msg_send.count; + } + else { + send_avg_time_used = 0; + } + Note("CLIENT: msg count: %"PRId64", avg time used (send start to recv done): %d us, " + "send msg count: %"PRId64", send avg time: %d us\n", + pClientSessions->msg_stat.count, client_avg_time_used / 1000, + pClientSessions->msg_send.count, send_avg_time_used / 1000); + + pClientSessions->msg_stat.count = 0; + pClientSessions->msg_stat.time_used = 0; + pClientSessions->msg_send.count = 0; + pClientSessions->msg_send.time_used = 0; +} +#endif + http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/session.h ---------------------------------------------------------------------- diff --git a/iocore/cluster/session.h b/iocore/cluster/session.h new file mode 100644 index 0000000..9dd2559 --- /dev/null +++ b/iocore/cluster/session.h @@ -0,0 +1,97 @@ +/** @file + + A brief file description + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */ + +#ifndef _SESSION_H_ +#define _SESSION_H_ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include "types.h" +#include "clusterinterface.h" + +typedef struct { + unsigned int ip; + bool init_done; + bool is_myself; //myself, the local host + SessionEntry *sessions; + ink_mutex *locks; + volatile SequenceType current_seq; + volatile SessionStat session_stat; + +#ifdef TRIGGER_STAT_FLAG + volatile MsgTimeUsed trigger_stat; +#endif + +#ifdef MSG_TIME_STAT_FLAG + volatile MsgTimeUsed msg_stat; + volatile MsgTimeUsed msg_send; +#endif + +} MachineSessions; + +#define SESSION_LOCK(pMachineSessions, session_index) \ + ink_mutex_acquire((pMachineSessions)->locks + session_index % \ + session_lock_count_per_machine) + +#define SESSION_UNLOCK(pMachineSessions, session_index) \ + ink_mutex_release((pMachineSessions)->locks + session_index % \ + session_lock_count_per_machine) + +#ifdef __cplusplus +extern "C" { +#endif + +int session_init(); +int init_machine_sessions(ClusterMachine *machine, const bool bMyself); + +int get_session_for_send(const SessionId *session, + MachineSessions **ppMachineSessions, SessionEntry **sessionEntry); +int get_response_session(const MsgHeader *pHeader, + MachineSessions **ppMachineSessions, SessionEntry **sessionEntry, + SocketContext *pSocketContext, bool *call_func, void **user_data); + +int notify_connection_closed(SocketContext *pSockContext); + +int push_in_message(const SessionId session, + MachineSessions *pMachineSessions, SessionEntry *pSessionEntry, + const int func_id, IOBufferBlock *blocks, const int data_len); + +void log_session_stat(); + +#ifdef TRIGGER_STAT_FLAG +void log_trigger_stat(); +#endif + +#ifdef MSG_TIME_STAT_FLAG +int get_response_session_internal(const MsgHeader *pHeader, + MachineSessions **ppMachineSessions, SessionEntry **sessionEntry); +void log_msg_time_stat(); +#endif + +#ifdef __cplusplus +} +#endif + +#endif + http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/types.h ---------------------------------------------------------------------- diff --git a/iocore/cluster/types.h b/iocore/cluster/types.h new file mode 100644 index 0000000..e11b00c --- /dev/null +++ b/iocore/cluster/types.h @@ -0,0 +1,235 @@ +/** @file + + A brief file description + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */ + +#ifndef _CLUSTER_TYPES_H_ +#define _CLUSTER_TYPES_H_ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include "clusterinterface.h" +#include "libts.h" + +#define IP_ADDRESS_SIZE 16 + +//#define USE_MULTI_ALLOCATOR 1 +#define CHECK_MAGIC_NUMBER 1 + +#define PRIORITY_COUNT 3 //priority queue count + +//statistic marco defines +//#define TRIGGER_STAT_FLAG 1 //trigger statistic flag +//#define MSG_TIME_STAT_FLAG 1 //data statistic flag + +#define MSG_HEADER_LENGTH ((int)sizeof(MsgHeader)) +#define MAGIC_NUMBER 0x3308 +#define MAX_MSG_LENGTH (4 * 1024 * 1024) + +#define MAX_MACHINE_COUNT 255 //IMPORTANT: can't be 256!! + +//combine multi msg to call writev +#define WRITEV_ARRAY_SIZE 128 +#define WRITEV_ITEM_ONCE (WRITEV_ARRAY_SIZE / 2) +#define WRITE_MAX_COMBINE_BYTES (64 * 1024) + +#define CONNECT_TYPE_CLIENT 'C' //connect by me, client +#define CONNECT_TYPE_SERVER 'S' //connect by peer, server + +#define DATA_TYPE_BUFFER 'B' //char buffer +#define DATA_TYPE_OBJECT 'O' //IOBufferBlock pointer + +#define ALIGN_BYTES 8 +#define BYTE_ALIGN(x,l) (((x) + ((l) - 1)) & ~((l) - 1)) +#define BYTE_ALIGN8(x) BYTE_ALIGN(x, ALIGN_BYTES) + +#define IS_SESSION_EMPTY(session_id) \ + ((session_id).ids[0] == 0 && (session_id).ids[1] == 0) + +#define IS_SESSION_EQUAL(session_id1, session_id2) \ + ((session_id1).ids[0] == (session_id2).ids[0] && \ + (session_id1).ids[1] == (session_id2).ids[1]) + +typedef struct msg_timeused { + volatile int64_t count; //message count + volatile int64_t time_used; //time used +} MsgTimeUsed; + +typedef struct session_stat { + volatile int64_t create_total_count; //create session total count + volatile int64_t create_success_count; //create session success count + volatile int64_t create_retry_times; //create session retry times + volatile int64_t close_total_count; //close session count + volatile int64_t close_success_count; //close session success count + volatile int64_t session_miss_count; //session miss count + volatile int64_t session_occupied_count; //session occupied count +} SessionStat; + +typedef struct msg_header { +#ifdef CHECK_MAGIC_NUMBER + short magic; //magic number + unsigned short msg_seq; //message sequence no base 1 +#else + uint32_t msg_seq; //message sequence no base 1 +#endif + + int func_id; //function id, must be signed int + int data_len; //message body length + int aligned_data_len; //aligned body length + SessionId session_id; //session id +} MsgHeader; //must aligned by 8 bytes + +typedef struct in_msg_entry { + int func_id; //function id + int data_len; //message body length + Ptr<IOBufferBlock> blocks; + struct in_msg_entry *next; //for income message queue +} InMessage; + +struct worker_thread_context; +struct socket_context; + +typedef struct session_entry { + SessionId session_id; + void *user_data; //user data for callback + struct socket_context *sock_context; + InMessage *messages; //income messages + int16_t response_events; //response events + uint16_t current_msg_seq; //current message sequence no + uint32_t version; //avoid CAS ABA + struct session_entry *next; //session chain, only for server session + +#ifdef TRIGGER_STAT_FLAG + volatile int64_t stat_start_time; //for message time used stat +#endif + +#ifdef MSG_TIME_STAT_FLAG + volatile int64_t client_start_time; //send start time for client + volatile int64_t server_start_time; //recv done time for server + volatile int64_t send_start_time; //send start time for stat send time +#endif + +} SessionEntry; + +//out message to send +typedef struct out_msg_entry { 
+ MsgHeader header; + char mini_buff[MINI_MESSAGE_SIZE]; //for mini message + Ptr<IOBufferBlock> blocks; //block data passed by caller + + struct out_msg_entry *next; //for send queue + int bytes_sent; //important: including msg header + int data_type; //DATA_TYPE_BUFFER or DATA_TYPE_OBJECT + int64_t in_queue_time; //the time when push to send queue +} OutMessage; + +//out message queue +typedef struct message_queue { + OutMessage *head; + OutMessage *tail; + ink_mutex lock; +} MessageQueue; + +//for recv messages +typedef struct reader_manager { + Ptr<IOBufferData> buffer; //recv buffer + Ptr<IOBufferBlock> blocks; //recv blocks + char *msg_header; //current message start + char *current; //current pointer + char *buff_end; //buffer end + int recv_body_bytes; //recveived body bytes +} ReaderManager; + +typedef struct socket_context { + int sock; //socket fd + char padding[ALIGN_BYTES]; //padding buffer + struct reader_manager reader; //recv buffer + struct ClusterMachine *machine; //peer machine, point to global machine + struct worker_thread_context *thread_context; //the thread belong to + MessageQueue send_queues[PRIORITY_COUNT]; //queue for send + + int queue_index; //current deal queue index base 0 + int connect_type; //CONNECT_TYPE_CLIENT or CONNECT_TYPE_SERVER + time_t connected_time; //connection established timestamp + uint32_t version; //avoid CAS ABA + + int64_t next_write_time; //next time to send message + + int ping_fail_count; //cluster ping fail counter + int64_t next_ping_time; //next time to send ping message + int64_t ping_start_time; //the start time of ping + +#ifdef USE_MULTI_ALLOCATOR + Allocator *out_msg_allocator; //for send + Allocator *in_msg_allocator; //for notify dealer +#endif + struct socket_context *next; //for freelist +} SocketContext; + +typedef struct socket_stats { + int64_t send_msg_count; //send msg count + int64_t drop_msg_count; //droped msg count when close socket + int64_t send_bytes; + int64_t drop_bytes; + int64_t call_writev_count; + int64_t send_retry_count; + int64_t send_delayed_time; + + volatile int64_t push_msg_count; //push to send queue msg count + volatile int64_t push_msg_bytes; //push to send queue msg bytes + + volatile int64_t fail_msg_count; //push to send queue fail msg count + volatile int64_t fail_msg_bytes; //push to send queue fail msg bytes + + int64_t recv_msg_count; //recv msg count + int64_t enqueue_in_msg_count; //push into in msg queue + int64_t dequeue_in_msg_count; //pop from in msg queue + int64_t recv_bytes; + int64_t enqueue_in_msg_bytes; //push into in msg queue + int64_t dequeue_in_msg_bytes; //pop from in msg queue + + int64_t call_read_count; + int64_t epoll_wait_count; + int64_t epoll_wait_time_used; + int64_t loop_usleep_count; + int64_t loop_usleep_time; + + int64_t ping_total_count; + int64_t ping_success_count; + int64_t ping_time_used; +} SocketStats; + +class EventPoll; + +typedef struct worker_thread_context +{ + EventPoll *ev_poll; + int alloc_size; //max count of epoll events + int thread_index; //my thread index + int active_sock_count; + SocketStats stats; + ink_mutex lock; + SocketContext **active_sockets; +} WorkerThreadContext; + +#endif + http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/eventsystem/I_Event.h ---------------------------------------------------------------------- diff --git a/iocore/eventsystem/I_Event.h b/iocore/eventsystem/I_Event.h index 7a37ea0..2659131 100644 --- a/iocore/eventsystem/I_Event.h +++ b/iocore/eventsystem/I_Event.h @@ -85,6 +85,7 @@ 
#define BLOCK_CACHE_EVENT_EVENTS_START 4000 #define UTILS_EVENT_EVENTS_START 5000 #define CONGESTION_EVENT_EVENTS_START 5100 +#define CLUSTER_MSG_START 6000 #define INK_API_EVENT_EVENTS_START 60000 #define SRV_EVENT_EVENTS_START 62000 #define REMAP_EVENT_EVENTS_START 63000 http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/eventsystem/P_IOBuffer.h ---------------------------------------------------------------------- diff --git a/iocore/eventsystem/P_IOBuffer.h b/iocore/eventsystem/P_IOBuffer.h index 0842aff..261aa1f 100644 --- a/iocore/eventsystem/P_IOBuffer.h +++ b/iocore/eventsystem/P_IOBuffer.h @@ -203,7 +203,7 @@ new_IOBufferData_internal( void *b, int64_t size, int64_t asize_index) { (void) size; - IOBufferData *d = THREAD_ALLOC(ioDataAllocator, this_ethread()); + IOBufferData *d = ioDataAllocator.alloc(); d->_size_index = asize_index; ink_assert(BUFFER_SIZE_INDEX_IS_CONSTANT(asize_index) || size <= d->block_size()); @@ -263,7 +263,7 @@ new_IOBufferData_internal( #endif int64_t size_index, AllocType type) { - IOBufferData *d = THREAD_ALLOC(ioDataAllocator, this_ethread()); + IOBufferData *d = ioDataAllocator.alloc(); #ifdef TRACK_BUFFER_USER d->_location = loc; #endif @@ -336,7 +336,7 @@ TS_INLINE void IOBufferData::free() { dealloc(); - THREAD_FREE(this, ioDataAllocator, this_ethread()); + ioDataAllocator.free(this); } ////////////////////////////////////////////////////////////////// @@ -352,7 +352,7 @@ new_IOBufferBlock_internal( #endif ) { - IOBufferBlock *b = THREAD_ALLOC(ioBlockAllocator, this_ethread()); + IOBufferBlock *b = ioBlockAllocator.alloc(); #ifdef TRACK_BUFFER_USER b->_location = location; #endif @@ -366,7 +366,7 @@ new_IOBufferBlock_internal( #endif IOBufferData * d, int64_t len, int64_t offset) { - IOBufferBlock *b = THREAD_ALLOC(ioBlockAllocator, this_ethread()); + IOBufferBlock *b = ioBlockAllocator.alloc(); #ifdef TRACK_BUFFER_USER b->_location = location; #endif @@ -468,7 +468,7 @@ TS_INLINE void IOBufferBlock::free() { dealloc(); - THREAD_FREE(this, ioBlockAllocator, this_ethread()); + ioBlockAllocator.free(this); } TS_INLINE void @@ -777,7 +777,7 @@ TS_INLINE MIOBuffer * new_MIOBuffer_internal( #endif int64_t size_index) { - MIOBuffer *b = THREAD_ALLOC(ioAllocator, this_ethread()); + MIOBuffer *b = ioAllocator.alloc(); #ifdef TRACK_BUFFER_USER b->_location = location; #endif @@ -790,7 +790,7 @@ free_MIOBuffer(MIOBuffer * mio) { mio->_writer = NULL; mio->dealloc_all_readers(); - THREAD_FREE(mio, ioAllocator, this_ethread()); + ioAllocator.free(mio); } TS_INLINE MIOBuffer * new_empty_MIOBuffer_internal( @@ -799,7 +799,7 @@ TS_INLINE MIOBuffer * new_empty_MIOBuffer_internal( #endif int64_t size_index) { - MIOBuffer *b = THREAD_ALLOC(ioAllocator, this_ethread()); + MIOBuffer *b = ioAllocator.alloc(); b->size_index = size_index; #ifdef TRACK_BUFFER_USER b->_location = location; @@ -810,7 +810,7 @@ TS_INLINE MIOBuffer * new_empty_MIOBuffer_internal( TS_INLINE void free_empty_MIOBuffer(MIOBuffer * mio) { - THREAD_FREE(mio, ioAllocator, this_ethread()); + ioAllocator.free(mio); } TS_INLINE IOBufferReader * http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/mgmt/RecordsConfig.cc ---------------------------------------------------------------------- diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc index 4a73f19..7677471 100644 --- a/mgmt/RecordsConfig.cc +++ b/mgmt/RecordsConfig.cc @@ -814,6 +814,24 @@ RecordElement RecordsConfig[] = { , {RECT_CONFIG, "proxy.config.cluster.cluster_port", RECD_INT, "8086", 
RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
  ,
+  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.min_bps", RECD_INT, "804857600", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
+  ,
+  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.max_bps", RECD_INT, "4194304000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
+  ,
+  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.min_send_wait_time", RECD_INT, "1000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
+  ,
+  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.max_send_wait_time", RECD_INT, "5000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
+  ,
+  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.min_loop_interval", RECD_INT, "0", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
+  ,
+  {RECT_CONFIG, "proxy.config.cluster.flow_ctrl.max_loop_interval", RECD_INT, "1000", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
+  ,
+  {RECT_CONFIG, "proxy.config.cluster.max_sessions_per_machine", RECD_INT, "1000000", RECU_RESTART_TS, RR_NULL, RECC_INT, "[1000-4000000]", RECA_NULL}
+  ,
+  {RECT_CONFIG, "proxy.config.cluster.session_locks_per_machine", RECD_INT, "10949", RECU_RESTART_TS, RR_NULL, RECC_INT, "[1-100000]", RECA_NULL}
+  ,
+  {RECT_CONFIG, "proxy.config.cluster.read_buffer_size", RECD_INT, "2097152", RECU_RESTART_TS, RR_NULL, RECC_INT, "[65536-2097152]", RECA_NULL}
+  ,
  {RECT_CONFIG, "proxy.config.cluster.cluster_configuration", RECD_STRING, "cluster.config", RECU_NULL, RR_NULL, RECC_NULL, NULL, RECA_NULL}
  ,
  {RECT_CONFIG, "proxy.config.cluster.ethernet_interface", RECD_STRING, TS_BUILD_DEFAULT_LOOPBACK_IFACE, RECU_RESTART_TS, RR_REQUIRED, RECC_STR, "^[^[:space:]]*$", RECA_NULL}
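
The new proxy.config.cluster.max_sessions_per_machine and proxy.config.cluster.session_locks_per_machine records registered above size the per-machine session table and the lock stripes that session.cc indexes through get_session_machine_index()/alloc_session_machine_index() and SESSION_LOCK(). The standalone C++ sketch below illustrates just that open-addressing probe and lock striping; kMaxMachineCount, kLocksPerMachine, MachineSlot and the helper names are simplified stand-ins for illustration, not part of the committed code.

// Standalone illustration (simplified, hypothetical names); mirrors the probing
// in get_session_machine_index()/alloc_session_machine_index() and the lock
// striping behind SESSION_LOCK() in session.cc above.
#include <array>
#include <cstdint>
#include <cstdio>
#include <mutex>

static const int kMaxMachineCount = 255;  // the commit notes this must not be 256
static const int kLocksPerMachine = 16;   // stands in for session_locks_per_machine

struct MachineSlot {
  uint32_t ip;                                      // 0 means the slot is free
  std::array<std::mutex, kLocksPerMachine> locks;   // shared by this peer's sessions
};

static MachineSlot g_slots[kMaxMachineCount];       // zero-initialized (all slots free)

// Linear probing: start at ip % N and scan until the slot holding ip is found.
static int find_machine_index(uint32_t ip)
{
  const uint32_t start = ip % kMaxMachineCount;
  for (int count = 0; count <= kMaxMachineCount; count++) {
    int index = static_cast<int>((start + count) % kMaxMachineCount);
    if (g_slots[index].ip == ip) {
      return index;
    }
  }
  return -1;  // peer not registered
}

// Same probe, but claim the first free slot (the real code returns ENOSPC
// to its caller when this fails).
static int alloc_machine_index(uint32_t ip)
{
  const uint32_t start = ip % kMaxMachineCount;
  for (int count = 0; count <= kMaxMachineCount; count++) {
    int index = static_cast<int>((start + count) % kMaxMachineCount);
    if (g_slots[index].ip == 0) {
      g_slots[index].ip = ip;
      return index;
    }
  }
  return -1;  // table full
}

// Lock striping as in SESSION_LOCK(): many session slots map onto one mutex.
static std::mutex &session_lock(int machine_index, int session_index)
{
  return g_slots[machine_index].locks[session_index % kLocksPerMachine];
}

int main()
{
  uint32_t peer_ip = 0x0A000001;                    // 10.0.0.1, hypothetical peer
  int idx = alloc_machine_index(peer_ip);
  {
    std::lock_guard<std::mutex> guard(session_lock(idx, 12345));
    // ... touch the session slot for (idx, 12345) here ...
  }
  std::printf("machine index for peer: %d\n", find_machine_index(peer_ip));
  return 0;
}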

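For reference, the caller-side lifecycle implied by the signatures in session.cc above (cluster_create_session, cluster_bind_session, cluster_set_events, cluster_close_session) looks roughly like the sketch below. It assumes clusterinterface.h and the other headers on this branch declare ClusterSession, ClusterMachine and RESPONSE_EVENT_NOTIFY_DEALER; the function send_request_to_peer and the commented send step are hypothetical, so treat this as an inferred usage sketch rather than an official example.

// Inferred usage sketch (not part of this commit). Assumes the cluster headers
// from this branch declare the types, the event flag and the cluster_* API.
#include "clusterinterface.h"

int send_request_to_peer(ClusterMachine *peer, void *my_context)
{
  ClusterSession session;
  int result;

  // Grab a free slot in this machine's session table; ENOENT means no socket
  // to the peer, ENOSPC means no empty slot was found after 128 probes.
  result = cluster_create_session(&session, peer, my_context,
                                  RESPONSE_EVENT_NOTIFY_DEALER);
  if (result != 0) {
    return result;
  }

  // ... build and send the request message for this session here (hypothetical) ...

  // The user data bound to the session can be replaced at any time.
  cluster_bind_session(session, my_context);

  // Re-arm notification once the dealer is ready for the next response; if a
  // message is already queued, cluster_set_events() dispatches it immediately
  // through cluster_msg_deal_func().
  cluster_set_events(session, RESPONSE_EVENT_NOTIFY_DEALER);

  // Tear the session down; the previously bound user data is handed back.
  void *old_data = cluster_close_session(session);
  (void)old_data;
  return 0;
}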