Implement new stream2 API This embeds the stream_init/1 logic into the stream functions so that we don't have to maintain the logic for inititalizing the stream for all clients.
BugzId: 24635 Project: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/commit/cae29fe1 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/tree/cae29fe1 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-rexi/diff/cae29fe1 Branch: refs/heads/windsor-merge Commit: cae29fe1926db6ec89645da63aee60766f14bd11 Parents: 7733957 Author: Paul J. Davis <[email protected]> Authored: Mon Oct 28 16:03:07 2013 -0500 Committer: Robert Newson <[email protected]> Committed: Wed Jul 23 17:58:32 2014 +0100 ---------------------------------------------------------------------- src/rexi.erl | 59 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-rexi/blob/cae29fe1/src/rexi.erl ---------------------------------------------------------------------- diff --git a/src/rexi.erl b/src/rexi.erl index 20f582b..62f410b 100644 --- a/src/rexi.erl +++ b/src/rexi.erl @@ -18,6 +18,7 @@ -export([stream_init/0, stream_init/1]). -export([stream_start/1, stream_cancel/1]). -export([stream/1, stream/2, stream/3, stream_ack/1, stream_ack/2]). +-export([stream2/1, stream2/2, stream2/3, stream_last/1, stream_last/2]). -include_lib("rexi/include/rexi.hrl"). @@ -184,6 +185,43 @@ stream(Msg, Limit, Timeout) -> exit(timeout) end. +%% @equiv stream2(Msg, 10, 300000) +stream2(Msg) -> + stream2(Msg, 10, 300000). + +%% @equiv stream2(Msg, Limit, 300000) +stream2(Msg, Limit) -> + stream2(Msg, Limit, 300000). + +%% @doc Stream a message back to the coordinator. It limits the +%% number of unacked messsages to Limit and throws a timeout error +%% if it doesn't receive an ack in Timeout milliseconds. This +%% is a combination of the old stream_start and stream functions +%% which automatically does the stream initialization logic. +-spec stream2(any(), pos_integer(), pos_integer() | inifinity) -> any(). +stream2(Msg, Limit, Timeout) -> + maybe_init_stream(Timeout), + try maybe_wait(Limit, Timeout) of + {ok, Count} -> + put(rexi_unacked, Count+1), + {Caller, Ref} = get(rexi_from), + erlang:send(Caller, {Ref, self(), Msg}), + ok + catch throw:timeout -> + exit(timeout) + end. + +%% @equiv stream_last(Msg, 300000) +stream_last(Msg) -> + stream_last(Msg, 300000). + +%% @doc Send the last message in a stream. This difference between +%% this and stream is that it uses rexi:reply/1 which doesn't include +%% the worker pid and doesn't wait for a response from the controller. +stream_last(Msg, Timeout) -> + maybe_init_stream(Timeout), + rexi:reply(Msg). + %% @equiv stream_ack(Client, 1) stream_ack(Client) -> erlang:send(Client, {rexi_ack, 1}). @@ -196,6 +234,27 @@ stream_ack(Client, N) -> cast_msg(Msg) -> {'$gen_cast', Msg}. +maybe_init_stream(Timeout) -> + case get(rexi_STREAM_INITED) of + true -> + ok; + _ -> + init_stream(Timeout) + end. + +init_stream(Timeout) -> + case sync_reply(rexi_STREAM_INIT, Timeout) of + rexi_STREAM_START -> + put(rexi_STREAM_INITED, true), + ok; + rexi_STREAM_CANCEL -> + exit(normal); + timeout -> + exit(timeout); + Else -> + exit({invalid_stream_message, Else}) + end. + maybe_wait(Limit, Timeout) -> case get(rexi_unacked) of undefined ->
