Author: kocolosk
Date: Fri Jul  3 15:56:51 2009
New Revision: 790953

URL: http://svn.apache.org/viewvc?rev=790953&view=rev
Log:
ibrowse now allows user to control socket.  Thanks again Chandru

Modified:
    couchdb/trunk/src/ibrowse/ibrowse.erl
    couchdb/trunk/src/ibrowse/ibrowse_http_client.erl
    couchdb/trunk/src/ibrowse/ibrowse_test.erl

Modified: couchdb/trunk/src/ibrowse/ibrowse.erl
URL: 
http://svn.apache.org/viewvc/couchdb/trunk/src/ibrowse/ibrowse.erl?rev=790953&r1=790952&r2=790953&view=diff
==============================================================================
--- couchdb/trunk/src/ibrowse/ibrowse.erl (original)
+++ couchdb/trunk/src/ibrowse/ibrowse.erl Fri Jul  3 15:56:51 2009
@@ -89,6 +89,7 @@
         send_req_direct/5,
         send_req_direct/6,
         send_req_direct/7,
+        stream_next/1,
         set_max_sessions/3,
         set_max_pipeline_size/3,
         set_dest/3,
@@ -150,7 +151,8 @@
 %% respHeader() = {headerName(), headerValue()}
 %% headerName() = string()
 %% headerValue() = string()
-%% response() = {ok, Status, ResponseHeaders, ResponseBody} | {error, Reason}
+%% response() = {ok, Status, ResponseHeaders, ResponseBody} | {ibrowse_req_id, 
req_id() } | {error, Reason}
+%% req_id = term()
 %% ResponseBody = string() | {file, Filename}
 %% Reason = term()
 send_req(Url, Headers, Method) ->
@@ -425,7 +427,20 @@
        Err ->
            {error, {url_parsing_failed, Err}}
     end.
-    
+
+%% @doc Tell ibrowse to stream the next chunk of data to the
+%% caller. Should be used in conjunction with the
+%% <code>stream_to</code> option
+%% @spec stream_next(Req_id :: req_id()) -> ok | {error, unknown_req_id}
+stream_next(Req_id) ->    
+    case ets:lookup(ibrowse_stream, {req_id_pid, Req_id}) of
+       [] ->
+           {error, unknown_req_id};
+       [{_, Pid}] ->
+           catch Pid ! {stream_next, Req_id},
+           ok
+    end.
+
 %% @doc Turn tracing on for the ibrowse process
 trace_on() ->
     ibrowse ! {trace, true}.
@@ -522,6 +537,7 @@
     put(ibrowse_trace_token, "ibrowse"),
     ets:new(ibrowse_lb, [named_table, public, {keypos, 2}]),
     ets:new(ibrowse_conf, [named_table, protected, {keypos, 2}]),
+    ets:new(ibrowse_stream, [named_table, public]),
     import_config(),
     {ok, #state{}}.
 
@@ -539,9 +555,9 @@
        {ok, Terms} ->
            ets:delete_all_objects(ibrowse_conf),
            Fun = fun({dest, Host, Port, MaxSess, MaxPipe, Options}) 
-                    when list(Host), integer(Port),
-                    integer(MaxSess), MaxSess > 0,
-                    integer(MaxPipe), MaxPipe > 0, list(Options) ->
+                    when is_list(Host), is_integer(Port),
+                         is_integer(MaxSess), MaxSess > 0,
+                         is_integer(MaxPipe), MaxPipe > 0, is_list(Options) ->
                          I = [{{max_sessions, Host, Port}, MaxSess},
                               {{max_pipeline_size, Host, Port}, MaxPipe},
                               {{options, Host, Port}, Options}],
@@ -641,13 +657,6 @@
                      true ->
                          catch Pid ! {trace, false}
                  end;
-            (#client_conn{key = {H, P, Pid}}, _) ->
-                 case lists:member({H, P}, Trace_on_dests) of
-                     false ->
-                         ok;
-                     true ->
-                         catch Pid ! {trace, false}
-                 end;
             (_, Acc) ->
                  Acc
          end,
@@ -664,10 +673,6 @@
             when H == Host,
                  P == Port ->
                  catch Pid ! {trace, Bool};
-            (#client_conn{key = {H, P, Pid}}, _)
-            when H == Host,
-                 P == Port ->
-                 catch Pid ! {trace, Bool};
             (_, Acc) ->
                  Acc
          end,

Modified: couchdb/trunk/src/ibrowse/ibrowse_http_client.erl
URL: 
http://svn.apache.org/viewvc/couchdb/trunk/src/ibrowse/ibrowse_http_client.erl?rev=790953&r1=790952&r2=790953&view=diff
==============================================================================
--- couchdb/trunk/src/ibrowse/ibrowse_http_client.erl (original)
+++ couchdb/trunk/src/ibrowse/ibrowse_http_client.erl Fri Jul  3 15:56:51 2009
@@ -47,11 +47,12 @@
                is_closing, send_timer, content_length,
                deleted_crlf = false, transfer_encoding,
                chunk_size, chunk_size_buffer = <<>>, recvd_chunk_size,
-               lb_ets_tid, cur_pipeline_size = 0
+               lb_ets_tid, cur_pipeline_size = 0, prev_req_id
               }).
 
 -record(request, {url, method, options, from,
-                 stream_to, req_id,
+                 stream_to, caller_controls_socket = false, 
+                 req_id,
                  stream_chunk_size,
                  save_response_to_file = false, 
                  tmp_file_name, tmp_file_fd,
@@ -126,144 +127,15 @@
 %%--------------------------------------------------------------------
 %% Received a request when the remote server has already sent us a
 %% Connection: Close header
-handle_call({send_req, _},
-           _From,
-           #state{is_closing=true}=State) ->
+handle_call({send_req, _}, _From, #state{is_closing = true} = State) ->
     {reply, {error, connection_closing}, State};
 
 handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}},
-           From,
-           #state{socket=undefined,
-                  host=Host, port=Port}=State) ->
-    Resp_format = get_value(response_format, Options, list),
-    {Host_1, Port_1, State_1} =
-       case get_value(proxy_host, Options, false) of
-           false ->
-               {Host, Port, State};
-           PHost ->
-               ProxyUser = get_value(proxy_user, Options, []),
-               ProxyPassword = get_value(proxy_password, Options, []),
-               Digest = http_auth_digest(ProxyUser, ProxyPassword),
-               {PHost, get_value(proxy_port, Options, 80),
-                State#state{use_proxy = true,
-                            proxy_auth_digest = Digest}}
-       end,
-    StreamTo = get_value(stream_to, Options, undefined),
-    ReqId = make_req_id(),
-    SaveResponseToFile = get_value(save_response_to_file, Options, false),
-    NewReq = #request{url=Url,
-                     method=Method,
-                     stream_to=StreamTo,
-                     options=Options,
-                     req_id=ReqId,
-                     save_response_to_file = SaveResponseToFile,
-                     stream_chunk_size = get_stream_chunk_size(Options),
-                     response_format = Resp_format,
-                     from=From},
-    Reqs = queue:in(NewReq, State#state.reqs),
-    State_2 = check_ssl_options(Options, State_1#state{reqs = Reqs}),
-    do_trace("Connecting...~n", []),
-    Start_ts = now(),
-    Conn_timeout = get_value(connect_timeout, Options, Timeout),
-    case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of
-       {ok, Sock} ->
-           do_trace("Connected!~n", []),
-           End_ts = now(),
-           Ref = case Timeout of
-                     infinity ->
-                         undefined;
-                     _ ->
-                         Rem_time = Timeout - 
trunc(round(timer:now_diff(End_ts, Start_ts) / 1000)),
-                         case Rem_time > 0 of
-                             true ->
-                                 erlang:send_after(Rem_time, self(), 
{req_timedout, From});
-                             false ->
-                                 shutting_down(State_2),
-                                 do_error_reply(State_2, req_timedout),
-                                 exit(normal)
-                         end
-                 end,
-           case send_req_1(Url, Headers, Method, Body, Options, Sock, State_2) 
of
-               ok ->
-                   do_setopts(Sock, [{active, once}], State_2#state.is_ssl),
-                   case StreamTo of
-                       undefined ->
-                           ok;
-                       _ ->
-                           gen_server:reply(From, {ibrowse_req_id, ReqId})
-                   end,
-                   State_3 = inc_pipeline_counter(State_2#state{socket = Sock,
-                                                                send_timer = 
Ref,
-                                                                cur_req = 
NewReq,
-                                                                status = 
get_header}),
-                   {noreply, State_3, get_inac_timeout(State_3)};
-               Err ->
-                   shutting_down(State_2),
-                   do_trace("Send failed... Reason: ~p~n", [Err]),
-                   gen_server:reply(From, {error, send_failed}),
-                   {stop, normal, State_2}
-           end;
-       Err ->
-           shutting_down(State_2),
-           do_trace("Error connecting. Reason: ~1000.p~n", [Err]),
-           gen_server:reply(From, {error, conn_failed}),
-           {stop, normal, State_2}
-    end;
-
-%% Request which is to be pipelined
-handle_call({send_req, {Url, Headers, Method,
-                        Body, Options, Timeout}},
-           From,
-           #state{socket=Sock, status=Status, reqs=Reqs}=State) ->
-    do_trace("Recvd request in connected state. Status -> ~p NumPending: 
~p~n", [Status, length(queue:to_list(Reqs))]),
-    Resp_format = get_value(response_format, Options, list),
-    StreamTo = get_value(stream_to, Options, undefined),
-    SaveResponseToFile = get_value(save_response_to_file, Options, false),
-    ReqId = make_req_id(),
-    NewReq = #request{url=Url,
-                     stream_to=StreamTo,
-                     method=Method,
-                     options=Options,
-                     req_id=ReqId,
-                     save_response_to_file = SaveResponseToFile,
-                     stream_chunk_size = get_stream_chunk_size(Options),
-                     response_format = Resp_format,
-                     from=From},
-    State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
-    case send_req_1(Url, Headers, Method, Body, Options, Sock, State_1) of
-       ok ->
-           State_2 = inc_pipeline_counter(State_1),
-           do_setopts(Sock, [{active, once}], State#state.is_ssl),
-           case Timeout of
-               infinity ->
-                   ok;
-               _ ->
-                   erlang:send_after(Timeout, self(), {req_timedout, From})
-           end,
-           State_3 = case Status of
-                         idle ->
-                             State_2#state{status = get_header,
-                                           cur_req = NewReq};
-                         _ ->
-                             State_2
-                     end,
-           case StreamTo of
-               undefined ->
-                   ok;
-               _ ->
-                   gen_server:reply(From, {ibrowse_req_id, ReqId})
-           end,
-           {noreply, State_3, get_inac_timeout(State_3)};
-       Err ->
-           shutting_down(State_1),
-           do_trace("Send request failed: Reason: ~p~n", [Err]),
-           gen_server:reply(From, {error, send_failed}),
-           do_error_reply(State, send_failed),
-           {stop, normal, State_1}
-    end;
+           From, State) ->
+    send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State);
 
-handle_call(stop, _From, #state{socket = Socket, is_ssl = Is_ssl} = State) ->
-    do_close(Socket, Is_ssl),
+handle_call(stop, _From, State) ->
+    do_close(State),
     do_error_reply(State, closing_on_request),
     {stop, normal, State};
 
@@ -294,6 +166,15 @@
 handle_info({ssl, _Sock, Data}, State) ->
     handle_sock_data(Data, State);
 
+handle_info({stream_next, Req_id}, #state{socket = Socket,
+                                         is_ssl = Is_ssl,
+                                         cur_req = #request{req_id = Req_id}} 
= State) ->
+    do_setopts(Socket, [{active, once}], Is_ssl),
+    {noreply, State};
+
+handle_info({stream_next, _Req_id}, State) ->
+    {noreply, State};
+
 handle_info({tcp_closed, _Sock}, State) ->
     do_trace("TCP connection closed by peer!~n", []),
     handle_sock_closed(State),
@@ -332,12 +213,7 @@
 %% Returns: any (ignored by gen_server)
 %%--------------------------------------------------------------------
 terminate(_Reason, State) ->
-    case State#state.socket of
-       undefined ->
-           ok;
-       Sock ->
-           do_close(Sock, State#state.is_ssl)
-    end.
+    do_close(State).
 
 %%--------------------------------------------------------------------
 %% Func: code_change/3
@@ -358,10 +234,10 @@
     do_trace("Data recvd on socket in state idle!. ~1000.p~n", [Data]),
     shutting_down(State),
     do_error_reply(State, data_in_status_idle),
-    do_close(State#state.socket, State#state.is_ssl),
+    do_close(State),
     {stop, normal, State};
 
-handle_sock_data(Data, #state{status=get_header, socket=Sock}=State) ->
+handle_sock_data(Data, #state{status = get_header}=State) ->
     case parse_response(Data, State) of
        {error, _Reason} ->
            shutting_down(State),
@@ -370,14 +246,15 @@
            shutting_down(State),
            {stop, normal, State};
        State_1 ->
-           do_setopts(Sock, [{active, once}], State#state.is_ssl),
+           active_once(State_1),
            {noreply, State_1, get_inac_timeout(State_1)}
     end;
 
-handle_sock_data(Data, #state{status=get_body, content_length=CL,
+handle_sock_data(Data, #state{status           = get_body,
+                             content_length   = CL,
                              http_status_code = StatCode,
-                             recvd_headers=Headers,
-                             chunk_size=CSz, socket=Sock}=State) ->
+                             recvd_headers    = Headers,
+                             chunk_size       = CSz} = State) ->
     case (CL == undefined) and (CSz == undefined) of
        true ->
            case accumulate_response(Data, State) of
@@ -387,7 +264,7 @@
                                            {error, {Reason, {stat_code, 
StatCode}, Headers}}),
                    {stop, normal, State};
                State_1 ->
-                   do_setopts(Sock, [{active, once}], State#state.is_ssl),
+                   active_once(State_1),
                    {noreply, State_1, get_inac_timeout(State_1)}
            end;
        _ ->
@@ -401,7 +278,7 @@
                    shutting_down(State),
                    {stop, normal, State};
                State_1 ->
-                   do_setopts(Sock, [{active, once}], State#state.is_ssl),
+                   active_once(State_1),
                    {noreply, State_1, get_inac_timeout(State_1)}
            end
     end.
@@ -452,22 +329,27 @@
                                 cur_req = CurReq}=State) ->
     #request{stream_to=StreamTo, req_id=ReqId,
             stream_chunk_size = Stream_chunk_size,
-            response_format = Response_format} = CurReq,
+            response_format = Response_format,
+            caller_controls_socket = Caller_controls_socket} = CurReq,
     RepBuf_1 = concat_binary([RepBuf, Data]),
     New_data_size = RepBufSize - Streamed_size,
     case StreamTo of
        undefined ->
            State#state{reply_buffer = RepBuf_1};
-       _ when New_data_size < Stream_chunk_size ->
-           State#state{reply_buffer = RepBuf_1};
-       _ ->
+       _ when Caller_controls_socket == true ->
+           do_interim_reply(StreamTo, Response_format, ReqId, RepBuf_1),
+           State#state{reply_buffer = <<>>, 
+                       streamed_size = Streamed_size + size(RepBuf_1)};
+       _ when New_data_size >= Stream_chunk_size ->
            {Stream_chunk, Rem_data} = split_binary(RepBuf_1, 
Stream_chunk_size),
            do_interim_reply(StreamTo, Response_format, ReqId, Stream_chunk),
            accumulate_response(
              Rem_data,
              State#state{
                reply_buffer = <<>>,
-               streamed_size = Streamed_size + Stream_chunk_size})
+               streamed_size = Streamed_size + Stream_chunk_size});
+       _ ->
+           State#state{reply_buffer = RepBuf_1}
     end.
 
 make_tmp_filename() ->
@@ -528,37 +410,45 @@
                    [binary, {nodelay, true}, {active, false}],
                    Timeout).
 
-do_send(Sock, Req, true)  ->  ssl:send(Sock, Req);
-do_send(Sock, Req, false) ->  gen_tcp:send(Sock, Req).
+do_send(Req, #state{socket = Sock, is_ssl = true})  ->  ssl:send(Sock, Req);
+do_send(Req, #state{socket = Sock, is_ssl = false}) ->  gen_tcp:send(Sock, 
Req).
 
 %% @spec do_send_body(Sock::socket_descriptor(), Source::source_descriptor(), 
IsSSL::boolean()) -> ok | error()
 %% source_descriptor() = fun_arity_0           |
 %%                       {fun_arity_0}         |
 %%                       {fun_arity_1, term()}
 %% error() = term()
-do_send_body(Sock, Source, IsSSL) when is_function(Source) ->
-    do_send_body(Sock, {Source}, IsSSL);
-do_send_body(Sock, {Source}, IsSSL) when is_function(Source) ->
-    do_send_body1(Sock, Source, IsSSL, Source());
-do_send_body(Sock, {Source, State}, IsSSL) when is_function(Source) ->
-    do_send_body1(Sock, Source, IsSSL, Source(State));
-do_send_body(Sock, Body, IsSSL) ->
-    do_send(Sock, Body, IsSSL).
+do_send_body(Source, State) when is_function(Source) ->
+    do_send_body({Source}, State);
+do_send_body({Source}, State) when is_function(Source) ->
+    do_send_body1(Source, Source(), State);
+do_send_body({Source, Source_state}, State) when is_function(Source) ->
+    do_send_body1(Source, Source(Source_state), State);
+do_send_body(Body, State) ->
+    do_send(Body, State).
 
-do_send_body1(Sock, Source, IsSSL, Resp) ->
+do_send_body1(Source, Resp, State) ->
     case Resp of
        {ok, Data} ->
-           do_send(Sock, Data, IsSSL),
-           do_send_body(Sock, {Source}, IsSSL);
-       {ok, Data, NewState} ->
-           do_send(Sock, Data, IsSSL),
-           do_send_body(Sock, {Source, NewState}, IsSSL);
-       eof -> ok;
-       Err -> Err
+           do_send(Data, State),
+           do_send_body({Source}, State);
+       {ok, Data, New_source_state} ->
+           do_send(Data, State),
+           do_send_body({Source, New_source_state}, State);
+       eof ->
+           ok;
+       Err ->
+           Err
     end.
 
-do_close(Sock, true)  ->  ssl:close(Sock);
-do_close(Sock, false) ->  gen_tcp:close(Sock).
+do_close(#state{socket = undefined})            ->  ok;
+do_close(#state{socket = Sock, is_ssl = true})  ->  ssl:close(Sock);
+do_close(#state{socket = Sock, is_ssl = false}) ->  gen_tcp:close(Sock).
+
+active_once(#state{cur_req = #request{caller_controls_socket = true}}) ->
+    ok;
+active_once(#state{socket = Socket, is_ssl = Is_ssl}) ->
+    do_setopts(Socket, [{active, once}], Is_ssl).
 
 do_setopts(Sock, Opts, true)  ->  ssl:setopts(Sock, Opts);
 do_setopts(Sock, Opts, false) ->  inet:setopts(Sock, Opts).
@@ -571,11 +461,81 @@
            State#state{is_ssl=true, ssl_options=get_value(ssl_options, 
Options)}
     end.
 
-send_req_1(#url{abspath = AbsPath,
-               host = Host,
-               port = Port,
-               path = RelPath} = Url,
-          Headers, Method, Body, Options, Sock, State) ->
+send_req_1(From,
+          #url{host = Host,
+               port = Port} = Url,
+          Headers, Method, Body, Options, Timeout,
+          #state{socket = undefined} = State) ->
+    {Host_1, Port_1, State_1} =
+       case get_value(proxy_host, Options, false) of
+           false ->
+               {Host, Port, State};
+           PHost ->
+               ProxyUser     = get_value(proxy_user, Options, []),
+               ProxyPassword = get_value(proxy_password, Options, []),
+               Digest        = http_auth_digest(ProxyUser, ProxyPassword),
+               {PHost, get_value(proxy_port, Options, 80),
+                State#state{use_proxy = true,
+                            proxy_auth_digest = Digest}}
+       end,
+    State_2 = check_ssl_options(Options, State_1),
+    do_trace("Connecting...~n", []),
+    Start_ts = now(),
+    Conn_timeout = get_value(connect_timeout, Options, Timeout),
+    case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of
+       {ok, Sock} ->
+           do_trace("Connected!~n", []),
+           End_ts = now(),
+           Timeout_1 = case Timeout of
+                           infinity ->
+                               infinity;
+                           _ ->
+                               Timeout - trunc(round(timer:now_diff(End_ts, 
Start_ts) / 1000))
+                       end,
+           State_3 = State_2#state{socket = Sock},
+           send_req_1(From, Url, Headers, Method, Body, Options, Timeout_1, 
State_3);
+       Err ->
+           shutting_down(State_2),
+           do_trace("Error connecting. Reason: ~1000.p~n", [Err]),
+           gen_server:reply(From, {error, conn_failed}),
+           {stop, normal, State_2}
+    end;
+send_req_1(From,
+          #url{abspath = AbsPath,
+               host    = Host,
+               port    = Port,
+               path    = RelPath} = Url,
+          Headers, Method, Body, Options, Timeout,
+          #state{status = Status} = State) ->
+    ReqId = make_req_id(),
+    Resp_format = get_value(response_format, Options, list),
+    {StreamTo, Caller_controls_socket} =
+       case get_value(stream_to, Options, undefined) of
+           {Caller, once} when is_pid(Caller) or
+                               is_atom(Caller) ->
+               Async_pid_rec = {{req_id_pid, ReqId}, self()},
+               true = ets:insert(ibrowse_stream, Async_pid_rec), 
+               {Caller, true};
+           undefined ->
+               {undefined, false};
+           Caller when is_pid(Caller) or
+                       is_atom(Caller) ->
+               {Caller, false};
+           Stream_to_inv ->
+               exit({invalid_option, {stream_to, Stream_to_inv}})
+       end,
+    SaveResponseToFile = get_value(save_response_to_file, Options, false),
+    NewReq = #request{url                    = Url,
+                     method                 = Method,
+                     stream_to              = StreamTo,
+                     caller_controls_socket = Caller_controls_socket,
+                     options                = Options,
+                     req_id                 = ReqId,
+                     save_response_to_file  = SaveResponseToFile,
+                     stream_chunk_size      = get_stream_chunk_size(Options),
+                     response_format        = Resp_format,
+                     from                   = From},
+    State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
     Headers_1 = add_auth_headers(Url, Options, Headers, State),
     HostHeaderValue = case lists:keysearch(host_header, 1, Options) of
                          false ->
@@ -598,14 +558,45 @@
                     "--- Request End ---~n", [NReq]);
        _ -> ok
     end,
-    SndRes = case do_send(Sock, Req, State#state.is_ssl) of
-                ok -> do_send_body(Sock, Body_1, State#state.is_ssl);
-                Err ->
-                    io:format("Err: ~p~n", [Err]),
-                    Err
-            end,
-    do_setopts(Sock, [{active, once}], State#state.is_ssl),
-    SndRes.
+    case do_send(Req, State) of
+       ok ->
+           case do_send_body(Body_1, State) of
+               ok ->
+                   State_2 = inc_pipeline_counter(State_1),
+                   active_once(State_1),
+                   Ref = case Timeout of
+                             infinity ->
+                                 undefined;
+                             _ ->
+                                 erlang:send_after(Timeout, self(), 
{req_timedout, From})
+                         end,
+                   State_3 = case Status of
+                                 idle ->
+                                     State_2#state{status     = get_header,
+                                                   cur_req    = NewReq,
+                                                   send_timer = Ref};
+                                 _ ->
+                                     State_2#state{send_timer = Ref}
+                             end,
+                   case StreamTo of
+                       undefined ->
+                           ok;
+                       _ ->
+                           gen_server:reply(From, {ibrowse_req_id, ReqId})
+                   end,
+                   {noreply, State_3, get_inac_timeout(State_3)};
+               Err ->
+                   shutting_down(State_1),
+                   do_trace("Send failed... Reason: ~p~n", [Err]),
+                   gen_server:reply(From, {error, send_failed}),
+                   {stop, normal, State_1}
+           end;
+       Err ->
+           shutting_down(State_1),
+           do_trace("Send failed... Reason: ~p~n", [Err]),
+           gen_server:reply(From, {error, send_failed}),
+           {stop, normal, State_1}
+    end.
 
 add_auth_headers(#url{username = User,
                      password = UPw},
@@ -719,9 +710,9 @@
     encode_headers(L, []).
 encode_headers([{http_vsn, _Val} | T], Acc) ->
     encode_headers(T, Acc);
-encode_headers([{Name,Val} | T], Acc) when list(Name) ->
+encode_headers([{Name,Val} | T], Acc) when is_list(Name) ->
     encode_headers(T, [[Name, ": ", fmt_val(Val), crnl()] | Acc]);
-encode_headers([{Name,Val} | T], Acc) when atom(Name) ->
+encode_headers([{Name,Val} | T], Acc) when is_atom(Name) ->
     encode_headers(T, [[atom_to_list(Name), ": ", fmt_val(Val), crnl()] | 
Acc]);
 encode_headers([], Acc) ->
     lists:reverse(Acc).
@@ -732,25 +723,25 @@
 chunk_request_body(Body, _ChunkSize, Acc) when Body == <<>>; Body == [] ->
     LastChunk = "0\r\n",
     lists:reverse(["\r\n", LastChunk | Acc]);
-chunk_request_body(Body, ChunkSize, Acc) when binary(Body),
+chunk_request_body(Body, ChunkSize, Acc) when is_binary(Body),
                                               size(Body) >= ChunkSize ->
     <<ChunkBody:ChunkSize/binary, Rest/binary>> = Body,
     Chunk = [ibrowse_lib:dec2hex(4, ChunkSize),"\r\n",
             ChunkBody, "\r\n"],
     chunk_request_body(Rest, ChunkSize, [Chunk | Acc]);
-chunk_request_body(Body, _ChunkSize, Acc) when binary(Body) ->
+chunk_request_body(Body, _ChunkSize, Acc) when is_binary(Body) ->
     BodySize = size(Body),
     Chunk = [ibrowse_lib:dec2hex(4, BodySize),"\r\n",
             Body, "\r\n"],
     LastChunk = "0\r\n",
     lists:reverse(["\r\n", LastChunk, Chunk | Acc]);
-chunk_request_body(Body, ChunkSize, Acc) when list(Body),
+chunk_request_body(Body, ChunkSize, Acc) when is_list(Body),
                                               length(Body) >= ChunkSize ->
     {ChunkBody, Rest} = split_list_at(Body, ChunkSize),
     Chunk = [ibrowse_lib:dec2hex(4, ChunkSize),"\r\n",
             ChunkBody, "\r\n"],
     chunk_request_body(Rest, ChunkSize, [Chunk | Acc]);
-chunk_request_body(Body, _ChunkSize, Acc) when list(Body) ->
+chunk_request_body(Body, _ChunkSize, Acc) when is_list(Body) ->
     BodySize = length(Body),
     Chunk = [ibrowse_lib:dec2hex(4, BodySize),"\r\n",
             Body, "\r\n"],
@@ -840,7 +831,7 @@
                    {error, content_length_undefined};
                V ->
                    case catch list_to_integer(V) of
-                       V_1 when integer(V_1), V_1 >= 0 ->
+                       V_1 when is_integer(V_1), V_1 >= 0 ->
                            send_async_headers(ReqId, StreamTo, StatCode, 
Headers_1),
                            do_trace("Recvd Content-Length of ~p~n", [V_1]),
                            State_2 = State_1#state{rep_buf_size=0,
@@ -1058,17 +1049,20 @@
 parse_headers(Headers) ->
     case scan_crlf(Headers) of
        {yes, StatusLine, T} ->
-           Headers_1 = parse_headers_1(T),
-           case parse_status_line(StatusLine) of
-               {ok, HttpVsn, StatCode, _Msg} ->
-                   put(http_prot_vsn, HttpVsn),
-                   {HttpVsn, StatCode, Headers_1};
-               _ -> %% A HTTP 0.9 response?
-                   put(http_prot_vsn, "HTTP/0.9"),
-                   {"HTTP/0.9", undefined, Headers}
-           end;
-       _ ->
-           {error, no_status_line}
+           parse_headers(StatusLine, T);
+       {no, StatusLine} ->
+           parse_headers(StatusLine, <<>>)
+    end.
+
+parse_headers(StatusLine, Headers) ->
+    Headers_1 = parse_headers_1(Headers),
+    case parse_status_line(StatusLine) of
+       {ok, HttpVsn, StatCode, _Msg} ->
+           put(http_prot_vsn, HttpVsn),
+           {HttpVsn, StatCode, Headers_1};
+       _ -> %% A HTTP 0.9 response?
+           put(http_prot_vsn, "HTTP/0.9"),
+           {"HTTP/0.9", undefined, Headers}
     end.
 
 % From RFC 2616
@@ -1079,10 +1073,10 @@
 %    SP. A recipient MAY replace any linear white space with a single
 %    SP before interpreting the field value or forwarding the message
 %    downstream.
-parse_headers_1(B) when is_binary(B) ->
-    parse_headers_1(binary_to_list(B));
-parse_headers_1(String) ->
-    parse_headers_1(String, [], []).
+       parse_headers_1(B) when is_binary(B) ->
+                                          parse_headers_1(binary_to_list(B));
+       parse_headers_1(String) ->
+                                          parse_headers_1(String, [], []).
 
 parse_headers_1([$\n, H |T], [$\r | L], Acc) when H == 32;
                                                  H == $\t ->
@@ -1205,10 +1199,10 @@
 %% scan_crlf([H|T],  L)                    -> scan_crlf(T, [H|L]);
 %% scan_crlf([], L)                        -> {no, L}.
 
-fmt_val(L) when list(L)    -> L;
-fmt_val(I) when integer(I) -> integer_to_list(I);
-fmt_val(A) when atom(A)    -> atom_to_list(A);
-fmt_val(Term)              -> io_lib:format("~p", [Term]).
+fmt_val(L) when is_list(L)    -> L;
+fmt_val(I) when is_integer(I) -> integer_to_list(I);
+fmt_val(A) when is_atom(A)    -> atom_to_list(A);
+fmt_val(Term)                 -> io_lib:format("~p", [Term]).
 
 crnl() -> "\r\n".
 
@@ -1306,7 +1300,8 @@
 do_reply(State, From, undefined, _, _, Msg) ->
     gen_server:reply(From, Msg),
     dec_pipeline_counter(State);
-do_reply(State, _From, StreamTo, ReqId, Resp_format, {ok, _, _, Body}) ->
+do_reply(#state{prev_req_id = Prev_req_id} = State,
+        _From, StreamTo, ReqId, Resp_format, {ok, _, _, Body}) ->
     State_1 = dec_pipeline_counter(State),
     case Body of
        [] ->
@@ -1316,7 +1311,18 @@
            catch StreamTo ! {ibrowse_async_response, ReqId, Body_1}
     end,
     catch StreamTo ! {ibrowse_async_response_end, ReqId},
-    State_1;
+    %% We don't want to delete the Req-id to Pid mapping straightaway
+    %% as the client may send a stream_next message just while we are
+    %% sending back this ibrowse_async_response_end message. If we
+    %% deleted this mapping straightaway, the caller will see a
+    %% {error, unknown_req_id} when it calls ibrowse:stream_next/1. To
+    %% get around this, we store the req id, and clear it after the
+    %% next request. If there are wierd combinations of stream,
+    %% stream_once and sync requests on the same connection, it will
+    %% take a while for the req_id-pid mapping to get cleared, but it
+    %% should do no harm.
+    ets:delete(ibrowse_stream, {req_id_pid, Prev_req_id}),
+    State_1#state{prev_req_id = ReqId};
 do_reply(State, _From, StreamTo, ReqId, Resp_format, Msg) ->
     State_1 = dec_pipeline_counter(State),
     Msg_1 = format_response_data(Resp_format, Msg),
@@ -1333,6 +1339,7 @@
     ReqList = queue:to_list(Reqs),
     lists:foreach(fun(#request{from=From, stream_to=StreamTo, req_id=ReqId,
                               response_format = Resp_format}) ->
+                         ets:delete(ibrowse_stream, {req_id_pid, ReqId}),
                           do_reply(State, From, StreamTo, ReqId, Resp_format, 
{error, Err})
                  end, ReqList).
 

Modified: couchdb/trunk/src/ibrowse/ibrowse_test.erl
URL: 
http://svn.apache.org/viewvc/couchdb/trunk/src/ibrowse/ibrowse_test.erl?rev=790953&r1=790952&r2=790953&view=diff
==============================================================================
--- couchdb/trunk/src/ibrowse/ibrowse_test.erl (original)
+++ couchdb/trunk/src/ibrowse/ibrowse_test.erl Fri Jul  3 15:56:51 2009
@@ -18,9 +18,50 @@
         ue_test/1,
         verify_chunked_streaming/0,
         verify_chunked_streaming/1,
-        i_do_async_req_list/4
+        i_do_async_req_list/4,
+        test_stream_once/3,
+        test_stream_once/4
        ]).
 
+test_stream_once(Url, Method, Options) ->
+    test_stream_once(Url, Method, Options, 5000).
+
+test_stream_once(Url, Method, Options, Timeout) ->
+    case ibrowse:send_req(Url, [], Method, [], [{stream_to, {self(), once}} | 
Options], Timeout) of
+       {ibrowse_req_id, Req_id} ->
+           case ibrowse:stream_next(Req_id) of
+               ok ->
+                   test_stream_once(Req_id);
+               Err ->
+                   Err
+           end;
+       Err ->
+           Err
+    end.
+
+test_stream_once(Req_id) ->
+    receive
+       {ibrowse_async_headers, Req_id, StatCode, Headers} ->
+           io:format("Recvd headers~n~p~n", [{ibrowse_async_headers, Req_id, 
StatCode, Headers}]),
+           case ibrowse:stream_next(Req_id) of
+               ok ->
+                   test_stream_once(Req_id);
+               Err ->
+                   Err
+           end;
+       {ibrowse_async_response, Req_id, {error, Err}} ->
+           io:format("Recvd error: ~p~n", [Err]);
+       {ibrowse_async_response, Req_id, Body_1} ->
+           io:format("Recvd body part: ~n~p~n", [{ibrowse_async_response, 
Req_id, Body_1}]),
+           case ibrowse:stream_next(Req_id) of
+               ok ->
+                   test_stream_once(Req_id);
+               Err ->
+                   Err
+           end;
+       {ibrowse_async_response_end, Req_id} ->
+           ok
+    end.
 %% Use ibrowse:set_max_sessions/3 and ibrowse:set_max_pipeline_size/3 to
 %% tweak settings before running the load test. The defaults are 10 and 10.
 load_test(Url, NumWorkers, NumReqsPerWorker) when is_list(Url),
@@ -182,7 +223,8 @@
     unit_tests([]).
 
 unit_tests(Options) ->
-    {Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), 
Options]),
+    Options_1 = Options ++ [{connect_timeout, 5000}],
+    {Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), 
Options_1]),
     receive 
        {done, Pid} ->
            ok;


Reply via email to