Repository: trafficserver Updated Branches: refs/heads/master 683a377d1 -> e4347ef80
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e4347ef8/proxy/http2/Http2ConnectionState.cc ---------------------------------------------------------------------- diff --git a/proxy/http2/Http2ConnectionState.cc b/proxy/http2/Http2ConnectionState.cc index 5030e36..79a88dc 100644 --- a/proxy/http2/Http2ConnectionState.cc +++ b/proxy/http2/Http2ConnectionState.cc @@ -28,31 +28,299 @@ #define DebugHttp2Ssn(fmt, ...) \ DebugSsn("http2_cs", "[%" PRId64 "] " fmt, this->con_id, __VA_ARGS__) +// Currently use only HTTP/1.1 for requesting to origin server +const static char* HTTP2_FETCHING_HTTP_VERSION = "HTTP/1.1"; + typedef Http2ErrorCode (*http2_frame_dispatch)(Http2ClientSession&, Http2ConnectionState&, const Http2Frame&); static const int buffer_size_index[HTTP2_FRAME_TYPE_MAX] = { - -1, // HTTP2_FRAME_TYPE_DATA - -1, // HTTP2_FRAME_TYPE_HEADERS + BUFFER_SIZE_INDEX_8K, // HTTP2_FRAME_TYPE_DATA + BUFFER_SIZE_INDEX_4K, // HTTP2_FRAME_TYPE_HEADERS -1, // HTTP2_FRAME_TYPE_PRIORITY - -1, // HTTP2_FRAME_TYPE_RST_STREAM + BUFFER_SIZE_INDEX_128, // HTTP2_FRAME_TYPE_RST_STREAM BUFFER_SIZE_INDEX_128, // HTTP2_FRAME_TYPE_SETTINGS -1, // HTTP2_FRAME_TYPE_PUSH_PROMISE - -1, // HTTP2_FRAME_TYPE_PING + BUFFER_SIZE_INDEX_128, // HTTP2_FRAME_TYPE_PING BUFFER_SIZE_INDEX_128, // HTTP2_FRAME_TYPE_GOAWAY - -1, // HTTP2_FRAME_TYPE_WINDOW_UPDATE - -1, // HTTP2_FRAME_TYPE_CONTINUATION - -1, // HTTP2_FRAME_TYPE_ALTSVC - -1, // HTTP2_FRAME_TYPE_BLOCKED + BUFFER_SIZE_INDEX_128, // HTTP2_FRAME_TYPE_WINDOW_UPDATE + BUFFER_SIZE_INDEX_4K, // HTTP2_FRAME_TYPE_CONTINUATION }; +inline static unsigned +read_rcv_buffer(char* buf, size_t bufsize, unsigned& nbytes, const Http2Frame& frame) +{ + char * end; + + if(frame.header().length - nbytes > bufsize) { + end = frame.reader()->memcpy(buf, bufsize, nbytes); + } else { + end = frame.reader()->memcpy(buf, frame.header().length - nbytes, nbytes); + } + nbytes += end - buf; + + return end - buf; +} + +static Http2ErrorCode +rcv_data_frame(Http2ClientSession& cs, Http2ConnectionState& cstate, const Http2Frame& frame) +{ + char buf[BUFFER_SIZE_FOR_INDEX(buffer_size_index[HTTP2_FRAME_TYPE_DATA])]; + unsigned nbytes = 0; + Http2StreamId id = frame.header().streamid; + Http2Stream* stream = cstate.find_stream(id); + uint8_t pad_length = 0; + const uint32_t payload_length = frame.header().length; + + DebugSsn(&cs, "http2_cs", "[%" PRId64 "] Received DATA frame.", cs.connection_id()); + + // If a DATA frame is received whose stream identifier field is 0x0, the recipient MUST + // respond with a connection error of type PROTOCOL_ERROR. + if (id == 0 || stream == NULL) { + return HTTP2_ERROR_PROTOCOL_ERROR; + } + + // If a DATA frame is received whose stream is not in "open" or "half closed (local)" state, + // the recipient MUST respond with a stream error of type STREAM_CLOSED. + if (stream->get_state() != HTTP2_STREAM_STATE_OPEN && + stream->get_state() != HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL) { + cstate.send_rst_stream_frame(id, HTTP2_ERROR_STREAM_CLOSED); + return HTTP2_ERROR_NO_ERROR; + } + + stream->increment_data_length(payload_length); + if (frame.header().flags & HTTP2_FLAGS_DATA_END_STREAM) { + if (!stream->change_state(frame.header().type, frame.header().flags)) { + cstate.send_rst_stream_frame(id, HTTP2_ERROR_STREAM_CLOSED); + return HTTP2_ERROR_NO_ERROR; + } + if (!stream->payload_length_is_valid()) { + return HTTP2_ERROR_PROTOCOL_ERROR; + } + } + + if (frame.header().flags & HTTP2_FLAGS_DATA_PADDED) { + frame.reader()->memcpy(&pad_length, HTTP2_DATA_PADLEN_LEN, nbytes); + + if (pad_length > payload_length) { + // If the length of the padding is the length of the + // frame payload or greater, the recipient MUST treat this as a + // connection error of type PROTOCOL_ERROR. + return HTTP2_ERROR_PROTOCOL_ERROR; + } + } + + // If Data length is 0, do nothing. + if (payload_length == 0 ) { + return HTTP2_ERROR_NO_ERROR; + } + + // Check whether Window Size is appeptable. + if (cstate.server_rwnd < payload_length || + stream->server_rwnd < payload_length) { + return HTTP2_ERROR_FLOW_CONTROL_ERROR; + } + + // Update Window size + cstate.server_rwnd -= payload_length; + stream->server_rwnd -= payload_length; + + const uint32_t unpadded_length = payload_length - pad_length; + while (nbytes < payload_length - pad_length) { + size_t read_len = sizeof(buf); + if (nbytes + read_len > unpadded_length) read_len -= nbytes + read_len - unpadded_length; + unsigned read_bytes = read_rcv_buffer(buf, read_len, nbytes, frame); + stream->set_body_to_fetcher(buf, read_bytes); + } + + uint32_t initial_rwnd = cstate.server_settings.get(HTTP2_SETTINGS_INITIAL_WINDOW_SIZE); + uint32_t min_rwnd = min(initial_rwnd, cstate.server_settings.get(HTTP2_SETTINGS_MAX_FRAME_SIZE)); + // Connection level WINDOW UPDATE + if (cstate.server_rwnd <= min_rwnd) { + Http2WindowSize diff_size = initial_rwnd - cstate.server_rwnd; + cstate.server_rwnd += diff_size; + cstate.send_window_update_frame(0, diff_size); + } + // Stream level WINDOW UPDATE + if(stream->server_rwnd <= min_rwnd) { + Http2WindowSize diff_size = initial_rwnd - stream->server_rwnd; + stream->server_rwnd += diff_size; + cstate.send_window_update_frame(stream->get_id(), diff_size); + } + + return HTTP2_ERROR_NO_ERROR; +} + +static Http2ErrorCode +rcv_headers_frame(Http2ClientSession& cs, Http2ConnectionState& cstate, const Http2Frame& frame) +{ + char buf[BUFFER_SIZE_FOR_INDEX(buffer_size_index[HTTP2_FRAME_TYPE_HEADERS])]; + unsigned nbytes = 0; + Http2StreamId id = frame.header().streamid; + Http2HeadersParameter params; + const uint32_t payload_length = frame.header().length; + + DebugSsn(&cs, "http2_cs", "[%" PRId64 "] Received HEADERS frame.", cs.connection_id()); + + if (!http2_is_client_streamid(id)) { + return HTTP2_ERROR_PROTOCOL_ERROR; + } + + // Create new stream + Http2Stream* stream = cstate.create_stream(id); + if (!stream) { + return HTTP2_ERROR_PROTOCOL_ERROR; + } + + // A receiver MUST treat the receipt of any other type of frame or + // a frame on a different stream as a connection error of type PROTOCOL_ERROR. + if (cstate.get_continued_id() != 0) { + return HTTP2_ERROR_PROTOCOL_ERROR; + } + + // Change state. If changing is invalid, raise PROTOCOL_ERROR + if (!stream->change_state(frame.header().type, frame.header().flags)) { + return HTTP2_ERROR_PROTOCOL_ERROR; + } + + // Check whether padding exists or not. + if (frame.header().flags & HTTP2_FLAGS_HEADERS_PADDED) { + frame.reader()->memcpy(buf, HTTP2_HEADERS_PADLEN_LEN, nbytes); + nbytes += HTTP2_HEADERS_PADLEN_LEN; + if (!http2_parse_headers_parameter(make_iovec(buf, HTTP2_HEADERS_PADLEN_LEN), params)) { + return HTTP2_ERROR_PROTOCOL_ERROR; + } + + if (params.pad_length > payload_length) { + // If the length of the padding is the length of the + // frame payload or greater, the recipient MUST treat this as a + // connection error of type PROTOCOL_ERROR. + return HTTP2_ERROR_PROTOCOL_ERROR; + } + } else { + params.pad_length = 0; + } + + // Check whether parameters of priority exist or not. + // TODO Currently priority is NOT supported. + if (frame.header().flags & HTTP2_FLAGS_HEADERS_PRIORITY) { + frame.reader()->memcpy(buf, HTTP2_PRIORITY_LEN, nbytes); + nbytes += HTTP2_PRIORITY_LEN; + if (!http2_parse_priority_parameter(make_iovec(buf, HTTP2_PRIORITY_LEN), params.priority)) { + return HTTP2_ERROR_PROTOCOL_ERROR; + } + } + + // Parse request headers encoded by HPACK + const uint32_t unpadded_length = payload_length - params.pad_length; + uint32_t remaining_bytes = 0; + for(;;) { + size_t read_len = sizeof(buf) - remaining_bytes; + if (nbytes + read_len > unpadded_length) read_len -= nbytes + read_len - unpadded_length; + unsigned read_bytes = read_rcv_buffer(buf + remaining_bytes, read_len, nbytes, frame); + IOVec header_block_fragment = make_iovec(buf, read_bytes + remaining_bytes); + + bool cont = nbytes < payload_length || !(frame.header().flags & HTTP2_FLAGS_HEADERS_END_HEADERS); + int64_t decoded_bytes = stream->decode_request_header(header_block_fragment, *cstate.local_dynamic_table, cont); + + // 4.3. A receiver MUST terminate the connection with a + // connection error of type COMPRESSION_ERROR if it does + // not decompress a header block. + if (decoded_bytes == 0 || decoded_bytes == HPACK_ERROR_COMPRESSION_ERROR) { + return HTTP2_ERROR_COMPRESSION_ERROR; + } + + if (decoded_bytes == HPACK_ERROR_HTTP2_PROTOCOL_ERROR) { + return HTTP2_ERROR_PROTOCOL_ERROR; + } + + remaining_bytes = header_block_fragment.iov_len - decoded_bytes; + memmove(buf, buf + header_block_fragment.iov_len - remaining_bytes, remaining_bytes); + + if (nbytes >= payload_length - params.pad_length) { + if (!(frame.header().flags & HTTP2_FLAGS_HEADERS_END_HEADERS)) { + cstate.set_continued_headers(buf, remaining_bytes, id); + } + break; + } + } + + // backposting + if (frame.header().flags & HTTP2_FLAGS_HEADERS_END_HEADERS) { + stream->init_fetcher(cstate); + } + + return HTTP2_ERROR_NO_ERROR; +} + +static Http2ErrorCode +rcv_priority_frame(Http2ClientSession& cs, Http2ConnectionState& /*cstate*/, const Http2Frame& frame) +{ + DebugSsn(&cs, "http2_cs", "[%" PRId64 "] received PRIORITY frame", cs.connection_id()); + + // If a PRIORITY frame is received with a stream identifier of 0x0, the + // recipient MUST respond with a connection error of type PROTOCOL_ERROR. + if (frame.header().streamid == 0) { + return HTTP2_ERROR_PROTOCOL_ERROR; + } + + // A PRIORITY frame with a length other than 5 octets MUST be treated as + // a stream error (Section 5.4.2) of type FRAME_SIZE_ERROR. + if (frame.header().length != HTTP2_PRIORITY_LEN) { + return HTTP2_ERROR_FRAME_SIZE_ERROR; + } + + // TODO Pick stream dependencies and weight + // Supporting PRIORITY is not essential so its temporarily ignored. + + return HTTP2_ERROR_NO_ERROR; +} + +static Http2ErrorCode +rcv_rst_stream_frame(Http2ClientSession& cs, Http2ConnectionState& cstate, const Http2Frame& frame) +{ + Http2RstStream rst_stream; + char buf[HTTP2_RST_STREAM_LEN]; + char * end; + + DebugSsn(&cs, "http2_cs", "[%" PRId64 "] Received RST_STREAM frame.", cs.connection_id()); + + Http2Stream* stream = cstate.find_stream(frame.header().streamid); + if (stream == NULL) { + return HTTP2_ERROR_PROTOCOL_ERROR; + } + + if (frame.header().length != HTTP2_RST_STREAM_LEN) { + return HTTP2_ERROR_FRAME_SIZE_ERROR; + } + + if (!stream->change_state(frame.header().type, frame.header().flags)) { + // If a RST_STREAM frame identifying an idle stream is received, the + // recipient MUST treat this as a connection error of type PROTOCOL_ERROR. + return HTTP2_ERROR_PROTOCOL_ERROR; + } + + cstate.delete_stream(stream); + end = frame.reader()->memcpy(buf, sizeof(buf), 0); + + if (!http2_parse_rst_stream(make_iovec(buf, end - buf), rst_stream)) { + return HTTP2_ERROR_PROTOCOL_ERROR; + } + + DebugSsn(&cs, "http2_cs", "[%" PRId64 "] RST_STREAM: Stream ID: %u, Error Code: %u)", + cs.connection_id(), stream->get_id(), rst_stream.error_code); + + return HTTP2_ERROR_NO_ERROR; +} + static Http2ErrorCode rcv_settings_frame(Http2ClientSession& cs, Http2ConnectionState& cstate, const Http2Frame& frame) { Http2SettingsParameter param; char buf[HTTP2_SETTINGS_PARAMETER_LEN]; unsigned nbytes = 0; - char * end; + + DebugSsn(&cs, "http2_cs", "[%" PRId64 "] Received SETTINGS frame.", cs.connection_id()); // 6.5 The stream identifier for a SETTINGS frame MUST be zero. if (frame.header().streamid != 0) { @@ -67,10 +335,9 @@ rcv_settings_frame(Http2ClientSession& cs, Http2ConnectionState& cstate, const H } while (nbytes < frame.header().length) { - end = frame.reader()->memcpy(buf, sizeof(buf), nbytes); - nbytes += (end - buf); + unsigned read_bytes = read_rcv_buffer(buf, sizeof(buf), nbytes, frame); - if (!http2_parse_settings_parameter(make_iovec(buf, end - buf), param)) { + if (!http2_parse_settings_parameter(make_iovec(buf, read_bytes), param)) { return HTTP2_ERROR_PROTOCOL_ERROR; } @@ -82,6 +349,14 @@ rcv_settings_frame(Http2ClientSession& cs, Http2ConnectionState& cstate, const H DebugSsn(&cs, "http2_cs", "[%" PRId64 "] setting param=%d value=%u", cs.connection_id(), param.id, param.value); + // 6.9.2. When the value of SETTINGS_INITIAL_WINDOW_SIZE + // changes, a receiver MUST adjust the size of all stream flow control + // windows that it maintains by the difference between the new value and + // the old value. + if (param.id == HTTP2_SETTINGS_INITIAL_WINDOW_SIZE) { + cstate.update_initial_rwnd(param.value); + } + cstate.client_settings.set((Http2SettingsIdentifier)param.id, param.value); } @@ -93,26 +368,235 @@ rcv_settings_frame(Http2ClientSession& cs, Http2ConnectionState& cstate, const H return HTTP2_ERROR_NO_ERROR; } +static Http2ErrorCode +rcv_push_promise_frame(Http2ClientSession& cs, Http2ConnectionState& /*cstate*/, const Http2Frame& /*frame*/) +{ + DebugSsn(&cs, "http2_cs", "[%" PRId64 "] received PUSH_PROMISE frame", cs.connection_id()); + + // 8.2. A client cannot push. Thus, servers MUST treat the receipt of a + // PUSH_PROMISE frame as a connection error of type PROTOCOL_ERROR. + return HTTP2_ERROR_PROTOCOL_ERROR; +} + +// 6.7. PING +static Http2ErrorCode +rcv_ping_frame(Http2ClientSession& cs, Http2ConnectionState& cstate, const Http2Frame& frame) +{ + uint8_t opaque_data[HTTP2_PING_LEN]; + + DebugSsn(&cs, "http2_cs", "[%" PRId64 "] Received PING frame.", cs.connection_id()); + + // If a PING frame is received with a stream identifier field value other than + // 0x0, the recipient MUST respond with a connection error of type PROTOCOL_ERROR. + if (frame.header().streamid != 0x0) { + return HTTP2_ERROR_PROTOCOL_ERROR; + } + + // Receipt of a PING frame with a length field value other than 8 MUST + // be treated as a connection error (Section 5.4.1) of type FRAME_SIZE_ERROR. + if (frame.header().length != HTTP2_PING_LEN) { + return HTTP2_ERROR_FRAME_SIZE_ERROR; + } + + // An endpoint MUST NOT respond to PING frames containing this flag. + if (frame.header().flags & HTTP2_FLAGS_PING_ACK) { + return HTTP2_ERROR_NO_ERROR; + } + + frame.reader()->memcpy(opaque_data, HTTP2_PING_LEN, 0); + + // ACK (0x1): An endpoint MUST set this flag in PING responses. + cstate.send_ping_frame(frame.header().streamid, HTTP2_FLAGS_PING_ACK, opaque_data); + + return HTTP2_ERROR_NO_ERROR; +} + +static Http2ErrorCode +rcv_goaway_frame(Http2ClientSession& cs, Http2ConnectionState& /*cstate*/, const Http2Frame& frame) +{ + Http2Goaway goaway; + char buf[HTTP2_GOAWAY_LEN]; + unsigned nbytes = 0; + + DebugSsn(&cs, "http2_cs", "[%" PRId64 "] received GOAWAY frame", cs.connection_id()); + + // An endpoint MUST treat a GOAWAY frame with a stream identifier other + // than 0x0 as a connection error of type PROTOCOL_ERROR. + if (frame.header().streamid != 0x0) { + return HTTP2_ERROR_PROTOCOL_ERROR; + } + + while (nbytes < frame.header().length) { + unsigned read_bytes = read_rcv_buffer(buf, sizeof(buf), nbytes, frame); + + if (!http2_parse_goaway(make_iovec(buf, read_bytes), goaway)) { + return HTTP2_ERROR_PROTOCOL_ERROR; + } + } + + DebugSsn(&cs, "http2_cs", "[%" PRId64 "] GOAWAY: last stream id=%d, error code=%d.", + cs.connection_id(), goaway.last_streamid, goaway.error_code); + + // eventProcessor.schedule_imm(&cs, ET_NET, VC_EVENT_ERROR); + + return HTTP2_ERROR_NO_ERROR; +} + +static Http2ErrorCode +rcv_window_update_frame(Http2ClientSession& cs, Http2ConnectionState& cstate, const Http2Frame& frame) +{ + char buf[HTTP2_WINDOW_UPDATE_LEN]; + uint32_t size; + Http2StreamId sid = frame.header().streamid; + + DebugSsn(&cs, "http2_cs", "[%" PRId64 "] Received WINDOW_UPDATE frame.", cs.connection_id()); + + // A WINDOW_UPDATE frame with a length other than 4 octets MUST be + // treated as a connection error of type FRAME_SIZE_ERROR. + if (frame.header().length != HTTP2_WINDOW_UPDATE_LEN) { + return HTTP2_ERROR_FRAME_SIZE_ERROR; + } + + if (sid == 0) { + // Connection level window update + frame.reader()->memcpy(buf, sizeof(buf), 0); + http2_parse_window_update(make_iovec(buf, sizeof(buf)), size); + + // A receiver MUST treat the receipt of a WINDOW_UPDATE frame with a connection + // flow control window increment of 0 as a connection error of type PROTOCOL_ERROR; + if (size == 0) { + return HTTP2_ERROR_PROTOCOL_ERROR; + } + + cstate.client_rwnd += size; + cstate.restart_streams(); + } else { + // Stream level window update + Http2Stream* stream = cstate.find_stream(sid); + + // This means that a receiver could receive a + // WINDOW_UPDATE frame on a "half closed (remote)" or "closed" stream. + // A receiver MUST NOT treat this as an error. + if (stream == NULL) { + // Temporarily ignore WINDOW_UPDATE + // TODO After supporting PRIORITY, it should be handled correctly. + return HTTP2_ERROR_NO_ERROR; + } + + frame.reader()->memcpy(buf, sizeof(buf), 0); + http2_parse_window_update(make_iovec(buf, sizeof(buf)), size); + + // A receiver MUST treat the receipt of a WINDOW_UPDATE frame with an + // flow control window increment of 0 as a stream error of type PROTOCOL_ERROR; + if (size == 0) { + cstate.send_rst_stream_frame(sid, HTTP2_ERROR_PROTOCOL_ERROR); + return HTTP2_ERROR_NO_ERROR; + } + + stream->client_rwnd += size; + ssize_t wnd = min(cstate.client_rwnd, stream->client_rwnd); + if (wnd > 0) { + cstate.send_data_frame(stream->get_fetcher()); + } + } + + return HTTP2_ERROR_NO_ERROR; +} + +static Http2ErrorCode +rcv_continuation_frame(Http2ClientSession& cs, Http2ConnectionState& cstate, const Http2Frame& frame) +{ + char buf[BUFFER_SIZE_FOR_INDEX(buffer_size_index[HTTP2_FRAME_TYPE_CONTINUATION])]; + unsigned nbytes = 0; + const Http2StreamId stream_id = frame.header().streamid; + + DebugSsn(&cs, "http2_cs", "[%" PRId64 "] Received CONTINUATION frame.", cs.connection_id()); + + // Find opened stream + Http2Stream* stream = cstate.find_stream(stream_id); + if (stream == NULL) { + return HTTP2_ERROR_PROTOCOL_ERROR; + } + + // A CONTINUATION frame MUST be preceded by a HEADERS, PUSH_PROMISE or + // CONTINUATION frame without the END_HEADERS flag set. + if (stream->get_state() != HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE && + stream->get_state() != HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL) { + return HTTP2_ERROR_PROTOCOL_ERROR; + } + + // A receiver MUST treat the receipt of any other type of frame or + // a frame on a different stream as a connection error of type PROTOCOL_ERROR. + if (stream->get_id() != cstate.get_continued_id()) { + return HTTP2_ERROR_PROTOCOL_ERROR; + } + + const IOVec remaining_data = cstate.get_continued_headers(); + uint32_t remaining_bytes = remaining_data.iov_len; + if (remaining_bytes && remaining_data.iov_base) { + memcpy(buf, remaining_data.iov_base, remaining_data.iov_len); + } + + // Parse request headers encoded by HPACK + for(;;) { + unsigned read_bytes = read_rcv_buffer(buf + remaining_bytes, sizeof(buf) - remaining_bytes, nbytes, frame); + IOVec header_block_fragment = make_iovec(buf, read_bytes + remaining_bytes); + + bool cont = nbytes < frame.header().length || !(frame.header().flags & HTTP2_FLAGS_HEADERS_END_HEADERS); + int64_t decoded_bytes = stream->decode_request_header(header_block_fragment, *cstate.local_dynamic_table, cont); + + // A receiver MUST terminate the connection with a + // connection error of type COMPRESSION_ERROR if it does + // not decompress a header block. + if (decoded_bytes == 0 || decoded_bytes == HPACK_ERROR_COMPRESSION_ERROR) { + return HTTP2_ERROR_COMPRESSION_ERROR; + } + + if (decoded_bytes == HPACK_ERROR_HTTP2_PROTOCOL_ERROR) { + return HTTP2_ERROR_PROTOCOL_ERROR; + } + + remaining_bytes = header_block_fragment.iov_len - decoded_bytes; + memmove(buf, buf + header_block_fragment.iov_len - remaining_bytes, remaining_bytes); + + if (nbytes >= frame.header().length) { + if (!(frame.header().flags & HTTP2_FLAGS_HEADERS_END_HEADERS)) { + cstate.set_continued_headers(buf, remaining_bytes, stream_id); + } + break; + } + } + + // backposting + if (frame.header().flags & HTTP2_FLAGS_HEADERS_END_HEADERS) { + cstate.finish_continued_headers(); + stream->init_fetcher(cstate); + } + + return HTTP2_ERROR_NO_ERROR; +} + static const http2_frame_dispatch frame_handlers[HTTP2_FRAME_TYPE_MAX] = { - NULL, // HTTP2_FRAME_TYPE_DATA - NULL, // HTTP2_FRAME_TYPE_HEADERS - NULL, // HTTP2_FRAME_TYPE_PRIORITY - NULL, // HTTP2_FRAME_TYPE_RST_STREAM - rcv_settings_frame, // HTTP2_FRAME_TYPE_SETTINGS - NULL, // HTTP2_FRAME_TYPE_PUSH_PROMISE - NULL, // HTTP2_FRAME_TYPE_PING - NULL, // HTTP2_FRAME_TYPE_GOAWAY - NULL, // HTTP2_FRAME_TYPE_WINDOW_UPDATE - NULL, // HTTP2_FRAME_TYPE_CONTINUATION - NULL, // HTTP2_FRAME_TYPE_ALTSVC - NULL, // HTTP2_FRAME_TYPE_BLOCKED + rcv_data_frame, // HTTP2_FRAME_TYPE_DATA + rcv_headers_frame, // HTTP2_FRAME_TYPE_HEADERS + rcv_priority_frame, // HTTP2_FRAME_TYPE_PRIORITY + rcv_rst_stream_frame, // HTTP2_FRAME_TYPE_RST_STREAM + rcv_settings_frame, // HTTP2_FRAME_TYPE_SETTINGS + rcv_push_promise_frame, // HTTP2_FRAME_TYPE_PUSH_PROMISE + rcv_ping_frame, // HTTP2_FRAME_TYPE_PING + rcv_goaway_frame, // HTTP2_FRAME_TYPE_GOAWAY + rcv_window_update_frame, // HTTP2_FRAME_TYPE_WINDOW_UPDATE + rcv_continuation_frame, // HTTP2_FRAME_TYPE_CONTINUATION }; int Http2ConnectionState::main_event_handler(int event, void * edata) { - if (event == HTTP2_SESSION_EVENT_INIT) { + switch (event) { + + // Initialize HTTP/2 Connection + case HTTP2_SESSION_EVENT_INIT: { ink_assert(this->ua_session == NULL); this->ua_session = (Http2ClientSession *)edata; @@ -121,23 +605,45 @@ Http2ConnectionState::main_event_handler(int event, void * edata) // send a connection preface as a final confirmation ... The server connection // preface consists of a potentially empty SETTINGS frame. Http2Frame settings(HTTP2_FRAME_TYPE_SETTINGS, 0, 0); + settings.alloc(buffer_size_index[HTTP2_FRAME_TYPE_SETTINGS]); + + // Send all settings values + IOVec iov = settings.write(); + for (int i=1; i<HTTP2_SETTINGS_MAX; i++) { + Http2SettingsIdentifier id = static_cast<Http2SettingsIdentifier>(i); + Http2SettingsParameter param; + param.id = id; + param.value = server_settings.get(id); + iov.iov_base = reinterpret_cast<char*>(iov.iov_base) + HTTP2_SETTINGS_PARAMETER_LEN * (i - 1); + iov.iov_len = HTTP2_SETTINGS_PARAMETER_LEN; + http2_write_settings(param, iov); + } + + settings.finalize(HTTP2_SETTINGS_PARAMETER_LEN * (HTTP2_SETTINGS_MAX-1)); this->ua_session->handleEvent(HTTP2_SESSION_EVENT_XMIT, &settings); return 0; } - if (event == HTTP2_SESSION_EVENT_FINI) { + // Finalize HTTP/2 Connection + case HTTP2_SESSION_EVENT_FINI: { this->ua_session = NULL; + cleanup_streams(); SET_HANDLER(&Http2ConnectionState::state_closed); return 0; } - if (event == HTTP2_SESSION_EVENT_RECV) { + // Parse received HTTP/2 frames + case HTTP2_SESSION_EVENT_RECV: { Http2Frame * frame = (Http2Frame *)edata; + Http2StreamId last_streamid = frame->header().streamid; Http2ErrorCode error; - // The session layer should have validated the frame already. + // Implementations MUST ignore and discard any frame that has a type that is unknown. ink_assert(frame->header().type < countof(frame_handlers)); + if (frame->header().type > countof(frame_handlers)) { + return 0; + } if (frame_handlers[frame->header().type]) { error = frame_handlers[frame->header().type](*this->ua_session, *this, *frame); @@ -146,18 +652,7 @@ Http2ConnectionState::main_event_handler(int event, void * edata) } if (error != HTTP2_ERROR_NO_ERROR) { - Http2Frame frame(HTTP2_FRAME_TYPE_GOAWAY, 0, 0); - Http2Goaway goaway; - - goaway.last_streamid = 0; - goaway.error_code = error; - - frame.alloc(buffer_size_index[HTTP2_FRAME_TYPE_GOAWAY]); - http2_write_goaway(goaway, frame.write()); - frame.finalize(HTTP2_GOAWAY_LEN); - - this->ua_session->handleEvent(HTTP2_SESSION_EVENT_XMIT, &frame); - eventProcessor.schedule_imm(this->ua_session, ET_NET, VC_EVENT_ERROR); + this->send_goaway_frame(last_streamid, error); // XXX We need to think a bit harder about how to coordinate the client session and the // protocol connection. At this point, the protocol is shutting down, but there's no way @@ -169,6 +664,35 @@ Http2ConnectionState::main_event_handler(int event, void * edata) return 0; } + // Process response headers from origin server + case TS_FETCH_EVENT_EXT_HEAD_DONE: { + FetchSM* fetch_sm = reinterpret_cast<FetchSM*>(edata); + this->send_headers_frame(fetch_sm); + return 0; + } + + // Process a part of response body from origin server + case TS_FETCH_EVENT_EXT_BODY_READY: { + FetchSM* fetch_sm = reinterpret_cast<FetchSM*>(edata); + this->send_data_frame(fetch_sm); + return 0; + } + + // Process final part of response body from origin server + case TS_FETCH_EVENT_EXT_BODY_DONE: { + FetchSM* fetch_sm = reinterpret_cast<FetchSM*>(edata); + Http2Stream* stream = static_cast<Http2Stream*>(fetch_sm->ext_get_user_data()); + stream->mark_body_done(); + this->send_data_frame(fetch_sm); + return 0; + } + + default: + DebugSsn(this->ua_session, "http2_cs", "unexpected event=%d edata=%p", event, edata); + ink_release_assert(0); + return 0; + } + return 0; } @@ -177,3 +701,414 @@ Http2ConnectionState::state_closed(int /* event */, void * /* edata */) { return 0; } + +Http2Stream* +Http2ConnectionState::create_stream(Http2StreamId new_id) +{ + // The identifier of a newly established stream MUST be numerically + // greater than all streams that the initiating endpoint has opened or + // reserved. + if (new_id <= latest_streamid) { + return NULL; + } + + // Endpoints MUST NOT exceed the limit set by their peer. An endpoint + // that receives a HEADERS frame that causes their advertised concurrent + // stream limit to be exceeded MUST treat this as a stream error. + if (new_id >= client_settings.get(HTTP2_SETTINGS_MAX_CONCURRENT_STREAMS)) { + return NULL; + } + + Http2Stream* new_stream = new Http2Stream(new_id, client_settings.get(HTTP2_SETTINGS_INITIAL_WINDOW_SIZE)); + stream_list.push(new_stream); + latest_streamid = new_id; + + return new_stream; +} + +Http2Stream* +Http2ConnectionState::find_stream(Http2StreamId id) const +{ + for (Http2Stream* s = stream_list.head; s; s = s->link.next) { + if (s->get_id() == id) + return s; + } + return NULL; +} + +void +Http2ConnectionState::restart_streams() +{ + // Currently lookup retryable streams sequentially. + // TODO considering to stream weight and dependencies. + Http2Stream * s = stream_list.head; + while (s) { + Http2Stream * next = s->link.next; + if (min(this->client_rwnd, s->client_rwnd) > 0) { + this->send_data_frame(s->get_fetcher()); + } + s = next; + } +} + +void +Http2ConnectionState::cleanup_streams() +{ + Http2Stream * s = stream_list.head; + while (s) { + Http2Stream * next = s->link.next; + stream_list.remove(s); + delete s; + s = next; + } +} + +void +Http2ConnectionState::set_continued_headers(const char * buf, uint32_t len, Http2StreamId id) +{ + if (buf && len > 0) { + if (!continued_buffer.iov_base) { + continued_buffer.iov_base = static_cast<uint8_t *>(ats_malloc(len)); + } else if (continued_buffer.iov_len < len) { + continued_buffer.iov_base = ats_realloc(continued_buffer.iov_base, len); + } + continued_buffer.iov_len = len; + + memcpy(continued_buffer.iov_base, buf, continued_buffer.iov_len); + } + + continued_id = id; +} + +void +Http2ConnectionState::finish_continued_headers() +{ + continued_id = 0; + ats_free(continued_buffer.iov_base); + continued_buffer.iov_len = 0; +} + +void +Http2ConnectionState::delete_stream(Http2Stream* stream) +{ + stream_list.remove(stream); + delete stream; +} + +void +Http2ConnectionState::update_initial_rwnd(Http2WindowSize new_size) +{ + // Update stream level window sizes + for (Http2Stream* s = stream_list.head; s; s = s->link.next) { + s->client_rwnd = new_size - (client_settings.get(HTTP2_SETTINGS_INITIAL_WINDOW_SIZE) - s->client_rwnd); + } +} + +void +Http2ConnectionState::send_data_frame(FetchSM *fetch_sm) +{ + size_t buf_len = BUFFER_SIZE_FOR_INDEX(buffer_size_index[HTTP2_FRAME_TYPE_DATA]) - HTTP2_FRAME_HEADER_LEN; + uint8_t payload_buffer[buf_len]; + + DebugSsn(this->ua_session, "http2_cs", "[%" PRId64 "] Send DATA frame.", this->ua_session->connection_id()); + + Http2Stream* stream = static_cast<Http2Stream*>(fetch_sm->ext_get_user_data()); + + for (;;) { + uint8_t flags = 0x00; + + // Select appropriate payload size + if (this->client_rwnd <= 0 || stream->client_rwnd <= 0) break; + size_t window_size = min(this->client_rwnd, stream->client_rwnd); + size_t send_size = min(buf_len, window_size); + + size_t payload_length = fetch_sm->ext_read_data(reinterpret_cast<char*>(payload_buffer), send_size); + if (payload_length == 0) break; + + // Update window size + this->client_rwnd -= payload_length; + stream->client_rwnd -= payload_length; + + if (stream->is_body_done() && payload_length < send_size) { + flags |= HTTP2_FLAGS_DATA_END_STREAM; + } + + // Create frame + Http2Frame data(HTTP2_FRAME_TYPE_DATA, stream->get_id(), flags); + data.alloc(buffer_size_index[HTTP2_FRAME_TYPE_DATA]); + http2_write_data(payload_buffer, payload_length, data.write()); + data.finalize(payload_length); + + // Change state to 'closed' if its end of DATAs. + if (flags & HTTP2_FLAGS_DATA_END_STREAM) { + if (!stream->change_state(data.header().type, data.header().flags)) { + this->send_goaway_frame(stream->get_id(), HTTP2_ERROR_PROTOCOL_ERROR); + } + } + + // xmit event + MUTEX_LOCK(lock, this->ua_session->mutex, this_ethread()); + this->ua_session->handleEvent(HTTP2_SESSION_EVENT_XMIT, &data); + + if (flags & HTTP2_FLAGS_DATA_END_STREAM) { + // Delete a stream immediately + // TODO its should not be deleted for a several time to handling RST_STREAM and WINDOW_UPDATE. + // See 'closed' state written at https://tools.ietf.org/html/draft-ietf-httpbis-http2-16#section-5.1 + this->delete_stream(stream); + break; + } + } + +} + +void +Http2ConnectionState::send_headers_frame(FetchSM *fetch_sm) +{ + const size_t buf_len = BUFFER_SIZE_FOR_INDEX(buffer_size_index[HTTP2_FRAME_TYPE_HEADERS]) - HTTP2_FRAME_HEADER_LEN; + uint8_t payload_buffer[buf_len]; + size_t payload_length = 0; + uint8_t flags = 0x00; + + Http2Stream* stream = static_cast<Http2Stream*>(fetch_sm->ext_get_user_data()); + HTTPHdr* resp_header = reinterpret_cast<HTTPHdr*>(fetch_sm->resp_hdr_bufp()); + + // Convert header fields to HTTP/2 format + convert_headers_from_1_1_to_2(resp_header); + + // Write psuedo headers + payload_length += http2_write_psuedo_headers(resp_header, payload_buffer, + buf_len, *(this->remote_dynamic_table)); + + // If response body is empry, set END_STREAM flag to HEADERS frame + if (resp_header->get_content_length() == 0) { + flags |= HTTP2_FLAGS_HEADERS_END_STREAM; + } + + MIMEFieldIter field_iter; + field_iter.m_block = NULL; + bool cont = false; + do { + // Handle first sending frame is as HEADERS + Http2FrameType type = cont ? HTTP2_FRAME_TYPE_CONTINUATION : HTTP2_FRAME_TYPE_HEADERS; + + // Encode by HPACK naive + payload_length += http2_write_header_fragment(resp_header, field_iter, payload_buffer + payload_length, + buf_len - payload_length, *(this->remote_dynamic_table), cont); + + // If buffer size is enough to send rest of headers, set END_HEADERS flag + if (buf_len >= payload_length && !cont) { + flags |= HTTP2_FLAGS_HEADERS_END_HEADERS; + } + + // Create HEADERS or CONTINUATION frame + Http2Frame headers(type, stream->get_id(), flags); + headers.alloc(buffer_size_index[type]); + http2_write_headers(payload_buffer, payload_length, headers.write()); + headers.finalize(payload_length); + + // xmit event + MUTEX_LOCK(lock, this->ua_session->mutex, this_ethread()); + this->ua_session->handleEvent(HTTP2_SESSION_EVENT_XMIT, &headers); + } while (cont); +} + +void +Http2ConnectionState::send_rst_stream_frame(Http2StreamId id, Http2ErrorCode ec) +{ + Http2Frame rst_stream(HTTP2_FRAME_TYPE_RST_STREAM, id, 0); + + rst_stream.alloc(buffer_size_index[HTTP2_FRAME_TYPE_RST_STREAM]); + http2_write_rst_stream(static_cast<uint32_t>(ec), rst_stream.write()); + rst_stream.finalize(HTTP2_RST_STREAM_LEN); + + // xmit event + MUTEX_LOCK(lock, this->ua_session->mutex, this_ethread()); + this->ua_session->handleEvent(HTTP2_SESSION_EVENT_XMIT, &rst_stream); +} + +void +Http2ConnectionState::send_ping_frame(Http2StreamId id, uint8_t flag, const uint8_t * opaque_data) +{ + Http2Frame ping(HTTP2_FRAME_TYPE_PING, id, flag); + + ping.alloc(buffer_size_index[HTTP2_FRAME_TYPE_PING]); + http2_write_ping(opaque_data, ping.write()); + ping.finalize(HTTP2_PING_LEN); + + // xmit event + MUTEX_LOCK(lock, this->ua_session->mutex, this_ethread()); + this->ua_session->handleEvent(HTTP2_SESSION_EVENT_XMIT, &ping); +} + +void +Http2ConnectionState::send_goaway_frame(Http2StreamId id, Http2ErrorCode ec) +{ + Http2Frame frame(HTTP2_FRAME_TYPE_GOAWAY, 0, 0); + Http2Goaway goaway; + + ink_assert(this->ua_session != NULL); + + goaway.last_streamid = id; + goaway.error_code = ec; + + frame.alloc(buffer_size_index[HTTP2_FRAME_TYPE_GOAWAY]); + http2_write_goaway(goaway, frame.write()); + frame.finalize(HTTP2_GOAWAY_LEN); + + // xmit event + MUTEX_LOCK(lock, this->ua_session->mutex, this_ethread()); + this->ua_session->handleEvent(HTTP2_SESSION_EVENT_XMIT, &frame); +} + +void +Http2ConnectionState::send_window_update_frame(Http2StreamId id, uint32_t size) +{ + // Create WINDOW_UPDATE frame + Http2Frame window_update(HTTP2_FRAME_TYPE_WINDOW_UPDATE, id, 0x0); + window_update.alloc(buffer_size_index[HTTP2_FRAME_TYPE_WINDOW_UPDATE]); + http2_write_window_update(static_cast<uint32_t>(size), window_update.write()); + window_update.finalize(sizeof(uint32_t)); + + // xmit event + MUTEX_LOCK(lock, this->ua_session->mutex, this_ethread()); + this->ua_session->handleEvent(HTTP2_SESSION_EVENT_XMIT, &window_update); +} + +void +Http2Stream::init_fetcher(Http2ConnectionState& cstate) +{ + extern ClassAllocator<FetchSM> FetchSMAllocator; + + // Convert header to HTTP/1.1 format + convert_from_2_to_1_1_header(&_req_header); + + // Get null-terminated URL and method + Arena arena; + int url_len, method_len; + const char* url_ref = _req_header.url_get()->string_get_ref(&url_len); + const char* url = arena.str_store(url_ref, url_len); + const char* method_ref = _req_header.method_get(&method_len); + const char* method = arena.str_store(method_ref, method_len); + + // Initialize FetchSM + _fetch_sm = FetchSMAllocator.alloc(); + _fetch_sm->ext_init((Continuation*)cstate.ua_session, + method, url, HTTP2_FETCHING_HTTP_VERSION, + cstate.ua_session->get_client_addr(), TS_FETCH_FLAGS_DECHUNK); + + // Set request header + MIMEFieldIter fiter; + for (const MIMEField* field = _req_header.iter_get_first(&fiter); field != NULL; field = _req_header.iter_get_next(&fiter)) { + int name_len, value_len; + const char* name = field->name_get(&name_len); + const char* value = field->value_get(&value_len); + + _fetch_sm->ext_add_header(name, name_len, value, value_len); + } + + _fetch_sm->ext_set_user_data(this); + _fetch_sm->ext_launch(); +} + +void +Http2Stream::set_body_to_fetcher(const void *data, size_t len) +{ + ink_assert(_fetch_sm != NULL); + + _fetch_sm->ext_write_data(data, len); +} + +/* + * 5.1. Stream States + * + * +--------+ + * PP | | PP + * ,--------| idle |--------. + * / | | \ + * v +--------+ v + * +----------+ | +----------+ + * | | | H | | + * ,---| reserved | | | reserved |---. + * | | (local) | v | (remote) | | + * | +----------+ +--------+ +----------+ | + * | | ES | | ES | | + * | | H ,-------| open |-------. | H | + * | | / | | \ | | + * | v v +--------+ v v | + * | +----------+ | +----------+ | + * | | half | | | half | | + * | | closed | | R | closed | | + * | | (remote) | | | (local) | | + * | +----------+ | +----------+ | + * | | v | | + * | | ES / R +--------+ ES / R | | + * | `----------->| |<-----------' | + * | R | closed | R | + * `-------------------->| |<--------------------' + * +--------+ + */ +bool +Http2Stream::change_state(uint8_t type, uint8_t flags) +{ + switch(_state) { + case HTTP2_STREAM_STATE_IDLE: + if (type == HTTP2_FRAME_TYPE_HEADERS) { + if (flags & HTTP2_FLAGS_HEADERS_END_STREAM) { + // Skip OPEN _state + _state = HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE; + } else { + _state = HTTP2_STREAM_STATE_OPEN; + } + } else if (type == HTTP2_FRAME_TYPE_PUSH_PROMISE) { + // XXX Server Push have been supported yet. + } else { + return false; + } + break; + + case HTTP2_STREAM_STATE_OPEN: + if (type == HTTP2_FRAME_TYPE_RST_STREAM) { + _state = HTTP2_STREAM_STATE_CLOSED; + } else if (type == HTTP2_FRAME_TYPE_DATA && + flags & HTTP2_FLAGS_DATA_END_STREAM) { + _state = HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE; + } else { + // Currently ATS supports only HTTP/2 server features + return false; + } + break; + + case HTTP2_STREAM_STATE_RESERVED_LOCAL: + // Currently ATS supports only HTTP/2 server features + return false; + + case HTTP2_STREAM_STATE_RESERVED_REMOTE: + // XXX Server Push have been supported yet. + return false; + + case HTTP2_STREAM_STATE_HALF_CLOSED_LOCAL: + // Currently ATS supports only HTTP/2 server features + return false; + + case HTTP2_STREAM_STATE_HALF_CLOSED_REMOTE: + if ( type == HTTP2_FRAME_TYPE_RST_STREAM || + (type == HTTP2_FRAME_TYPE_HEADERS && + flags & HTTP2_FLAGS_HEADERS_END_STREAM) || + (type == HTTP2_FRAME_TYPE_DATA && + flags & HTTP2_FLAGS_DATA_END_STREAM)) { + _state = HTTP2_STREAM_STATE_CLOSED; + } else { + return false; + } + break; + + case HTTP2_STREAM_STATE_CLOSED: + // No state changing + return false; + + default: + return false; + } + + return true; +} http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e4347ef8/proxy/http2/Http2ConnectionState.h ---------------------------------------------------------------------- diff --git a/proxy/http2/Http2ConnectionState.h b/proxy/http2/Http2ConnectionState.h index 3dc74a2..fdb7ead 100644 --- a/proxy/http2/Http2ConnectionState.h +++ b/proxy/http2/Http2ConnectionState.h @@ -25,17 +25,41 @@ #define __HTTP2_CONNECTION_STATE_H__ #include "HTTP2.h" +#include "HPACK.h" +#include "FetchSM.h" class Http2ClientSession; class Http2ConnectionSettings { public: + + Http2ConnectionSettings() { + // 6.5.2. Defined SETTINGS Parameters + // TODO these values should be configurable. + settings[indexof(HTTP2_SETTINGS_HEADER_TABLE_SIZE)] = HTTP2_HEADER_TABLE_SIZE; + settings[indexof(HTTP2_SETTINGS_ENABLE_PUSH)] = HTTP2_ENABLE_PUSH; + settings[indexof(HTTP2_SETTINGS_MAX_CONCURRENT_STREAMS)] = HTTP2_MAX_CONCURRENT_STREAMS; + settings[indexof(HTTP2_SETTINGS_INITIAL_WINDOW_SIZE)] = HTTP2_INITIAL_WINDOW_SIZE; + settings[indexof(HTTP2_SETTINGS_MAX_FRAME_SIZE)] = HTTP2_MAX_FRAME_SIZE; + settings[indexof(HTTP2_SETTINGS_MAX_HEADER_LIST_SIZE)] = HTTP2_MAX_HEADER_LIST_SIZE; + } + unsigned get(Http2SettingsIdentifier id) const { + ink_assert(id <= HTTP2_SETTINGS_MAX - 1); + + if (id > HTTP2_SETTINGS_MAX - 1) { + return 0; + } return this->settings[indexof(id)]; } unsigned set(Http2SettingsIdentifier id, unsigned value) { + ink_assert(id <= HTTP2_SETTINGS_MAX - 1); + + if (id > HTTP2_SETTINGS_MAX - 1) { + return 0; + } return this->settings[indexof(id)] = value; } @@ -43,12 +67,74 @@ private: // Settings ID is 1-based, so convert it to a 0-based index. static unsigned indexof(Http2SettingsIdentifier id) { + ink_assert(id <= HTTP2_SETTINGS_MAX - 1); + return id - 1; } unsigned settings[HTTP2_SETTINGS_MAX - 1]; }; +class Http2ConnectionState; + +class Http2Stream +{ +public: + Http2Stream(Http2StreamId sid=0, ssize_t initial_rwnd=HTTP2_INITIAL_WINDOW_SIZE) + : client_rwnd(initial_rwnd), server_rwnd(initial_rwnd), + _id(sid), _state(HTTP2_STREAM_STATE_IDLE), + _fetch_sm(NULL), body_done(false), data_length(0) + { + _req_header.create(HTTP_TYPE_REQUEST); + } + + ~Http2Stream() + { + _req_header.clear(); + + if (_fetch_sm) { + _fetch_sm->ext_destroy(); + _fetch_sm = NULL; + } + } + + // Operate FetchSM + void init_fetcher(Http2ConnectionState& cstate); + void set_body_to_fetcher(const void *data, size_t len); + FetchSM * get_fetcher() { return _fetch_sm; } + bool is_body_done() const { return body_done; } + void mark_body_done() { body_done = true; } + + const Http2StreamId get_id() const { return _id; } + const Http2StreamState get_state() const { return _state; } + bool change_state(uint8_t type, uint8_t flags); + + int64_t decode_request_header(const IOVec& iov, Http2DynamicTable& dynamic_table, bool cont) { + return http2_parse_header_fragment(&_req_header, iov, dynamic_table, cont); + } + + // Check entire DATA payload length if content-length: header is exist + void increment_data_length(uint64_t length) { data_length += length; } + bool payload_length_is_valid() const { + uint32_t content_length = _req_header.get_content_length(); + return content_length == 0 || content_length == data_length; + } + + // Stream level window size + ssize_t client_rwnd, server_rwnd; + + LINK(Http2Stream, link); +private: + Http2StreamId _id; + Http2StreamState _state; + + HTTPHdr _req_header; + FetchSM* _fetch_sm; + bool body_done; + uint64_t data_length; +}; + + // Http2ConnectionState // // Capture the semantics of a HTTP/2 connection. The client session captures the frame layer, and the @@ -58,22 +144,78 @@ class Http2ConnectionState : public Continuation { public: - Http2ConnectionState() : Continuation(NULL), ua_session(NULL) { + Http2ConnectionState() + : Continuation(NULL), ua_session(NULL), client_rwnd(HTTP2_INITIAL_WINDOW_SIZE), server_rwnd(HTTP2_INITIAL_WINDOW_SIZE), + stream_list(), latest_streamid(0), continued_id(0) + { SET_HANDLER(&Http2ConnectionState::main_event_handler); } + ~Http2ConnectionState() + { + delete local_dynamic_table; + delete remote_dynamic_table; + + ats_free(continued_buffer.iov_base); + } + Http2ClientSession * ua_session; + Http2DynamicTable * local_dynamic_table; + Http2DynamicTable * remote_dynamic_table; // Settings. Http2ConnectionSettings server_settings; Http2ConnectionSettings client_settings; + void init() + { + local_dynamic_table = new Http2DynamicTable(); + remote_dynamic_table = new Http2DynamicTable(); + + continued_buffer.iov_base = NULL; + continued_buffer.iov_len = 0; + } + + // Event handlers int main_event_handler(int, void *); int state_closed(int, void *); + // Stream control interfaces + Http2Stream* create_stream(Http2StreamId new_id); + Http2Stream* find_stream(Http2StreamId id) const; + void restart_streams(); + void delete_stream(Http2Stream* stream); + void cleanup_streams(); + + void update_initial_rwnd(Http2WindowSize new_size); + + // Continuated header decoding + Http2StreamId get_continued_id() const { return continued_id; } + const IOVec& get_continued_headers() const { return continued_buffer; } + void set_continued_headers(const char * buf, uint32_t len, Http2StreamId id); + void finish_continued_headers(); + + // Connection level window size + ssize_t client_rwnd, server_rwnd; + + // HTTP/2 frame sender + void send_data_frame(FetchSM *fetch_sm); + void send_headers_frame(FetchSM *fetch_sm); + void send_rst_stream_frame(Http2StreamId id, Http2ErrorCode ec); + void send_ping_frame(Http2StreamId id, uint8_t flag, const uint8_t * opaque_data); + void send_goaway_frame(Http2StreamId id, Http2ErrorCode ec); + void send_window_update_frame(Http2StreamId id, uint32_t size); + private: Http2ConnectionState(const Http2ConnectionState&); // noncopyable Http2ConnectionState& operator=(const Http2ConnectionState&); // noncopyable + + DLL<Http2Stream> stream_list; + Http2StreamId latest_streamid; + + // The buffer used for storing incomplete fragments of a header field which consists of multiple frames. + Http2StreamId continued_id; + IOVec continued_buffer; }; #endif // __HTTP2_CONNECTION_STATE_H__
