Repository: trafficserver Updated Branches: refs/heads/master 51dffeb61 -> 428c3d581
TS-2759: Rename SpdySM to SpdyClientSession Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/92d70eb7 Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/92d70eb7 Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/92d70eb7 Branch: refs/heads/master Commit: 92d70eb7b07843da81a4e184aebd0e0b978f42eb Parents: 51dffeb Author: James Peach <[email protected]> Authored: Sat May 17 11:46:49 2014 -0700 Committer: James Peach <[email protected]> Committed: Tue May 20 09:43:42 2014 -0700 ---------------------------------------------------------------------- proxy/spdy/Makefile.am | 10 +- proxy/spdy/SpdyCallbacks.cc | 24 +- proxy/spdy/SpdyCallbacks.h | 4 +- proxy/spdy/SpdyClientSession.cc | 433 +++++++++++++++++++++++++++++++++++ proxy/spdy/SpdyClientSession.h | 144 ++++++++++++ proxy/spdy/SpdySM.cc | 433 ----------------------------------- proxy/spdy/SpdySM.h | 145 ------------ proxy/spdy/SpdySessionAccept.cc | 2 +- 8 files changed, 597 insertions(+), 598 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafficserver/blob/92d70eb7/proxy/spdy/Makefile.am ---------------------------------------------------------------------- diff --git a/proxy/spdy/Makefile.am b/proxy/spdy/Makefile.am index a532d5a..a3b5a36 100644 --- a/proxy/spdy/Makefile.am +++ b/proxy/spdy/Makefile.am @@ -37,11 +37,11 @@ libspdy_a_SOURCES = \ SpdySessionAccept.cc if BUILD_SPDY - libspdy_a_SOURCES += \ - SpdyCallbacks.h \ - SpdyCommon.h \ - SpdySM.h \ +libspdy_a_SOURCES += \ SpdyCallbacks.cc \ + SpdyCallbacks.h \ + SpdyClientSession.cc \ + SpdyClientSession.h \ SpdyCommon.cc \ - SpdySM.cc + SpdyCommon.h endif http://git-wip-us.apache.org/repos/asf/trafficserver/blob/92d70eb7/proxy/spdy/SpdyCallbacks.cc ---------------------------------------------------------------------- diff --git a/proxy/spdy/SpdyCallbacks.cc b/proxy/spdy/SpdyCallbacks.cc index 647d3b5..c31a239 100644 --- a/proxy/spdy/SpdyCallbacks.cc +++ b/proxy/spdy/SpdyCallbacks.cc @@ -22,7 +22,7 @@ */ #include "SpdyCallbacks.h" -#include "SpdySM.h" +#include "SpdyClientSession.h" #include <arpa/inet.h> void @@ -50,7 +50,7 @@ spdy_callbacks_init(spdylay_session_callbacks *callbacks) } void -spdy_prepare_status_response(SpdySM *sm, int stream_id, const char *status) +spdy_prepare_status_response(SpdyClientSession *sm, int stream_id, const char *status) { SpdyRequest *req = sm->req_map[stream_id]; string date_str = http_date(time(0)); @@ -85,7 +85,7 @@ spdy_show_data_frame(const char *head_str, spdylay_session * /*session*/, uint8_ if (!is_debug_tag_set("spdy")) return; - SpdySM *sm = (SpdySM *)user_data; + SpdyClientSession *sm = (SpdyClientSession *)user_data; Debug("spdy", "%s DATA frame (sm_id:%" PRIu64 ", stream_id:%d, flag:%d, length:%d)", head_str, sm->sm_id, stream_id, flags, length); @@ -98,7 +98,7 @@ spdy_show_ctl_frame(const char *head_str, spdylay_session * /*session*/, spdylay if (!is_debug_tag_set("spdy")) return; - SpdySM *sm = (SpdySM *)user_data; + SpdyClientSession *sm = (SpdyClientSession *)user_data; switch (type) { case SPDYLAY_SYN_STREAM: { spdylay_syn_stream *f = (spdylay_syn_stream *)frame; @@ -170,7 +170,7 @@ spdy_fetcher_launch(SpdyRequest *req, TSFetchMethod method) string url; int fetch_flags; const sockaddr *client_addr; - SpdySM *sm = req->spdy_sm; + SpdyClientSession *sm = req->spdy_sm; url = req->scheme + "://" + req->host + req->path; client_addr = TSNetVConnRemoteAddrGet(reinterpret_cast<TSVConn>(sm->vc)); @@ -214,7 +214,7 @@ ssize_t spdy_send_callback(spdylay_session * /*session*/, const uint8_t *data, size_t length, int /*flags*/, void *user_data) { - SpdySM *sm = (SpdySM*)user_data; + SpdyClientSession *sm = (SpdyClientSession*)user_data; sm->total_size += length; TSIOBufferWrite(sm->resp_buffer, data, length); @@ -232,7 +232,7 @@ spdy_recv_callback(spdylay_session * /*session*/, uint8_t *buf, size_t length, TSIOBufferBlock blk, next_blk; int64_t already, blk_len, need, wavail; - SpdySM *sm = (SpdySM*)user_data; + SpdyClientSession *sm = (SpdyClientSession*)user_data; already = 0; blk = TSIOBufferReaderStart(sm->req_reader); @@ -270,7 +270,7 @@ spdy_recv_callback(spdylay_session * /*session*/, uint8_t *buf, size_t length, } static void -spdy_process_syn_stream_frame(SpdySM *sm, SpdyRequest *req) +spdy_process_syn_stream_frame(SpdyClientSession *sm, SpdyRequest *req) { // validate request headers for(size_t i = 0; i < req->headers.size(); ++i) { @@ -322,7 +322,7 @@ spdy_on_ctrl_recv_callback(spdylay_session *session, spdylay_frame_type type, { int stream_id; SpdyRequest *req; - SpdySM *sm = (SpdySM*)user_data; + SpdyClientSession *sm = (SpdyClientSession*)user_data; spdy_show_ctl_frame("++++RECV", session, type, frame, user_data); @@ -369,7 +369,7 @@ spdy_on_data_chunk_recv_callback(spdylay_session * /*session*/, uint8_t /*flags* int32_t stream_id, const uint8_t *data, size_t len, void *user_data) { - SpdySM *sm = (SpdySM *)user_data; + SpdyClientSession *sm = (SpdyClientSession *)user_data; SpdyRequest *req = sm->req_map[stream_id]; // @@ -388,7 +388,7 @@ void spdy_on_data_recv_callback(spdylay_session *session, uint8_t flags, int32_t stream_id, int32_t length, void *user_data) { - SpdySM *sm = (SpdySM *)user_data; + SpdyClientSession *sm = (SpdyClientSession *)user_data; SpdyRequest *req = sm->req_map[stream_id]; spdy_show_data_frame("++++RECV", session, flags, stream_id, length, user_data); @@ -459,7 +459,7 @@ void spdy_on_data_send_callback(spdylay_session *session, uint8_t flags, int32_t stream_id, int32_t length, void *user_data) { - SpdySM *sm = (SpdySM *)user_data; + SpdyClientSession *sm = (SpdyClientSession *)user_data; spdy_show_data_frame("----SEND", session, flags, stream_id, length, user_data); http://git-wip-us.apache.org/repos/asf/trafficserver/blob/92d70eb7/proxy/spdy/SpdyCallbacks.h ---------------------------------------------------------------------- diff --git a/proxy/spdy/SpdyCallbacks.h b/proxy/spdy/SpdyCallbacks.h index 017eb41..ebe3c71 100644 --- a/proxy/spdy/SpdyCallbacks.h +++ b/proxy/spdy/SpdyCallbacks.h @@ -25,10 +25,10 @@ #define __P_SPDY_CALLBACKS_H__ #include <spdylay/spdylay.h> -class SpdySM; +class SpdyClientSession; void spdy_callbacks_init(spdylay_session_callbacks *callbacks); -void spdy_prepare_status_response(SpdySM *sm, int stream_id, const char *status); +void spdy_prepare_status_response(SpdyClientSession *sm, int stream_id, const char *status); /** * @functypedef http://git-wip-us.apache.org/repos/asf/trafficserver/blob/92d70eb7/proxy/spdy/SpdyClientSession.cc ---------------------------------------------------------------------- diff --git a/proxy/spdy/SpdyClientSession.cc b/proxy/spdy/SpdyClientSession.cc new file mode 100644 index 0000000..73cb4e0 --- /dev/null +++ b/proxy/spdy/SpdyClientSession.cc @@ -0,0 +1,433 @@ +/** @file + + SpdyClientSession.cc + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#include "SpdyClientSession.h" +#include "I_Net.h" + +static ClassAllocator<SpdyClientSession> spdyClientSessionAllocator("spdyClientSessionAllocator"); +ClassAllocator<SpdyRequest> spdyRequestAllocator("spdyRequestAllocator"); + +static int spdy_main_handler(TSCont contp, TSEvent event, void *edata); +static int spdy_start_handler(TSCont contp, TSEvent event, void *edata); +static int spdy_default_handler(TSCont contp, TSEvent event, void *edata); +static int spdy_process_read(TSEvent event, SpdyClientSession *sm); +static int spdy_process_write(TSEvent event, SpdyClientSession *sm); +static int spdy_process_fetch(TSEvent event, SpdyClientSession *sm, void *edata); +static int spdy_process_fetch_header(TSEvent event, SpdyClientSession *sm, TSFetchSM fetch_sm); +static int spdy_process_fetch_body(TSEvent event, SpdyClientSession *sm, TSFetchSM fetch_sm); +static uint64_t g_sm_id; +static uint64_t g_sm_cnt; + +void +SpdyRequest::clear() +{ + if (fetch_sm) + TSFetchDestroy(fetch_sm); + + vector<pair<string, string> >().swap(headers); + + std::string().swap(url); + std::string().swap(host); + std::string().swap(path); + std::string().swap(scheme); + std::string().swap(method); + std::string().swap(version); + + Debug("spdy", "****Delete Request[%" PRIu64 ":%d]", spdy_sm->sm_id, stream_id); +} + +void +SpdyClientSession::init(NetVConnection * netvc) +{ + int version, r; + + atomic_inc(g_sm_cnt); + + this->vc = netvc; + this->req_map.clear(); + + // XXX this has to die ... TS-2793 + UnixNetVConnection * unixvc = reinterpret_cast<UnixNetVConnection *>(netvc); + + if (unixvc->selected_next_protocol == TS_NPN_PROTOCOL_SPDY_3_1) + version = SPDYLAY_PROTO_SPDY3_1; + else if (unixvc->selected_next_protocol == TS_NPN_PROTOCOL_SPDY_3) + version = SPDYLAY_PROTO_SPDY3; + else if (unixvc->selected_next_protocol == TS_NPN_PROTOCOL_SPDY_2) + version = SPDYLAY_PROTO_SPDY2; + else + version = SPDYLAY_PROTO_SPDY3; + + r = spdylay_session_server_new(&session, version, + &SPDY_CFG.spdy.callbacks, this); + ink_release_assert(r == 0); + sm_id = atomic_inc(g_sm_id); + total_size = 0; + start_time = TShrtime(); + + ink_assert(this->contp == NULL); + this->contp = TSContCreate(spdy_main_handler, TSMutexCreate()); + TSContDataSet(this->contp, this); + + this->vc->set_inactivity_timeout(HRTIME_SECONDS(SPDY_CFG.accept_no_activity_timeout)); + this->current_handler = &spdy_start_handler; +} + +void +SpdyClientSession::clear() +{ + uint64_t nr_pending; + int last_event = event; + // + // SpdyRequest depends on SpdyClientSession, + // we should delete it firstly to avoid race. + // + map<int, SpdyRequest*>::iterator iter = req_map.begin(); + map<int, SpdyRequest*>::iterator endIter = req_map.end(); + for(; iter != endIter; ++iter) { + SpdyRequest *req = iter->second; + if (req) { + req->clear(); + spdyRequestAllocator.free(req); + } else { + Error("req null in SpdSM::clear"); + } + } + req_map.clear(); + + if (vc) { + TSVConnClose(reinterpret_cast<TSVConn>(vc)); + vc = NULL; + } + + if (contp) { + TSContDestroy(contp); + contp = NULL; + } + + if (req_reader) { + TSIOBufferReaderFree(req_reader); + req_reader = NULL; + } + + if (req_buffer) { + TSIOBufferDestroy(req_buffer); + req_buffer = NULL; + } + + if (resp_reader) { + TSIOBufferReaderFree(resp_reader); + resp_reader = NULL; + } + + if (resp_buffer) { + TSIOBufferDestroy(resp_buffer); + resp_buffer = NULL; + } + + if (session) { + spdylay_session_del(session); + session = NULL; + } + + nr_pending = atomic_dec(g_sm_cnt); + Debug("spdy-free", "****Delete SpdyClientSession[%" PRIu64 "], last event:%d, nr_pending:%" PRIu64, + sm_id, last_event, --nr_pending); +} + +void +spdy_sm_create(NetVConnection * netvc, MIOBuffer * iobuf, IOBufferReader * reader) +{ + SpdyClientSession *sm; + + sm = spdyClientSessionAllocator.alloc(); + sm->init(netvc); + + sm->req_buffer = iobuf ? reinterpret_cast<TSIOBuffer>(iobuf) : TSIOBufferCreate(); + sm->req_reader = reader ? reinterpret_cast<TSIOBufferReader>(reader) : TSIOBufferReaderAlloc(sm->req_buffer); + + sm->resp_buffer = TSIOBufferCreate(); + sm->resp_reader = TSIOBufferReaderAlloc(sm->resp_buffer); + + TSContSchedule(sm->contp, 0, TS_THREAD_POOL_DEFAULT); // schedule now +} + +static int +spdy_main_handler(TSCont contp, TSEvent event, void *edata) +{ + SpdyClientSession *sm; + SpdyClientSessionHandler spdy_current_handler; + + sm = (SpdyClientSession*)TSContDataGet(contp); + spdy_current_handler = sm->current_handler; + + return (*spdy_current_handler) (contp, event, edata); +} + +static int +spdy_start_handler(TSCont contp, TSEvent /*event*/, void * /*data*/) +{ + int r; + spdylay_settings_entry entry; + + SpdyClientSession *sm = (SpdyClientSession*)TSContDataGet(contp); + + if (TSIOBufferReaderAvail(sm->req_reader) > 0) { + spdy_process_read(TS_EVENT_VCONN_WRITE_READY, sm); + } + + sm->read_vio = (TSVIO)sm->vc->do_io_read(reinterpret_cast<Continuation *>(contp), INT64_MAX, reinterpret_cast<MIOBuffer *>(sm->req_buffer)); + sm->write_vio = (TSVIO)sm->vc->do_io_write(reinterpret_cast<Continuation *>(contp), INT64_MAX, reinterpret_cast<IOBufferReader *>(sm->resp_reader)); + + sm->current_handler = &spdy_default_handler; + + /* send initial settings frame */ + entry.settings_id = SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS; + entry.value = SPDY_CFG.spdy.max_concurrent_streams; + entry.flags = SPDYLAY_ID_FLAG_SETTINGS_NONE; + + r = spdylay_submit_settings(sm->session, SPDYLAY_FLAG_SETTINGS_NONE, &entry, 1); + TSAssert(r == 0); + + TSVIOReenable(sm->write_vio); + return 0; +} + +static int +spdy_default_handler(TSCont contp, TSEvent event, void *edata) +{ + int ret = 0; + bool from_fetch = false; + SpdyClientSession *sm = (SpdyClientSession*)TSContDataGet(contp); + sm->event = event; + + if (edata == sm->read_vio) { + Debug("spdy", "++++[READ EVENT]"); + if (event != TS_EVENT_VCONN_READ_READY && + event != TS_EVENT_VCONN_READ_COMPLETE) { + ret = -1; + goto out; + } + ret = spdy_process_read(event, sm); + } else if (edata == sm->write_vio) { + Debug("spdy", "----[WRITE EVENT]"); + if (event != TS_EVENT_VCONN_WRITE_READY && + event != TS_EVENT_VCONN_WRITE_COMPLETE) { + ret = -1; + goto out; + } + ret = spdy_process_write(event, sm); + } else { + from_fetch = true; + ret = spdy_process_fetch(event, sm, edata); + } + + Debug("spdy-event", "++++SpdyClientSession[%" PRIu64 "], EVENT:%d, ret:%d, nr_pending:%" PRIu64, + sm->sm_id, event, ret, g_sm_cnt); +out: + if (ret) { + sm->clear(); + spdyClientSessionAllocator.free(sm); + } else if (!from_fetch) { + sm->vc->set_inactivity_timeout(HRTIME_SECONDS(SPDY_CFG.no_activity_timeout_in)); + } + + return 0; +} + +static int +spdy_process_read(TSEvent /* event ATS_UNUSED */, SpdyClientSession *sm) +{ + return spdylay_session_recv(sm->session); +} + +static int +spdy_process_write(TSEvent /* event ATS_UNUSED */, SpdyClientSession *sm) +{ + int ret; + + ret = spdylay_session_send(sm->session); + + if (TSIOBufferReaderAvail(sm->resp_reader) > 0) + TSVIOReenable(sm->write_vio); + else { + Debug("spdy", "----TOTAL SEND (sm_id:%" PRIu64 ", total_size:%" PRIu64 ", total_send:%" PRId64 ")", + sm->sm_id, sm->total_size, TSVIONDoneGet(sm->write_vio)); + + // + // We should reenable read_vio when no data to be written, + // otherwise it could lead to hang issue when client POST + // data is waiting to be read. + // + TSVIOReenable(sm->read_vio); + } + + return ret; +} + +static int +spdy_process_fetch(TSEvent event, SpdyClientSession *sm, void *edata) +{ + int ret = -1; + TSFetchSM fetch_sm = (TSFetchSM)edata; + SpdyRequest *req = (SpdyRequest *)TSFetchUserDataGet(fetch_sm); + + switch ((int)event) { + + case TS_FETCH_EVENT_EXT_HEAD_DONE: + Debug("spdy", "----[FETCH HEADER DONE]"); + ret = spdy_process_fetch_header(event, sm, fetch_sm); + break; + + case TS_FETCH_EVENT_EXT_BODY_READY: + Debug("spdy", "----[FETCH BODY READY]"); + ret = spdy_process_fetch_body(event, sm, fetch_sm); + break; + + case TS_FETCH_EVENT_EXT_BODY_DONE: + Debug("spdy", "----[FETCH BODY DONE]"); + req->fetch_body_completed = true; + ret = spdy_process_fetch_body(event, sm, fetch_sm); + break; + + default: + Debug("spdy", "----[FETCH ERROR]"); + if (req->fetch_body_completed) + ret = 0; // Ignore fetch errors after FETCH BODY DONE + else + req->fetch_sm = NULL; + break; + } + + if (ret) { + spdy_prepare_status_response(sm, req->stream_id, STATUS_500); + sm->req_map.erase(req->stream_id); + req->clear(); + spdyRequestAllocator.free(req); + } + + return 0; +} + +static int +spdy_process_fetch_header(TSEvent /*event*/, SpdyClientSession *sm, TSFetchSM fetch_sm) +{ + int ret; + SpdyRequest *req = (SpdyRequest *)TSFetchUserDataGet(fetch_sm); + SpdyNV spdy_nv(fetch_sm); + + Debug("spdy", "----spdylay_submit_syn_reply"); + ret = spdylay_submit_syn_reply(sm->session, + SPDYLAY_CTRL_FLAG_NONE, req->stream_id, + spdy_nv.nv); + + TSVIOReenable(sm->write_vio); + return ret; +} + +static ssize_t +spdy_read_fetch_body_callback(spdylay_session * /*session*/, int32_t stream_id, + uint8_t *buf, size_t length, int *eof, + spdylay_data_source *source, void *user_data) +{ + + static int g_call_cnt; + int64_t already; + + SpdyClientSession *sm = (SpdyClientSession *)user_data; + SpdyRequest *req = (SpdyRequest *)source->ptr; + + // + // req has been deleted, ignore this data. + // + if (req != sm->req_map[stream_id]) { + Debug("spdy", " stream_id:%d, call:%d, req has been deleted, return 0", + stream_id, g_call_cnt); + *eof = 1; + return 0; + } + + already = TSFetchReadData(req->fetch_sm, buf, length); + + Debug("spdy", " stream_id:%d, call:%d, length:%ld, already:%" PRId64, + stream_id, g_call_cnt, length, already); + if (SPDY_CFG.spdy.verbose) + MD5_Update(&req->recv_md5, buf, already); + + TSVIOReenable(sm->write_vio); + g_call_cnt++; + + req->fetch_data_len += already; + if (already < (int64_t)length) { + if (req->event == TS_FETCH_EVENT_EXT_BODY_DONE) { + TSHRTime end_time = TShrtime(); + Debug("spdy", "----Request[%" PRIu64 ":%d] %s %lld %d", sm->sm_id, req->stream_id, + req->url.c_str(), (end_time - req->start_time)/TS_HRTIME_MSECOND, + req->fetch_data_len); + unsigned char digest[MD5_DIGEST_LENGTH]; + if (SPDY_CFG.spdy.verbose ) { + MD5_Final(digest, &req->recv_md5); + Debug("spdy", "----recv md5sum: "); + for (int i = 0; i < MD5_DIGEST_LENGTH; i++) { + Debug("spdy", "%02x", digest[i]); + } + } + *eof = 1; + sm->req_map.erase(stream_id); + req->clear(); + spdyRequestAllocator.free(req); + } else if (already == 0) { + req->need_resume_data = true; + return SPDYLAY_ERR_DEFERRED; + } + } + + return already; +} + +static int +spdy_process_fetch_body(TSEvent event, SpdyClientSession *sm, TSFetchSM fetch_sm) +{ + int ret = 0; + spdylay_data_provider data_prd; + SpdyRequest *req = (SpdyRequest *)TSFetchUserDataGet(fetch_sm); + req->event = event; + + data_prd.source.ptr = (void *)req; + data_prd.read_callback = spdy_read_fetch_body_callback; + + if (!req->has_submitted_data) { + req->has_submitted_data = true; + Debug("spdy", "----spdylay_submit_data"); + ret = spdylay_submit_data(sm->session, req->stream_id, + SPDYLAY_DATA_FLAG_FIN, &data_prd); + } else if (req->need_resume_data) { + Debug("spdy", "----spdylay_session_resume_data"); + ret = spdylay_session_resume_data(sm->session, req->stream_id); + if (ret == SPDYLAY_ERR_INVALID_ARGUMENT) + ret = 0; + } + + TSVIOReenable(sm->write_vio); + return ret; +} http://git-wip-us.apache.org/repos/asf/trafficserver/blob/92d70eb7/proxy/spdy/SpdyClientSession.h ---------------------------------------------------------------------- diff --git a/proxy/spdy/SpdyClientSession.h b/proxy/spdy/SpdyClientSession.h new file mode 100644 index 0000000..00a86b9 --- /dev/null +++ b/proxy/spdy/SpdyClientSession.h @@ -0,0 +1,144 @@ +/** @file + + SpdyClientSession.h + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#ifndef __P_SPDY_SM_H__ +#define __P_SPDY_SM_H__ + +#include "SpdyCommon.h" +#include "SpdyCallbacks.h" +#include <openssl/md5.h> + +class SpdyClientSession; +typedef int (*SpdyClientSessionHandler) (TSCont contp, TSEvent event, void *data); + +class SpdyRequest +{ +public: + SpdyRequest(): + spdy_sm(NULL), stream_id(-1), fetch_sm(NULL), + has_submitted_data(false), need_resume_data(false), + fetch_data_len(0), delta_window_size(0), + fetch_body_completed(false) + { + } + + SpdyRequest(SpdyClientSession *sm, int id): + spdy_sm(NULL), stream_id(-1), fetch_sm(NULL), + has_submitted_data(false), need_resume_data(false), + fetch_data_len(0), delta_window_size(0), + fetch_body_completed(false) + { + init(sm, id); + } + + ~SpdyRequest() + { + clear(); + } + + void init(SpdyClientSession *sm, int id) + { + spdy_sm = sm; + stream_id = id; + headers.clear(); + + MD5_Init(&recv_md5); + start_time = TShrtime(); + } + + void clear(); + + void append_nv(char **nv) + { + for(int i = 0; nv[i]; i += 2) { + headers.push_back(make_pair(nv[i], nv[i+1])); + } + } + +public: + int event; + SpdyClientSession *spdy_sm; + int stream_id; + TSHRTime start_time; + TSFetchSM fetch_sm; + bool has_submitted_data; + bool need_resume_data; + int fetch_data_len; + int delta_window_size; + bool fetch_body_completed; + vector<pair<string, string> > headers; + + string url; + string host; + string path; + string scheme; + string method; + string version; + + MD5_CTX recv_md5; +}; + +class SpdyClientSession +{ + +public: + + SpdyClientSession() { + } + + ~SpdyClientSession() { + clear(); + } + + void init(NetVConnection * netvc); + void clear(); + + int64_t sm_id; + uint64_t total_size; + TSHRTime start_time; + + NetVConnection * vc; + TSCont contp; + + TSIOBuffer req_buffer; + TSIOBufferReader req_reader; + + TSIOBuffer resp_buffer; + TSIOBufferReader resp_reader; + + TSVIO read_vio; + TSVIO write_vio; + + SpdyClientSessionHandler current_handler; + + int event; + spdylay_session *session; + + map<int32_t, SpdyRequest*> req_map; +}; + +void spdy_sm_create(NetVConnection * netvc, MIOBuffer * iobuf, IOBufferReader * reader); + +extern ClassAllocator<SpdyRequest> spdyRequestAllocator; + +#endif http://git-wip-us.apache.org/repos/asf/trafficserver/blob/92d70eb7/proxy/spdy/SpdySM.cc ---------------------------------------------------------------------- diff --git a/proxy/spdy/SpdySM.cc b/proxy/spdy/SpdySM.cc deleted file mode 100644 index df66f39..0000000 --- a/proxy/spdy/SpdySM.cc +++ /dev/null @@ -1,433 +0,0 @@ -/** @file - - SpdySM.cc - - @section license License - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ - -#include "SpdySM.h" -#include "I_Net.h" - -ClassAllocator<SpdySM> spdySMAllocator("SpdySMAllocator"); -ClassAllocator<SpdyRequest> spdyRequestAllocator("SpdyRequestAllocator"); - -static int spdy_main_handler(TSCont contp, TSEvent event, void *edata); -static int spdy_start_handler(TSCont contp, TSEvent event, void *edata); -static int spdy_default_handler(TSCont contp, TSEvent event, void *edata); -static int spdy_process_read(TSEvent event, SpdySM *sm); -static int spdy_process_write(TSEvent event, SpdySM *sm); -static int spdy_process_fetch(TSEvent event, SpdySM *sm, void *edata); -static int spdy_process_fetch_header(TSEvent event, SpdySM *sm, TSFetchSM fetch_sm); -static int spdy_process_fetch_body(TSEvent event, SpdySM *sm, TSFetchSM fetch_sm); -static uint64_t g_sm_id; -static uint64_t g_sm_cnt; - -void -SpdyRequest::clear() -{ - if (fetch_sm) - TSFetchDestroy(fetch_sm); - - vector<pair<string, string> >().swap(headers); - - std::string().swap(url); - std::string().swap(host); - std::string().swap(path); - std::string().swap(scheme); - std::string().swap(method); - std::string().swap(version); - - Debug("spdy", "****Delete Request[%" PRIu64 ":%d]", spdy_sm->sm_id, stream_id); -} - -void -SpdySM::init(NetVConnection * netvc) -{ - int version, r; - - atomic_inc(g_sm_cnt); - - this->vc = netvc; - this->req_map.clear(); - - // XXX this has to die ... TS-2793 - UnixNetVConnection * unixvc = reinterpret_cast<UnixNetVConnection *>(netvc); - - if (unixvc->selected_next_protocol == TS_NPN_PROTOCOL_SPDY_3_1) - version = SPDYLAY_PROTO_SPDY3_1; - else if (unixvc->selected_next_protocol == TS_NPN_PROTOCOL_SPDY_3) - version = SPDYLAY_PROTO_SPDY3; - else if (unixvc->selected_next_protocol == TS_NPN_PROTOCOL_SPDY_2) - version = SPDYLAY_PROTO_SPDY2; - else - version = SPDYLAY_PROTO_SPDY3; - - r = spdylay_session_server_new(&session, version, - &SPDY_CFG.spdy.callbacks, this); - ink_release_assert(r == 0); - sm_id = atomic_inc(g_sm_id); - total_size = 0; - start_time = TShrtime(); - - ink_assert(this->contp == NULL); - this->contp = TSContCreate(spdy_main_handler, TSMutexCreate()); - TSContDataSet(this->contp, this); - - this->vc->set_inactivity_timeout(HRTIME_SECONDS(SPDY_CFG.accept_no_activity_timeout)); - this->current_handler = &spdy_start_handler; -} - -void -SpdySM::clear() -{ - uint64_t nr_pending; - int last_event = event; - // - // SpdyRequest depends on SpdySM, - // we should delete it firstly to avoid race. - // - map<int, SpdyRequest*>::iterator iter = req_map.begin(); - map<int, SpdyRequest*>::iterator endIter = req_map.end(); - for(; iter != endIter; ++iter) { - SpdyRequest *req = iter->second; - if (req) { - req->clear(); - spdyRequestAllocator.free(req); - } else { - Error("req null in SpdSM::clear"); - } - } - req_map.clear(); - - if (vc) { - TSVConnClose(reinterpret_cast<TSVConn>(vc)); - vc = NULL; - } - - if (contp) { - TSContDestroy(contp); - contp = NULL; - } - - if (req_reader) { - TSIOBufferReaderFree(req_reader); - req_reader = NULL; - } - - if (req_buffer) { - TSIOBufferDestroy(req_buffer); - req_buffer = NULL; - } - - if (resp_reader) { - TSIOBufferReaderFree(resp_reader); - resp_reader = NULL; - } - - if (resp_buffer) { - TSIOBufferDestroy(resp_buffer); - resp_buffer = NULL; - } - - if (session) { - spdylay_session_del(session); - session = NULL; - } - - nr_pending = atomic_dec(g_sm_cnt); - Debug("spdy-free", "****Delete SpdySM[%" PRIu64 "], last event:%d, nr_pending:%" PRIu64, - sm_id, last_event, --nr_pending); -} - -void -spdy_sm_create(NetVConnection * netvc, MIOBuffer * iobuf, IOBufferReader * reader) -{ - SpdySM *sm; - - sm = spdySMAllocator.alloc(); - sm->init(netvc); - - sm->req_buffer = iobuf ? reinterpret_cast<TSIOBuffer>(iobuf) : TSIOBufferCreate(); - sm->req_reader = reader ? reinterpret_cast<TSIOBufferReader>(reader) : TSIOBufferReaderAlloc(sm->req_buffer); - - sm->resp_buffer = TSIOBufferCreate(); - sm->resp_reader = TSIOBufferReaderAlloc(sm->resp_buffer); - - TSContSchedule(sm->contp, 0, TS_THREAD_POOL_DEFAULT); // schedule now -} - -static int -spdy_main_handler(TSCont contp, TSEvent event, void *edata) -{ - SpdySM *sm; - SpdySMHandler spdy_current_handler; - - sm = (SpdySM*)TSContDataGet(contp); - spdy_current_handler = sm->current_handler; - - return (*spdy_current_handler) (contp, event, edata); -} - -static int -spdy_start_handler(TSCont contp, TSEvent /*event*/, void * /*data*/) -{ - int r; - spdylay_settings_entry entry; - - SpdySM *sm = (SpdySM*)TSContDataGet(contp); - - if (TSIOBufferReaderAvail(sm->req_reader) > 0) { - spdy_process_read(TS_EVENT_VCONN_WRITE_READY, sm); - } - - sm->read_vio = (TSVIO)sm->vc->do_io_read(reinterpret_cast<Continuation *>(contp), INT64_MAX, reinterpret_cast<MIOBuffer *>(sm->req_buffer)); - sm->write_vio = (TSVIO)sm->vc->do_io_write(reinterpret_cast<Continuation *>(contp), INT64_MAX, reinterpret_cast<IOBufferReader *>(sm->resp_reader)); - - sm->current_handler = &spdy_default_handler; - - /* send initial settings frame */ - entry.settings_id = SPDYLAY_SETTINGS_MAX_CONCURRENT_STREAMS; - entry.value = SPDY_CFG.spdy.max_concurrent_streams; - entry.flags = SPDYLAY_ID_FLAG_SETTINGS_NONE; - - r = spdylay_submit_settings(sm->session, SPDYLAY_FLAG_SETTINGS_NONE, &entry, 1); - TSAssert(r == 0); - - TSVIOReenable(sm->write_vio); - return 0; -} - -static int -spdy_default_handler(TSCont contp, TSEvent event, void *edata) -{ - int ret = 0; - bool from_fetch = false; - SpdySM *sm = (SpdySM*)TSContDataGet(contp); - sm->event = event; - - if (edata == sm->read_vio) { - Debug("spdy", "++++[READ EVENT]"); - if (event != TS_EVENT_VCONN_READ_READY && - event != TS_EVENT_VCONN_READ_COMPLETE) { - ret = -1; - goto out; - } - ret = spdy_process_read(event, sm); - } else if (edata == sm->write_vio) { - Debug("spdy", "----[WRITE EVENT]"); - if (event != TS_EVENT_VCONN_WRITE_READY && - event != TS_EVENT_VCONN_WRITE_COMPLETE) { - ret = -1; - goto out; - } - ret = spdy_process_write(event, sm); - } else { - from_fetch = true; - ret = spdy_process_fetch(event, sm, edata); - } - - Debug("spdy-event", "++++SpdySM[%" PRIu64 "], EVENT:%d, ret:%d, nr_pending:%" PRIu64, - sm->sm_id, event, ret, g_sm_cnt); -out: - if (ret) { - sm->clear(); - spdySMAllocator.free(sm); - } else if (!from_fetch) { - sm->vc->set_inactivity_timeout(HRTIME_SECONDS(SPDY_CFG.no_activity_timeout_in)); - } - - return 0; -} - -static int -spdy_process_read(TSEvent /* event ATS_UNUSED */, SpdySM *sm) -{ - return spdylay_session_recv(sm->session); -} - -static int -spdy_process_write(TSEvent /* event ATS_UNUSED */, SpdySM *sm) -{ - int ret; - - ret = spdylay_session_send(sm->session); - - if (TSIOBufferReaderAvail(sm->resp_reader) > 0) - TSVIOReenable(sm->write_vio); - else { - Debug("spdy", "----TOTAL SEND (sm_id:%" PRIu64 ", total_size:%" PRIu64 ", total_send:%" PRId64 ")", - sm->sm_id, sm->total_size, TSVIONDoneGet(sm->write_vio)); - - // - // We should reenable read_vio when no data to be written, - // otherwise it could lead to hang issue when client POST - // data is waiting to be read. - // - TSVIOReenable(sm->read_vio); - } - - return ret; -} - -static int -spdy_process_fetch(TSEvent event, SpdySM *sm, void *edata) -{ - int ret = -1; - TSFetchSM fetch_sm = (TSFetchSM)edata; - SpdyRequest *req = (SpdyRequest *)TSFetchUserDataGet(fetch_sm); - - switch ((int)event) { - - case TS_FETCH_EVENT_EXT_HEAD_DONE: - Debug("spdy", "----[FETCH HEADER DONE]"); - ret = spdy_process_fetch_header(event, sm, fetch_sm); - break; - - case TS_FETCH_EVENT_EXT_BODY_READY: - Debug("spdy", "----[FETCH BODY READY]"); - ret = spdy_process_fetch_body(event, sm, fetch_sm); - break; - - case TS_FETCH_EVENT_EXT_BODY_DONE: - Debug("spdy", "----[FETCH BODY DONE]"); - req->fetch_body_completed = true; - ret = spdy_process_fetch_body(event, sm, fetch_sm); - break; - - default: - Debug("spdy", "----[FETCH ERROR]"); - if (req->fetch_body_completed) - ret = 0; // Ignore fetch errors after FETCH BODY DONE - else - req->fetch_sm = NULL; - break; - } - - if (ret) { - spdy_prepare_status_response(sm, req->stream_id, STATUS_500); - sm->req_map.erase(req->stream_id); - req->clear(); - spdyRequestAllocator.free(req); - } - - return 0; -} - -static int -spdy_process_fetch_header(TSEvent /*event*/, SpdySM *sm, TSFetchSM fetch_sm) -{ - int ret; - SpdyRequest *req = (SpdyRequest *)TSFetchUserDataGet(fetch_sm); - SpdyNV spdy_nv(fetch_sm); - - Debug("spdy", "----spdylay_submit_syn_reply"); - ret = spdylay_submit_syn_reply(sm->session, - SPDYLAY_CTRL_FLAG_NONE, req->stream_id, - spdy_nv.nv); - - TSVIOReenable(sm->write_vio); - return ret; -} - -static ssize_t -spdy_read_fetch_body_callback(spdylay_session * /*session*/, int32_t stream_id, - uint8_t *buf, size_t length, int *eof, - spdylay_data_source *source, void *user_data) -{ - - static int g_call_cnt; - int64_t already; - - SpdySM *sm = (SpdySM *)user_data; - SpdyRequest *req = (SpdyRequest *)source->ptr; - - // - // req has been deleted, ignore this data. - // - if (req != sm->req_map[stream_id]) { - Debug("spdy", " stream_id:%d, call:%d, req has been deleted, return 0", - stream_id, g_call_cnt); - *eof = 1; - return 0; - } - - already = TSFetchReadData(req->fetch_sm, buf, length); - - Debug("spdy", " stream_id:%d, call:%d, length:%ld, already:%" PRId64, - stream_id, g_call_cnt, length, already); - if (SPDY_CFG.spdy.verbose) - MD5_Update(&req->recv_md5, buf, already); - - TSVIOReenable(sm->write_vio); - g_call_cnt++; - - req->fetch_data_len += already; - if (already < (int64_t)length) { - if (req->event == TS_FETCH_EVENT_EXT_BODY_DONE) { - TSHRTime end_time = TShrtime(); - Debug("spdy", "----Request[%" PRIu64 ":%d] %s %lld %d", sm->sm_id, req->stream_id, - req->url.c_str(), (end_time - req->start_time)/TS_HRTIME_MSECOND, - req->fetch_data_len); - unsigned char digest[MD5_DIGEST_LENGTH]; - if (SPDY_CFG.spdy.verbose ) { - MD5_Final(digest, &req->recv_md5); - Debug("spdy", "----recv md5sum: "); - for (int i = 0; i < MD5_DIGEST_LENGTH; i++) { - Debug("spdy", "%02x", digest[i]); - } - } - *eof = 1; - sm->req_map.erase(stream_id); - req->clear(); - spdyRequestAllocator.free(req); - } else if (already == 0) { - req->need_resume_data = true; - return SPDYLAY_ERR_DEFERRED; - } - } - - return already; -} - -static int -spdy_process_fetch_body(TSEvent event, SpdySM *sm, TSFetchSM fetch_sm) -{ - int ret = 0; - spdylay_data_provider data_prd; - SpdyRequest *req = (SpdyRequest *)TSFetchUserDataGet(fetch_sm); - req->event = event; - - data_prd.source.ptr = (void *)req; - data_prd.read_callback = spdy_read_fetch_body_callback; - - if (!req->has_submitted_data) { - req->has_submitted_data = true; - Debug("spdy", "----spdylay_submit_data"); - ret = spdylay_submit_data(sm->session, req->stream_id, - SPDYLAY_DATA_FLAG_FIN, &data_prd); - } else if (req->need_resume_data) { - Debug("spdy", "----spdylay_session_resume_data"); - ret = spdylay_session_resume_data(sm->session, req->stream_id); - if (ret == SPDYLAY_ERR_INVALID_ARGUMENT) - ret = 0; - } - - TSVIOReenable(sm->write_vio); - return ret; -} http://git-wip-us.apache.org/repos/asf/trafficserver/blob/92d70eb7/proxy/spdy/SpdySM.h ---------------------------------------------------------------------- diff --git a/proxy/spdy/SpdySM.h b/proxy/spdy/SpdySM.h deleted file mode 100644 index 4a6da83..0000000 --- a/proxy/spdy/SpdySM.h +++ /dev/null @@ -1,145 +0,0 @@ -/** @file - - SpdySM.h - - @section license License - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ - -#ifndef __P_SPDY_SM_H__ -#define __P_SPDY_SM_H__ - -#include "SpdyCommon.h" -#include "SpdyCallbacks.h" -#include <openssl/md5.h> - -class SpdySM; -typedef int (*SpdySMHandler) (TSCont contp, TSEvent event, void *data); - -class SpdyRequest -{ -public: - SpdyRequest(): - spdy_sm(NULL), stream_id(-1), fetch_sm(NULL), - has_submitted_data(false), need_resume_data(false), - fetch_data_len(0), delta_window_size(0), - fetch_body_completed(false) - { - } - - SpdyRequest(SpdySM *sm, int id): - spdy_sm(NULL), stream_id(-1), fetch_sm(NULL), - has_submitted_data(false), need_resume_data(false), - fetch_data_len(0), delta_window_size(0), - fetch_body_completed(false) - { - init(sm, id); - } - - ~SpdyRequest() - { - clear(); - } - - void init(SpdySM *sm, int id) - { - spdy_sm = sm; - stream_id = id; - headers.clear(); - - MD5_Init(&recv_md5); - start_time = TShrtime(); - } - - void clear(); - - void append_nv(char **nv) - { - for(int i = 0; nv[i]; i += 2) { - headers.push_back(make_pair(nv[i], nv[i+1])); - } - } - -public: - int event; - SpdySM *spdy_sm; - int stream_id; - TSHRTime start_time; - TSFetchSM fetch_sm; - bool has_submitted_data; - bool need_resume_data; - int fetch_data_len; - int delta_window_size; - bool fetch_body_completed; - vector<pair<string, string> > headers; - - string url; - string host; - string path; - string scheme; - string method; - string version; - - MD5_CTX recv_md5; -}; - -class SpdySM -{ - -public: - - SpdySM() { - } - - ~SpdySM() { - clear(); - } - - void init(NetVConnection * netvc); - void clear(); - - int64_t sm_id; - uint64_t total_size; - TSHRTime start_time; - - NetVConnection * vc; - TSCont contp; - - TSIOBuffer req_buffer; - TSIOBufferReader req_reader; - - TSIOBuffer resp_buffer; - TSIOBufferReader resp_reader; - - TSVIO read_vio; - TSVIO write_vio; - - SpdySMHandler current_handler; - - int event; - spdylay_session *session; - - map<int32_t, SpdyRequest*> req_map; -}; - -void spdy_sm_create(NetVConnection * netvc, MIOBuffer * iobuf, IOBufferReader * reader); - -extern ClassAllocator<SpdySM> spdySMAllocator; -extern ClassAllocator<SpdyRequest> spdyRequestAllocator; - -#endif http://git-wip-us.apache.org/repos/asf/trafficserver/blob/92d70eb7/proxy/spdy/SpdySessionAccept.cc ---------------------------------------------------------------------- diff --git a/proxy/spdy/SpdySessionAccept.cc b/proxy/spdy/SpdySessionAccept.cc index 00fd024..79b465d 100644 --- a/proxy/spdy/SpdySessionAccept.cc +++ b/proxy/spdy/SpdySessionAccept.cc @@ -25,7 +25,7 @@ #include "Error.h" #if TS_HAS_SPDY -#include "SpdySM.h" +#include "SpdyClientSession.h" #endif SpdySessionAccept::SpdySessionAccept(Continuation *ep)
