Repository: couchdb-ibrowse Updated Branches: refs/heads/upstream [created] b28542d1e
Changed pipeline algo to smallest pipeline first. Big commit. Switched algorithm to one which will favor the connection with the smallest pipeline first (deciding ties by timestamp of last finished request, and then by pid as ultimate tie breaker). Note: this also drastically changes the internal representation of the connection in ets and is dependent on specific order of operations when changing key values to limit risk of race conditions between the load balancer and a given connection. Also removed connection reporting of start of request as this was no longer necessary since the load balancer tees up the entry into ets with a 1. Project: http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/commit/9d0b7e3e Tree: http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/tree/9d0b7e3e Diff: http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/diff/9d0b7e3e Branch: refs/heads/upstream Commit: 9d0b7e3eea12a72ae619e6f34aab349b25893eef Parents: 3061aa2 Author: benjaminplee <[email protected]> Authored: Wed Nov 19 21:50:54 2014 +0000 Committer: benjaminplee <[email protected]> Committed: Thu Nov 20 16:15:51 2014 +0000 ---------------------------------------------------------------------- src/ibrowse_http_client.erl | 44 ++++++++------------- src/ibrowse_lb.erl | 71 ++++++++++++++++++++++------------ test/ibrowse_functional_tests.erl | 11 ++---- 3 files changed, 66 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/blob/9d0b7e3e/src/ibrowse_http_client.erl ---------------------------------------------------------------------- diff --git a/src/ibrowse_http_client.erl b/src/ibrowse_http_client.erl index 1bb95d2..c9161b0 100644 --- a/src/ibrowse_http_client.erl +++ b/src/ibrowse_http_client.erl @@ -762,11 +762,10 @@ send_req_1(From, {ok, _Sent_body} -> trace_request_body(Body_1), _ = active_once(State_1), 
- State_1_1 = inc_pipeline_counter(State_1), - State_2 = State_1_1#state{status = get_header, - cur_req = NewReq, - proxy_tunnel_setup = in_progress, - tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]}, + State_2 = State_1#state{status = get_header, + cur_req = NewReq, + proxy_tunnel_setup = in_progress, + tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]}, State_3 = set_inac_timer(State_2), {noreply, State_3}; Err -> @@ -853,15 +852,14 @@ send_req_1(From, Raw_req = list_to_binary([Req, Sent_body]), NewReq_1 = NewReq#request{raw_req = Raw_req}, State_1 = State#state{reqs=queue:in(NewReq_1, State#state.reqs)}, - State_2 = inc_pipeline_counter(State_1), - _ = active_once(State_2), - State_3 = case Status of + _ = active_once(State_1), + State_2 = case Status of idle -> - State_2#state{ + State_1#state{ status = get_header, cur_req = NewReq_1}; _ -> - State_2 + State_1 end, case StreamTo of undefined -> @@ -875,8 +873,8 @@ send_req_1(From, catch StreamTo ! {ibrowse_async_raw_req, Raw_req} end end, - State_4 = set_inac_timer(State_3), - {noreply, State_4}; + State_3 = set_inac_timer(State_2), + {noreply, State_3}; Err -> shutting_down(State), do_trace("Send failed... 
Reason: ~p~n", [Err]), @@ -1815,13 +1813,13 @@ format_response_data(Resp_format, Body) -> do_reply(State, From, undefined, _, Resp_format, {ok, St_code, Headers, Body}) -> Msg_1 = {ok, St_code, Headers, format_response_data(Resp_format, Body)}, gen_server:reply(From, Msg_1), - dec_pipeline_counter(State); + report_request_complete(State); do_reply(State, From, undefined, _, _, Msg) -> gen_server:reply(From, Msg), - dec_pipeline_counter(State); + report_request_complete(State); do_reply(#state{prev_req_id = Prev_req_id} = State, _From, StreamTo, ReqId, Resp_format, {ok, _, _, Body}) -> - State_1 = dec_pipeline_counter(State), + State_1 = report_request_complete(State), case Body of [] -> ok; @@ -1843,7 +1841,7 @@ do_reply(#state{prev_req_id = Prev_req_id} = State, ets:delete(?STREAM_TABLE, {req_id_pid, Prev_req_id}), State_1#state{prev_req_id = ReqId}; do_reply(State, _From, StreamTo, ReqId, Resp_format, Msg) -> - State_1 = dec_pipeline_counter(State), + State_1 = report_request_complete(State), Msg_1 = format_response_data(Resp_format, Msg), catch StreamTo ! {ibrowse_async_response, ReqId, Msg_1}, State_1. @@ -1946,19 +1944,11 @@ shutting_down(#state{lb_ets_tid = undefined}) -> shutting_down(#state{lb_ets_tid = Tid}) -> ibrowse_lb:report_connection_down(Tid). -inc_pipeline_counter(#state{is_closing = true} = State) -> - State; -inc_pipeline_counter(#state{lb_ets_tid = undefined} = State) -> - State; -inc_pipeline_counter(#state{lb_ets_tid = Tid} = State) -> - ibrowse_lb:report_request_underway(Tid), - State. - -dec_pipeline_counter(#state{is_closing = true} = State) -> +report_request_complete(#state{is_closing = true} = State) -> State; -dec_pipeline_counter(#state{lb_ets_tid = undefined} = State) -> +report_request_complete(#state{lb_ets_tid = undefined} = State) -> State; -dec_pipeline_counter(#state{lb_ets_tid = Tid} = State) -> +report_request_complete(#state{lb_ets_tid = Tid} = State) -> ibrowse_lb:report_request_complete(Tid), State. 
http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/blob/9d0b7e3e/src/ibrowse_lb.erl ---------------------------------------------------------------------- diff --git a/src/ibrowse_lb.erl b/src/ibrowse_lb.erl index cc067fc..3d487d4 100644 --- a/src/ibrowse_lb.erl +++ b/src/ibrowse_lb.erl @@ -16,7 +16,6 @@ spawn_connection/6, stop/1, report_connection_down/1, - report_request_underway/1, report_request_complete/1 ]). @@ -39,6 +38,9 @@ proc_state}). -define(PIPELINE_MAX, 99999). +-define(KEY_MATCHSPEC_BY_PID(Pid), [{{{'_', '_', Pid}, '_'}, [], ['$_']}]). +-define(KEY_MATCHSPEC(Key), [{{Key, '_'}, [], ['$_']}]). +-define(KEY_MATCHSPEC_FOR_DELETE(Key), [{{Key, '_'}, [], [true]}]). -include("ibrowse.hrl"). @@ -74,13 +76,23 @@ stop(Lb_pid) -> end. report_connection_down(Tid) -> - catch ets:delete(Tid, self()). - -report_request_underway(Tid) -> - catch ets:update_counter(Tid, self(), {2, 1, ?PIPELINE_MAX, ?PIPELINE_MAX}). + %% Don't cascade errors since Tid is really managed by other process + catch ets:select_delete(Tid, ?KEY_MATCHSPEC_BY_PID(self())). report_request_complete(Tid) -> - catch ets:update_counter(Tid, self(), {2, -1, 0, 0}). + %% Don't cascade errors since Tid is really managed by other process + catch case ets:select(Tid, ?KEY_MATCHSPEC_BY_PID(self())) of + [MatchKey] -> + case ets:select_delete(Tid, ?KEY_MATCHSPEC_FOR_DELETE(MatchKey)) of + 1 -> + ets:insert(Tid, {decremented(MatchKey), undefined}), + true; + _ -> + false + end; + _ -> + false + end. %%==================================================================== %% Server functions @@ -210,23 +222,17 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- -find_best_connection(Tid, Max_pipe) -> - find_best_connection(ets:first(Tid), Tid, Max_pipe). 
- -find_best_connection('$end_of_table', _, _) -> - {error, retry_later}; -find_best_connection(Pid, Tid, Max_pipe) -> - case ets:lookup(Tid, Pid) of - [{Pid, Cur_sz}] when Cur_sz < Max_pipe -> - case record_request_for_connection(Tid, Pid) of - {'EXIT', _} -> - %% The selected process has shutdown - find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe); - _ -> - {ok, Pid} +find_best_connection(Tid, Max_pipeline_size) -> + case ets:first(Tid) of + {Size, _Timestamp, Pid} = Key when Size < Max_pipeline_size -> + case record_request_for_connection(Tid, Key) of + true -> + {ok, Pid}; + false -> + find_best_connection(Tid, Max_pipeline_size) end; - _ -> - find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe) + _ -> + {error, retry_later} end. maybe_create_ets(#state{ets_tid = undefined} = State) -> @@ -240,10 +246,25 @@ num_current_connections(Tid) -> catch ets:info(Tid, size). record_new_connection(Tid, Pid) -> - catch ets:insert(Tid, {Pid, 0}). + catch ets:insert(Tid, {new_key(Pid), undefined}). + +record_request_for_connection(Tid, Key) -> + case ets:select_delete(Tid, ?KEY_MATCHSPEC_FOR_DELETE(Key)) of + 1 -> + ets:insert(Tid, {incremented(Key), undefined}), + true; + _ -> + false + end. + +new_key(Pid) -> + {1, os:timestamp(), Pid}. + +incremented({Size, Timestamp, Pid}) -> + {Size + 1, Timestamp, Pid}. -record_request_for_connection(Tid, Pid) -> - catch ets:update_counter(Tid, Pid, {2, 1, ?PIPELINE_MAX, ?PIPELINE_MAX}). +decremented({Size, _Timestamp, Pid}) -> + {Size - 1, os:timestamp(), Pid}. 
for_each_connection_pid(Tid, Fun) -> catch ets:foldl(fun({Pid, _}, _) -> Fun(Pid) end, undefined, Tid), http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/blob/9d0b7e3e/test/ibrowse_functional_tests.erl ---------------------------------------------------------------------- diff --git a/test/ibrowse_functional_tests.erl b/test/ibrowse_functional_tests.erl index 0a68e14..fc7afec 100644 --- a/test/ibrowse_functional_tests.erl +++ b/test/ibrowse_functional_tests.erl @@ -56,15 +56,10 @@ balanced_connections() -> timer:sleep(1000), - Diffs = [Count - BalancedNumberOfRequestsPerConnection || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()], - ?assertEqual(MaxSessions, length(Diffs)), + Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()], + ?assertEqual(MaxSessions, length(Counts)), - lists:foreach(fun(X) -> ?assertEqual(yep, close_to_zero(X)) end, Diffs). - -close_to_zero(0) -> yep; -close_to_zero(-1) -> yep; -close_to_zero(1) -> yep; -close_to_zero(X) -> {nope, X}. + ?assertEqual(lists:duplicate(MaxSessions, BalancedNumberOfRequestsPerConnection), Counts). times(0, _) -> ok;
